Skip to content

Commit ffa761c

Browse files
committed
Honor commands generated after the first completion command
1 parent bcbacc2 commit ffa761c

File tree

3 files changed

+204
-20
lines changed

3 files changed

+204
-20
lines changed

temporalio/worker/_workflow_instance.py

Lines changed: 9 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -403,27 +403,17 @@ def activate(
403403
f"Failed converting activation exception: {inner_err}"
404404
)
405405

406-
# If there are successful commands, we must remove all
407-
# non-query-responses after terminal workflow commands. We must do this
408-
# in place to avoid the copy-on-write that occurs when you reassign.
409-
seen_completion = False
410-
i = 0
411-
while i < len(self._current_completion.successful.commands):
412-
command = self._current_completion.successful.commands[i]
413-
if not seen_completion:
414-
seen_completion = (
415-
command.HasField("complete_workflow_execution")
416-
or command.HasField("continue_as_new_workflow_execution")
417-
or command.HasField("fail_workflow_execution")
418-
or command.HasField("cancel_workflow_execution")
419-
)
420-
elif not command.HasField("respond_to_query"):
421-
del self._current_completion.successful.commands[i]
422-
continue
423-
i += 1
406+
def is_completion(command):
407+
return (
408+
command.HasField("complete_workflow_execution")
409+
or command.HasField("continue_as_new_workflow_execution")
410+
or command.HasField("fail_workflow_execution")
411+
or command.HasField("cancel_workflow_execution")
412+
)
424413

425-
if seen_completion:
414+
if any(map(is_completion, self._current_completion.successful.commands)):
426415
self._warn_if_unfinished_handlers()
416+
427417
return self._current_completion
428418

