Skip to content

Commit 50914c4

Browse files
authored
Honor all non-completion commands (#569)
* Honor commands generated after the first completion command
1 parent a839196 commit 50914c4

File tree

6 files changed

+453
-22
lines changed

6 files changed

+453
-22
lines changed

temporalio/bridge/Cargo.lock

Lines changed: 16 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

temporalio/worker/_workflow_instance.py

Lines changed: 9 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -403,27 +403,17 @@ def activate(
403403
f"Failed converting activation exception: {inner_err}"
404404
)
405405

406-
# If there are successful commands, we must remove all
407-
# non-query-responses after terminal workflow commands. We must do this
408-
# in place to avoid the copy-on-write that occurs when you reassign.
409-
seen_completion = False
410-
i = 0
411-
while i < len(self._current_completion.successful.commands):
412-
command = self._current_completion.successful.commands[i]
413-
if not seen_completion:
414-
seen_completion = (
415-
command.HasField("complete_workflow_execution")
416-
or command.HasField("continue_as_new_workflow_execution")
417-
or command.HasField("fail_workflow_execution")
418-
or command.HasField("cancel_workflow_execution")
419-
)
420-
elif not command.HasField("respond_to_query"):
421-
del self._current_completion.successful.commands[i]
422-
continue
423-
i += 1
406+
def is_completion(command):
407+
return (
408+
command.HasField("complete_workflow_execution")
409+
or command.HasField("continue_as_new_workflow_execution")
410+
or command.HasField("fail_workflow_execution")
411+
or command.HasField("cancel_workflow_execution")
412+
)
424413

425-
if seen_completion:
414+
if any(map(is_completion, self._current_completion.successful.commands)):
426415
self._warn_if_unfinished_handlers()
416+
427417
return self._current_completion
428418

429419
def _apply(

tests/worker/test_replayer.py

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -310,3 +310,78 @@ def new_say_hello_worker(client: Client) -> Worker:
310310
workflows=[SayHelloWorkflow],
311311
activities=[say_hello],
312312
)
313+
314+
315+
@workflow.defn
316+
class UpdateCompletionAfterWorkflowReturn:
317+
def __init__(self) -> None:
318+
self.workflow_returned = False
319+
320+
@workflow.run
321+
async def run(self) -> str:
322+
self.workflow_returned = True
323+
return "workflow-result"
324+
325+
@workflow.update
326+
async def my_update(self) -> str:
327+
await workflow.wait_condition(lambda: self.workflow_returned)
328+
return "update-result"
329+
330+
331+
async def test_replayer_command_reordering_backward_compatibility() -> None:
332+
"""
333+
The UpdateCompletionAfterWorkflowReturn workflow above features an update handler that returns
334+
after the main workflow coroutine has exited. It will (if an update is sent in the first WFT)
335+
generate a raw command sequence (before sending to core) of
336+
337+
[UpdateAccepted, CompleteWorkflowExecution, UpdateCompleted].
338+
339+
Prior to https://github.com/temporalio/sdk-python/pull/569, Python truncated this command
340+
sequence to
341+
342+
[UpdateAccepted, CompleteWorkflowExecution].
343+
344+
With #569, Python performs no truncation, and Core changes it to
345+
346+
[UpdateAccepted, UpdateCompleted, CompleteWorkflowExecution].
347+
348+
This test takes a history generated using pre-#569 SDK code, and replays it. This succeeds.
349+
The history is
350+
351+
1 WorkflowExecutionStarted
352+
2 WorkflowTaskScheduled
353+
3 WorkflowTaskStarted
354+
4 WorkflowTaskCompleted
355+
5 WorkflowExecutionUpdateAccepted
356+
6 WorkflowExecutionCompleted
357+
358+
Note that the history lacks a WorkflowExecutionUpdateCompleted event.
359+
360+
If Core's logic (which involves a flag) incorrectly allowed this history to be replayed
361+
using Core's post-#569 implementation, then a non-determinism error would result. Specifically,
362+
Core would, at some point during replay, do the following:
363+
364+
Receive [UpdateAccepted, CompleteWorkflowExecution, UpdateCompleted] from lang,
365+
change that to [UpdateAccepted, UpdateCompleted, CompleteWorkflowExecution]
366+
and create an UpdateMachine instance (the WorkflowTaskMachine instance already exists).
367+
Then continue to consume history events.
368+
369+
Event 5 WorkflowExecutionUpdateAccepted would apply to the UpdateMachine associated with
370+
the UpdateAccepted command, but event 6 WorkflowExecutionCompleted would not, since
371+
core is expecting an event that can be applied to the UpdateMachine corresponding to
372+
UpdateCompleted. If we modify core to incorrectly apply its new logic then we do see that:
373+
374+
[TMPRL1100] Nondeterminism error: Update machine does not handle this event: HistoryEvent(id: 6, WorkflowExecutionCompleted)
375+
376+
The test passes because core in fact (because the history lacks the flag) uses its old logic
377+
and changes the command sequence from [UpdateAccepted, CompleteWorkflowExecution, UpdateCompleted]
378+
to [UpdateAccepted, CompleteWorkflowExecution], and events 5 and 6 can be applied to the
379+
corresponding state machines.
380+
"""
381+
with Path(__file__).with_name(
382+
"test_replayer_command_reordering_backward_compatibility.json"
383+
).open() as f:
384+
history = f.read()
385+
await Replayer(workflows=[UpdateCompletionAfterWorkflowReturn]).replay_workflow(
386+
WorkflowHistory.from_json("fake", history)
387+
)
Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
{
2+
"events": [
3+
{
4+
"eventId": "1",
5+
"eventTime": "2024-08-02T23:35:00.061520Z",
6+
"eventType": "EVENT_TYPE_WORKFLOW_EXECUTION_STARTED",
7+
"taskId": "1049558",
8+
"workflowExecutionStartedEventAttributes": {
9+
"workflowType": {
10+
"name": "UpdateCompletionAfterWorkflowReturn"
11+
},
12+
"taskQueue": {
13+
"name": "tq",
14+
"kind": "TASK_QUEUE_KIND_NORMAL"
15+
},
16+
"workflowTaskTimeout": "10s",
17+
"originalExecutionRunId": "a32ce0cb-b50e-4734-b003-784dda811861",
18+
"identity": "[email protected]",
19+
"firstExecutionRunId": "a32ce0cb-b50e-4734-b003-784dda811861",
20+
"attempt": 1,
21+
"firstWorkflowTaskBackoff": "0s",
22+
"workflowId": "wf-dd1e2267-d1bf-4822-be38-2a97a499331e"
23+
}
24+
},
25+
{
26+
"eventId": "2",
27+
"eventTime": "2024-08-02T23:35:00.070867Z",
28+
"eventType": "EVENT_TYPE_WORKFLOW_TASK_SCHEDULED",
29+
"taskId": "1049559",
30+
"workflowTaskScheduledEventAttributes": {
31+
"taskQueue": {
32+
"name": "tq",
33+
"kind": "TASK_QUEUE_KIND_NORMAL"
34+
},
35+
"startToCloseTimeout": "10s",
36+
"attempt": 1
37+
}
38+
},
39+
{
40+
"eventId": "3",
41+
"eventTime": "2024-08-02T23:35:00.155562Z",
42+
"eventType": "EVENT_TYPE_WORKFLOW_TASK_STARTED",
43+
"taskId": "1049564",
44+
"workflowTaskStartedEventAttributes": {
45+
"scheduledEventId": "2",
46+
"identity": "[email protected]",
47+
"requestId": "b03f25fb-b2ab-4b93-b2ad-0f6899f6e2e2",
48+
"historySizeBytes": "260"
49+
}
50+
},
51+
{
52+
"eventId": "4",
53+
"eventTime": "2024-08-02T23:35:00.224744Z",
54+
"eventType": "EVENT_TYPE_WORKFLOW_TASK_COMPLETED",
55+
"taskId": "1049568",
56+
"workflowTaskCompletedEventAttributes": {
57+
"scheduledEventId": "2",
58+
"startedEventId": "3",
59+
"identity": "[email protected]",
60+
"workerVersion": {
61+
"buildId": "17647b02191ec9e4e58b623a9c71f20a"
62+
},
63+
"sdkMetadata": {
64+
"coreUsedFlags": [
65+
1,
66+
2
67+
]
68+
},
69+
"meteringMetadata": {}
70+
}
71+
},
72+
{
73+
"eventId": "5",
74+
"eventTime": "2024-08-02T23:35:00.242507Z",
75+
"eventType": "EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_ACCEPTED",
76+
"taskId": "1049569",
77+
"workflowExecutionUpdateAcceptedEventAttributes": {
78+
"protocolInstanceId": "my-update",
79+
"acceptedRequestMessageId": "my-update/request",
80+
"acceptedRequestSequencingEventId": "2",
81+
"acceptedRequest": {
82+
"meta": {
83+
"updateId": "my-update",
84+
"identity": "[email protected]"
85+
},
86+
"input": {
87+
"name": "my_update"
88+
}
89+
}
90+
}
91+
},
92+
{
93+
"eventId": "6",
94+
"eventTime": "2024-08-02T23:35:00.258465Z",
95+
"eventType": "EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED",
96+
"taskId": "1049570",
97+
"workflowExecutionCompletedEventAttributes": {
98+
"result": {
99+
"payloads": [
100+
{
101+
"metadata": {
102+
"encoding": "anNvbi9wbGFpbg==",
103+
"encodingDecoded": "json/plain"
104+
},
105+
"data": "workflow-result"
106+
}
107+
]
108+
},
109+
"workflowTaskCompletedEventId": "4"
110+
}
111+
}
112+
]
113+
}

0 commit comments

Comments
 (0)