Skip to content

Commit 2e68dad

Browse files
committed
WIP: cancellation
1 parent 6eb176a commit 2e68dad

File tree

2 files changed

+47
-2
lines changed

2 files changed

+47
-2
lines changed

temporalio/worker/_workflow_instance.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -422,8 +422,6 @@ def activate(
422422
continue
423423
i += 1
424424

425-
if seen_completion:
426-
self._warn_if_unfinished_handlers()
427425
return self._current_completion
428426

429427
def _apply(

tests/worker/test_workflow.py

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5632,6 +5632,53 @@ async def finish(self):
56325632
self.workflow_may_exit = True
56335633

56345634

5635+
@workflow.defn
5636+
class UpdateCancellationWorkflow(CoroutinesUseLockWorkflow):
5637+
def __init__(self) -> None:
5638+
self.non_terminating_operation_has_started = False
5639+
5640+
@workflow.run
5641+
async def run(self) -> str:
5642+
await workflow.wait_condition(lambda: False)
5643+
return "unreachable"
5644+
5645+
@workflow.update
5646+
async def wait_until_non_terminating_operation_has_started(self) -> None:
5647+
await workflow.wait_condition(
5648+
lambda: self.non_terminating_operation_has_started
5649+
)
5650+
5651+
@workflow.update
5652+
async def non_terminating_operation(self) -> str:
5653+
self.non_terminating_operation_has_started = True
5654+
await workflow.wait_condition(lambda: False)
5655+
return "unreachable"
5656+
5657+
5658+
async def test_update_cancellation(client: Client):
5659+
async with new_worker(client, UpdateCancellationWorkflow) as worker:
5660+
wf_handle = await client.start_workflow(
5661+
UpdateCancellationWorkflow.run,
5662+
id=str(uuid.uuid4()),
5663+
task_queue=worker.task_queue,
5664+
)
5665+
# Asynchronously run an update that will never complete
5666+
non_terminating_update_task = asyncio.create_task(
5667+
wf_handle.execute_update(
5668+
UpdateCancellationWorkflow.non_terminating_operation
5669+
)
5670+
)
5671+
print("wait until handler started...")
5672+
# Wait until we know the update handler has started executing
5673+
await wf_handle.execute_update(
5674+
UpdateCancellationWorkflow.wait_until_non_terminating_operation_has_started
5675+
)
5676+
print("cancel the workflow")
5677+
await wf_handle.cancel()
5678+
print("await non_terminating_update_task...")
5679+
await non_terminating_update_task
5680+
5681+
56355682
async def _do_workflow_coroutines_lock_or_semaphore_test(
56365683
client: Client,
56375684
params: UseLockOrSemaphoreWorkflowParameters,

0 commit comments

Comments
 (0)