Skip to content

Commit 94d0d38

Browse files
committed
WIP: cancellation
1 parent 6eb176a commit 94d0d38

File tree

1 file changed

+49
-0
lines changed

1 file changed

+49
-0
lines changed

tests/worker/test_workflow.py

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5632,6 +5632,55 @@ async def finish(self):
56325632
self.workflow_may_exit = True
56335633

56345634

5635+
@workflow.defn
5636+
class HandlerCoroutinesUseLockWithCancellationWorkflow(CoroutinesUseLockWorkflow):
5637+
def __init__(self) -> None:
5638+
self.non_terminating_operation_has_started = False
5639+
5640+
@workflow.run
5641+
async def run(self) -> str:
5642+
await workflow.wait_condition(lambda: False)
5643+
return "unreachable"
5644+
5645+
@workflow.update
5646+
async def wait_until_non_terminating_operation_has_started(self) -> None:
5647+
await workflow.wait_condition(
5648+
lambda: self.non_terminating_operation_has_started
5649+
)
5650+
5651+
@workflow.update
5652+
async def non_terminating_operation(self) -> str:
5653+
self.non_terminating_operation_has_started = True
5654+
await workflow.wait_condition(lambda: False)
5655+
return "unreachable"
5656+
5657+
5658+
async def test_workflow_coroutines_can_use_lock_with_cancellation(client: Client):
5659+
async with new_worker(
5660+
client, HandlerCoroutinesUseLockWithCancellationWorkflow
5661+
) as worker:
5662+
wf_handle = await client.start_workflow(
5663+
HandlerCoroutinesUseLockWithCancellationWorkflow.run,
5664+
id=str(uuid.uuid4()),
5665+
task_queue=worker.task_queue,
5666+
)
5667+
# Asynchronously run an update that will never complete
5668+
non_terminating_update_task = asyncio.create_task(
5669+
wf_handle.execute_update(
5670+
HandlerCoroutinesUseLockWithCancellationWorkflow.non_terminating_operation
5671+
)
5672+
)
5673+
print("wait until handler started...")
5674+
# Wait until we know the update handler has started executing
5675+
await wf_handle.execute_update(
5676+
HandlerCoroutinesUseLockWithCancellationWorkflow.wait_until_non_terminating_operation_has_started
5677+
)
5678+
print("cancel the workflow")
5679+
await wf_handle.cancel()
5680+
print("await non_terminating_update_task...")
5681+
await non_terminating_update_task
5682+
5683+
56355684
async def _do_workflow_coroutines_lock_or_semaphore_test(
56365685
client: Client,
56375686
params: UseLockOrSemaphoreWorkflowParameters,

0 commit comments

Comments
 (0)