429419
def _apply(

tests/worker/test_workflow.py

Lines changed: 194 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
from typing import (
2020
Any,
2121
Awaitable,
22+
Callable,
2223
Dict,
2324
List,
2425
Mapping,
@@ -5505,3 +5506,196 @@ def _unfinished_handler_warning_cls(self) -> Type:
55055506
"update": workflow.UnfinishedUpdateHandlersWarning,
55065507
"signal": workflow.UnfinishedSignalHandlersWarning,
55075508
}[self.handler_type]
5509+
5510+
5511+
@workflow.defn
5512+
class TestUpdateCompletionIsHonoredWhenAfterWorkflowReturn1:
5513+
def __init__(self) -> None:
5514+
self.workflow_returned = False
5515+
5516+
@workflow.run
5517+
async def run(self) -> str:
5518+
self.workflow_returned = True
5519+
return "workflow-result"
5520+
5521+
@workflow.update
5522+
async def my_update(self) -> str:
5523+
await workflow.wait_condition(lambda: self.workflow_returned)
5524+
return "update-result"
5525+
5526+
5527+
async def test_update_completion_is_honored_when_after_workflow_return_1(
5528+
client: Client,
5529+
):
5530+
update_id = "my-update"
5531+
task_queue = "tq"
5532+
wf_handle = await client.start_workflow(
5533+
TestUpdateCompletionIsHonoredWhenAfterWorkflowReturn1.run,
5534+
id=f"wf-{uuid.uuid4()}",
5535+
task_queue=task_queue,
5536+
)
5537+
update_result_task = asyncio.create_task(
5538+
wf_handle.execute_update(
5539+
TestUpdateCompletionIsHonoredWhenAfterWorkflowReturn1.my_update,
5540+
id=update_id,
5541+
)
5542+
)
5543+
await workflow_update_exists(client, wf_handle.id, update_id)
5544+
5545+
async with Worker(
5546+
client,
5547+
task_queue=task_queue,
5548+
workflows=[TestUpdateCompletionIsHonoredWhenAfterWorkflowReturn1],
5549+
):
5550+
assert await wf_handle.result() == "workflow-result"
5551+
assert await update_result_task == "update-result"
5552+
5553+
5554+
@workflow.defn
5555+
class TestUpdateCompletionIsHonoredWhenAfterWorkflowReturnWorkflow2:
5556+
def __init__(self):
5557+
self.received_update = False
5558+
self.update_result: asyncio.Future[str] = asyncio.Future()
5559+
5560+
@workflow.run
5561+
async def run(self) -> str:
5562+
await workflow.wait_condition(lambda: self.received_update)
5563+
self.update_result.set_result("update-result")
5564+
# Prior to https://github.com/temporalio/features/issues/481, the client
5565+
# waiting on the update got a "Workflow execution already completed"
5566+
# error instead of the update result, because the main workflow
5567+
# coroutine completion command is emitted before the update completion
5568+
# command, and we were truncating commands at the first completion
5569+
# command.
5570+
return "workflow-result"
5571+
5572+
@workflow.update
5573+
async def my_update(self) -> str:
5574+
self.received_update = True
5575+
return await self.update_result
5576+
5577+
5578+
async def test_update_completion_is_honored_when_after_workflow_return_2(
5579+
client: Client,
5580+
):
5581+
async with Worker(
5582+
client,
5583+
task_queue="tq",
5584+
workflows=[TestUpdateCompletionIsHonoredWhenAfterWorkflowReturnWorkflow2],
5585+
) as worker:
5586+
handle = await client.start_workflow(
5587+
TestUpdateCompletionIsHonoredWhenAfterWorkflowReturnWorkflow2.run,
5588+
id=f"wf-{uuid.uuid4()}",
5589+
task_queue=worker.task_queue,
5590+
)
5591+
update_result = await handle.execute_update(
5592+
TestUpdateCompletionIsHonoredWhenAfterWorkflowReturnWorkflow2.my_update
5593+
)
5594+
assert update_result == "update-result"
5595+
assert await handle.result() == "workflow-result"
5596+
5597+
5598+
@workflow.defn
5599+
class FirstCompletionCommandIsHonoredWorkflow:
5600+
def __init__(self, main_workflow_returns_before_signal_completions=False) -> None:
5601+
self.seen_first_signal = False
5602+
self.seen_second_signal = False
5603+
self.main_workflow_returns_before_signal_completions = (
5604+
main_workflow_returns_before_signal_completions
5605+
)
5606+
self.ping_pong_val = 1
5607+
self.ping_pong_counter = 0
5608+
self.ping_pong_max_count = 4
5609+
5610+
@workflow.run
5611+
async def run(self) -> str:
5612+
await workflow.wait_condition(
5613+
lambda: self.seen_first_signal and self.seen_second_signal
5614+
)
5615+
return "workflow-result"
5616+
5617+
@workflow.signal
5618+
async def this_signal_executes_first(self):
5619+
self.seen_first_signal = True
5620+
if self.main_workflow_returns_before_signal_completions:
5621+
await self.ping_pong(lambda: self.ping_pong_val > 0)
5622+
raise ApplicationError(
5623+
"Client should see this error unless doing ping-pong "
5624+
"(in which case main coroutine returns first)"
5625+
)
5626+
5627+
@workflow.signal
5628+
async def this_signal_executes_second(self):
5629+
await workflow.wait_condition(lambda: self.seen_first_signal)
5630+
self.seen_second_signal = True
5631+
if self.main_workflow_returns_before_signal_completions:
5632+
await self.ping_pong(lambda: self.ping_pong_val < 0)
5633+
raise ApplicationError("Client should never see this error!")
5634+
5635+
async def ping_pong(self, cond: Callable[[], bool]):
5636+
while self.ping_pong_counter < self.ping_pong_max_count:
5637+
await workflow.wait_condition(cond)
5638+
self.ping_pong_val = -self.ping_pong_val
5639+
self.ping_pong_counter += 1
5640+
5641+
5642+
@workflow.defn
5643+
class FirstCompletionCommandIsHonoredPingPongWorkflow(
5644+
FirstCompletionCommandIsHonoredWorkflow
5645+
):
5646+
def __init__(self) -> None:
5647+
super().__init__(main_workflow_returns_before_signal_completions=True)
5648+
5649+
@workflow.run
5650+
async def run(self) -> str:
5651+
return await super().run()
5652+
5653+
5654+
async def test_first_of_two_signal_completion_commands_is_honored(client: Client):
5655+
await _do_first_completion_command_is_honored_test(
5656+
client, main_workflow_returns_before_signal_completions=False
5657+
)
5658+
5659+
5660+
async def test_workflow_return_is_honored_when_it_precedes_signal_completion_command(
5661+
client: Client,
5662+
):
5663+
await _do_first_completion_command_is_honored_test(
5664+
client, main_workflow_returns_before_signal_completions=True
5665+
)
5666+
5667+
5668+
async def _do_first_completion_command_is_honored_test(
5669+
client: Client, main_workflow_returns_before_signal_completions: bool
5670+
):
5671+
workflow_cls = (
5672+
FirstCompletionCommandIsHonoredPingPongWorkflow
5673+
if main_workflow_returns_before_signal_completions
5674+
else FirstCompletionCommandIsHonoredWorkflow
5675+
)
5676+
async with Worker(
5677+
client,
5678+
task_queue="tq",
5679+
workflows=[workflow_cls],
5680+
) as worker:
5681+
handle = await client.start_workflow(
5682+
workflow_cls.run,
5683+
id=f"wf-{uuid.uuid4()}",
5684+
task_queue=worker.task_queue,
5685+
)
5686+
await handle.signal(workflow_cls.this_signal_executes_second)
5687+
await handle.signal(workflow_cls.this_signal_executes_first)
5688+
try:
5689+
result = await handle.result()
5690+
except WorkflowFailureError as err:
5691+
if main_workflow_returns_before_signal_completions:
5692+
assert (
5693+
False
5694+
), "Expected no error due to main workflow coroutine returning first"
5695+
else:
5696+
assert str(err.cause).startswith("Client should see this error")
5697+
else:
5698+
assert (
5699+
main_workflow_returns_before_signal_completions
5700+
and result == "workflow-result"
5701+
)

0 commit comments

Comments
 (0)