From 425d3e6cbef95654768d057e9e6038f3b0e29e1c Mon Sep 17 00:00:00 2001 From: Dan Davison Date: Thu, 15 Aug 2024 08:40:57 -0400 Subject: [PATCH 1/7] Rename test methods --- tests/worker/test_workflow.py | 44 +++++++++++++++++------------------ 1 file changed, 22 insertions(+), 22 deletions(-) diff --git a/tests/worker/test_workflow.py b/tests/worker/test_workflow.py index 907d30ceb..778b398de 100644 --- a/tests/worker/test_workflow.py +++ b/tests/worker/test_workflow.py @@ -5222,7 +5222,7 @@ async def test_workflow_current_update(client: Client, env: WorkflowEnvironment) @workflow.defn -class UnfinishedHandlersWorkflow: +class UnfinishedHandlersWarningsWorkflow: def __init__(self): self.started_handler = False self.handler_may_return = False @@ -5275,21 +5275,21 @@ async def test_unfinished_update_handler(client: Client, env: WorkflowEnvironmen pytest.skip( "Java test server: https://github.com/temporalio/sdk-java/issues/1903" ) - async with new_worker(client, UnfinishedHandlersWorkflow) as worker: - test = _UnfinishedHandlersTest(client, worker, "update") + async with new_worker(client, UnfinishedHandlersWarningsWorkflow) as worker: + test = _UnfinishedHandlersWarningsTest(client, worker, "update") await test.test_wait_all_handlers_finished_and_unfinished_handlers_warning() await test.test_unfinished_handlers_cause_exceptions_in_test_suite() async def test_unfinished_signal_handler(client: Client): - async with new_worker(client, UnfinishedHandlersWorkflow) as worker: - test = _UnfinishedHandlersTest(client, worker, "signal") + async with new_worker(client, UnfinishedHandlersWarningsWorkflow) as worker: + test = _UnfinishedHandlersWarningsTest(client, worker, "signal") await test.test_wait_all_handlers_finished_and_unfinished_handlers_warning() await test.test_unfinished_handlers_cause_exceptions_in_test_suite() @dataclass -class _UnfinishedHandlersTest: +class _UnfinishedHandlersWarningsTest: client: Client worker: Worker handler_type: Literal["update", "signal"] @@ -5370,7 +5370,7 @@ async def _get_workflow_result( handle_future: Optional[asyncio.Future[WorkflowHandle]] = None, ) -> bool: handle = await self.client.start_workflow( - UnfinishedHandlersWorkflow.run, + UnfinishedHandlersWarningsWorkflow.run, arg=wait_all_handlers_finished, id=f"wf-{uuid.uuid4()}", task_queue=self.worker.task_queue, @@ -5404,7 +5404,7 @@ def _unfinished_handler_warning_cls(self) -> Type: @workflow.defn -class UnfinishedHandlersWithCancellationOrFailureWorkflow: +class UnfinishedHandlersOnWorkflowTerminationWorkflow: @workflow.run async def run( self, workflow_termination_type: Literal["cancellation", "failure"] @@ -5434,19 +5434,19 @@ async def test_unfinished_update_handler_with_workflow_cancellation( pytest.skip( "Java test server: https://github.com/temporalio/sdk-java/issues/1903" ) - await _UnfinishedHandlersWithCancellationOrFailureTest( + await _UnfinishedHandlersOnWorkflowTerminationTest( client, "update", "cancellation", - ).test_warning_is_issued_when_cancellation_or_failure_causes_exit_with_unfinished_handler() + ).test_warning_is_issued_on_exit_with_unfinished_handler() async def test_unfinished_signal_handler_with_workflow_cancellation(client: Client): - await _UnfinishedHandlersWithCancellationOrFailureTest( + await _UnfinishedHandlersOnWorkflowTerminationTest( client, "signal", "cancellation", - ).test_warning_is_issued_when_cancellation_or_failure_causes_exit_with_unfinished_handler() + ).test_warning_is_issued_on_exit_with_unfinished_handler() async def test_unfinished_update_handler_with_workflow_failure( @@ -5456,11 +5456,11 @@ async def test_unfinished_update_handler_with_workflow_failure( pytest.skip( "Java test server: https://github.com/temporalio/sdk-java/issues/1903" ) - await _UnfinishedHandlersWithCancellationOrFailureTest( + await _UnfinishedHandlersOnWorkflowTerminationTest( client, "update", "failure", - ).test_warning_is_issued_when_cancellation_or_failure_causes_exit_with_unfinished_handler() + ).test_warning_is_issued_on_exit_with_unfinished_handler() async def test_unfinished_signal_handler_with_workflow_failure( @@ -5470,20 +5470,20 @@ async def test_unfinished_signal_handler_with_workflow_failure( pytest.skip( "Java test server: https://github.com/temporalio/sdk-java/issues/2127" ) - await _UnfinishedHandlersWithCancellationOrFailureTest( + await _UnfinishedHandlersOnWorkflowTerminationTest( client, "signal", "failure", - ).test_warning_is_issued_when_cancellation_or_failure_causes_exit_with_unfinished_handler() + ).test_warning_is_issued_on_exit_with_unfinished_handler() @dataclass -class _UnfinishedHandlersWithCancellationOrFailureTest: +class _UnfinishedHandlersOnWorkflowTerminationTest: client: Client handler_type: Literal["update", "signal"] workflow_termination_type: Literal["cancellation", "failure"] - async def test_warning_is_issued_when_cancellation_or_failure_causes_exit_with_unfinished_handler( + async def test_warning_is_issued_on_exit_with_unfinished_handler( self, ): assert await self._run_workflow_and_get_warning() @@ -5497,7 +5497,7 @@ async def _run_workflow_and_get_warning(self) -> bool: # in the same WFT. To do this we start the worker after they've all been accepted by the # server. handle = await self.client.start_workflow( - UnfinishedHandlersWithCancellationOrFailureWorkflow.run, + UnfinishedHandlersOnWorkflowTerminationWorkflow.run, self.workflow_termination_type, id=workflow_id, task_queue=task_queue, @@ -5508,7 +5508,7 @@ async def _run_workflow_and_get_warning(self) -> bool: if self.handler_type == "update": update_task = asyncio.create_task( handle.execute_update( - UnfinishedHandlersWithCancellationOrFailureWorkflow.my_update, + UnfinishedHandlersOnWorkflowTerminationWorkflow.my_update, id=update_id, ) ) @@ -5518,12 +5518,12 @@ async def _run_workflow_and_get_warning(self) -> bool: ) else: await handle.signal( - UnfinishedHandlersWithCancellationOrFailureWorkflow.my_signal + UnfinishedHandlersOnWorkflowTerminationWorkflow.my_signal ) async with new_worker( self.client, - UnfinishedHandlersWithCancellationOrFailureWorkflow, + UnfinishedHandlersOnWorkflowTerminationWorkflow, task_queue=task_queue, ): with pytest.WarningsRecorder() as warnings: From 446c46e7b7ea2950250ca57190f50b09ebf00fef Mon Sep 17 00:00:00 2001 From: Dan Davison Date: Thu, 15 Aug 2024 07:49:55 -0400 Subject: [PATCH 2/7] Add rule code to warning messages --- temporalio/worker/_workflow_instance.py | 4 ++-- tests/worker/test_workflow.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/temporalio/worker/_workflow_instance.py b/temporalio/worker/_workflow_instance.py index e4f69b7f3..7fa3cded5 100644 --- a/temporalio/worker/_workflow_instance.py +++ b/temporalio/worker/_workflow_instance.py @@ -2778,7 +2778,7 @@ def _make_unfinished_update_handler_message( handler_executions: List[HandlerExecution], ) -> str: message = """ -Workflow finished while update handlers are still running. This may have interrupted work that the +[TMPRL1102] Workflow finished while update handlers are still running. This may have interrupted work that the update handler was doing, and the client that sent the update will receive a 'workflow execution already completed' RPCError instead of the update result. You can wait for all update and signal handlers to complete by using `await workflow.wait_condition(lambda: @@ -2797,7 +2797,7 @@ def _make_unfinished_signal_handler_message( handler_executions: List[HandlerExecution], ) -> str: message = """ -Workflow finished while signal handlers are still running. This may have interrupted work that the +[TMPRL1102] Workflow finished while signal handlers are still running. This may have interrupted work that the signal handler was doing. You can wait for all update and signal handlers to complete by using `await workflow.wait_condition(lambda: workflow.all_handlers_finished())`. Alternatively, if both you and the clients sending the signal are okay with interrupting running handlers when the workflow diff --git a/tests/worker/test_workflow.py b/tests/worker/test_workflow.py index 778b398de..9f9b871ad 100644 --- a/tests/worker/test_workflow.py +++ b/tests/worker/test_workflow.py @@ -5343,7 +5343,7 @@ async def _workflow_task_failed(self, workflow_id: str) -> bool: for event in reversed(resp.history.events): if event.event_type == EventType.EVENT_TYPE_WORKFLOW_TASK_FAILED: assert event.workflow_task_failed_event_attributes.failure.message.startswith( - f"Workflow finished while {self.handler_type} handlers are still running" + f"[TMPRL1102] Workflow finished while {self.handler_type} handlers are still running" ) return True return False From af2634ae08818c08cba04988e89877b73d7eb5e4 Mon Sep 17 00:00:00 2001 From: Dan Davison Date: Thu, 15 Aug 2024 08:41:09 -0400 Subject: [PATCH 3/7] Add tests of CAN and unfinished handler warnings --- tests/worker/test_workflow.py | 61 +++++++++++++++++++++++++++++++---- 1 file changed, 54 insertions(+), 7 deletions(-) diff --git a/tests/worker/test_workflow.py b/tests/worker/test_workflow.py index 9f9b871ad..c407cb887 100644 --- a/tests/worker/test_workflow.py +++ b/tests/worker/test_workflow.py @@ -5407,14 +5407,26 @@ def _unfinished_handler_warning_cls(self) -> Type: class UnfinishedHandlersOnWorkflowTerminationWorkflow: @workflow.run async def run( - self, workflow_termination_type: Literal["cancellation", "failure"] + self, + workflow_termination_type: Literal[ + "cancellation", + "failure", + "continue-as-new", + "fail-post-continue-as-new-run", + ], ) -> NoReturn: if workflow_termination_type == "failure": raise ApplicationError( "Deliberately failing workflow with an unfinished handler" ) - await workflow.wait_condition(lambda: False) - raise AssertionError("unreachable") + if workflow_termination_type == "fail-post-continue-as-new-run": + raise ApplicationError("Deliberately failing post-ContinueAsNew run") + elif workflow_termination_type == "continue-as-new": + # Fail next run so tat test terminates + workflow.continue_as_new("fail-post-continue-as-new-run") + else: + await workflow.wait_condition(lambda: False) + raise AssertionError("unreachable") @workflow.update async def my_update(self) -> NoReturn: @@ -5477,11 +5489,39 @@ async def test_unfinished_signal_handler_with_workflow_failure( ).test_warning_is_issued_on_exit_with_unfinished_handler() +async def test_unfinished_update_handler_with_continue_as_new( + client: Client, env: WorkflowEnvironment +): + if env.supports_time_skipping: + pytest.skip( + "Java test server: https://github.com/temporalio/sdk-java/issues/1903" + ) + await _UnfinishedHandlersOnWorkflowTerminationTest( + client, + "update", + "continue-as-new", + ).test_warning_is_issued_on_exit_with_unfinished_handler() + + +async def test_unfinished_signal_handler_with_continue_as_new( + client: Client, env: WorkflowEnvironment +): + if env.supports_time_skipping: + pytest.skip( + "Java test server: https://github.com/temporalio/sdk-java/issues/2127" + ) + await _UnfinishedHandlersOnWorkflowTerminationTest( + client, + "signal", + "continue-as-new", + ).test_warning_is_issued_on_exit_with_unfinished_handler() + + @dataclass class _UnfinishedHandlersOnWorkflowTerminationTest: client: Client handler_type: Literal["update", "signal"] - workflow_termination_type: Literal["cancellation", "failure"] + workflow_termination_type: Literal["cancellation", "failure", "continue-as-new"] async def test_warning_is_issued_on_exit_with_unfinished_handler( self, @@ -5540,10 +5580,17 @@ async def _run_workflow_and_get_warning(self) -> bool: await handle.result() assert isinstance( err.value.cause, - {"cancellation": CancelledError, "failure": ApplicationError}[ - self.workflow_termination_type - ], + { + "cancellation": CancelledError, + "continue-as-new": ApplicationError, + "failure": ApplicationError, + }[self.workflow_termination_type], ) + if self.workflow_termination_type == "continue-as-new": + assert ( + str(err.value.cause) + == "Deliberately failing post-ContinueAsNew run" + ) unfinished_handler_warning_emitted = any( issubclass(w.category, self._unfinished_handler_warning_cls) From defc051adec3daf4f5585d209f1c77d4482a3480 Mon Sep 17 00:00:00 2001 From: Dan Davison Date: Fri, 16 Aug 2024 12:34:51 -0400 Subject: [PATCH 4/7] Add await all_handlers_finished coverage to unfinished handlers tests --- tests/worker/test_workflow.py | 67 ++++++++++++++++++++++++----------- 1 file changed, 47 insertions(+), 20 deletions(-) diff --git a/tests/worker/test_workflow.py b/tests/worker/test_workflow.py index c407cb887..d75a4189a 100644 --- a/tests/worker/test_workflow.py +++ b/tests/worker/test_workflow.py @@ -5405,6 +5405,9 @@ def _unfinished_handler_warning_cls(self) -> Type: @workflow.defn class UnfinishedHandlersOnWorkflowTerminationWorkflow: + def __init__(self) -> None: + self.handlers_may_finish = False + @workflow.run async def run( self, @@ -5414,33 +5417,38 @@ async def run( "continue-as-new", "fail-post-continue-as-new-run", ], + wait_all_handlers_finished: bool, ) -> NoReturn: + if wait_all_handlers_finished: + self.handlers_may_finish = True + await workflow.wait_condition(workflow.all_handlers_finished) if workflow_termination_type == "failure": raise ApplicationError( "Deliberately failing workflow with an unfinished handler" ) - if workflow_termination_type == "fail-post-continue-as-new-run": + elif workflow_termination_type == "fail-post-continue-as-new-run": raise ApplicationError("Deliberately failing post-ContinueAsNew run") elif workflow_termination_type == "continue-as-new": - # Fail next run so tat test terminates - workflow.continue_as_new("fail-post-continue-as-new-run") + # Fail next run so that test terminates + workflow.continue_as_new( + args=["fail-post-continue-as-new-run", wait_all_handlers_finished] + ) else: await workflow.wait_condition(lambda: False) raise AssertionError("unreachable") @workflow.update async def my_update(self) -> NoReturn: - await workflow.wait_condition(lambda: False) - raise AssertionError("unreachable") + await workflow.wait_condition(lambda: self.handlers_may_finish) @workflow.signal async def my_signal(self) -> NoReturn: - await workflow.wait_condition(lambda: False) - raise AssertionError("unreachable") + await workflow.wait_condition(lambda: self.handlers_may_finish) +@pytest.mark.parametrize("wait_all_handlers_finished", [True, False]) async def test_unfinished_update_handler_with_workflow_cancellation( - client: Client, env: WorkflowEnvironment + client: Client, env: WorkflowEnvironment, wait_all_handlers_finished: bool ): if env.supports_time_skipping: pytest.skip( @@ -5450,19 +5458,25 @@ async def test_unfinished_update_handler_with_workflow_cancellation( client, "update", "cancellation", + wait_all_handlers_finished, ).test_warning_is_issued_on_exit_with_unfinished_handler() -async def test_unfinished_signal_handler_with_workflow_cancellation(client: Client): +@pytest.mark.parametrize("wait_all_handlers_finished", [True, False]) +async def test_unfinished_signal_handler_with_workflow_cancellation( + client: Client, wait_all_handlers_finished: bool +): await _UnfinishedHandlersOnWorkflowTerminationTest( client, "signal", "cancellation", + wait_all_handlers_finished, ).test_warning_is_issued_on_exit_with_unfinished_handler() +@pytest.mark.parametrize("wait_all_handlers_finished", [True, False]) async def test_unfinished_update_handler_with_workflow_failure( - client: Client, env: WorkflowEnvironment + client: Client, env: WorkflowEnvironment, wait_all_handlers_finished: bool ): if env.supports_time_skipping: pytest.skip( @@ -5472,11 +5486,13 @@ async def test_unfinished_update_handler_with_workflow_failure( client, "update", "failure", + wait_all_handlers_finished, ).test_warning_is_issued_on_exit_with_unfinished_handler() +@pytest.mark.parametrize("wait_all_handlers_finished", [True, False]) async def test_unfinished_signal_handler_with_workflow_failure( - client: Client, env: WorkflowEnvironment + client: Client, env: WorkflowEnvironment, wait_all_handlers_finished: bool ): if env.supports_time_skipping: pytest.skip( @@ -5486,11 +5502,13 @@ async def test_unfinished_signal_handler_with_workflow_failure( client, "signal", "failure", + wait_all_handlers_finished, ).test_warning_is_issued_on_exit_with_unfinished_handler() +@pytest.mark.parametrize("wait_all_handlers_finished", [True, False]) async def test_unfinished_update_handler_with_continue_as_new( - client: Client, env: WorkflowEnvironment + client: Client, env: WorkflowEnvironment, wait_all_handlers_finished: bool ): if env.supports_time_skipping: pytest.skip( @@ -5500,11 +5518,13 @@ async def test_unfinished_update_handler_with_continue_as_new( client, "update", "continue-as-new", + wait_all_handlers_finished, ).test_warning_is_issued_on_exit_with_unfinished_handler() +@pytest.mark.parametrize("wait_all_handlers_finished", [True, False]) async def test_unfinished_signal_handler_with_continue_as_new( - client: Client, env: WorkflowEnvironment + client: Client, env: WorkflowEnvironment, wait_all_handlers_finished: bool ): if env.supports_time_skipping: pytest.skip( @@ -5514,6 +5534,7 @@ async def test_unfinished_signal_handler_with_continue_as_new( client, "signal", "continue-as-new", + wait_all_handlers_finished, ).test_warning_is_issued_on_exit_with_unfinished_handler() @@ -5522,11 +5543,14 @@ class _UnfinishedHandlersOnWorkflowTerminationTest: client: Client handler_type: Literal["update", "signal"] workflow_termination_type: Literal["cancellation", "failure", "continue-as-new"] + wait_all_handlers_finished: bool async def test_warning_is_issued_on_exit_with_unfinished_handler( self, ): - assert await self._run_workflow_and_get_warning() + assert await self._run_workflow_and_get_warning() == ( + not self.wait_all_handlers_finished + ) async def _run_workflow_and_get_warning(self) -> bool: workflow_id = f"wf-{uuid.uuid4()}" @@ -5538,7 +5562,7 @@ async def _run_workflow_and_get_warning(self) -> bool: # server. handle = await self.client.start_workflow( UnfinishedHandlersOnWorkflowTerminationWorkflow.run, - self.workflow_termination_type, + args=[self.workflow_termination_type, self.wait_all_handlers_finished], id=workflow_id, task_queue=task_queue, ) @@ -5569,12 +5593,15 @@ async def _run_workflow_and_get_warning(self) -> bool: with pytest.WarningsRecorder() as warnings: if self.handler_type == "update": assert update_task - with pytest.raises(RPCError) as update_err: + if self.wait_all_handlers_finished: await update_task - assert update_err.value.status == RPCStatusCode.NOT_FOUND and ( - str(update_err.value).lower() - == "workflow execution already completed" - ) + else: + with pytest.raises(RPCError) as update_err: + await update_task + assert update_err.value.status == RPCStatusCode.NOT_FOUND and ( + str(update_err.value).lower() + == "workflow execution already completed" + ) with pytest.raises(WorkflowFailureError) as err: await handle.result() From d4a5f5b57350da13c4529304830495ad8d63d4b4 Mon Sep 17 00:00:00 2001 From: Dan Davison Date: Fri, 16 Aug 2024 18:54:03 -0400 Subject: [PATCH 5/7] Add coverage for dynamic handlers --- tests/worker/test_workflow.py | 207 +++++++++++++++++++++++++++++----- 1 file changed, 177 insertions(+), 30 deletions(-) diff --git a/tests/worker/test_workflow.py b/tests/worker/test_workflow.py index d75a4189a..6bbcb6099 100644 --- a/tests/worker/test_workflow.py +++ b/tests/worker/test_workflow.py @@ -5417,9 +5417,45 @@ async def run( "continue-as-new", "fail-post-continue-as-new-run", ], - wait_all_handlers_finished: bool, + handler_registration: Literal["late_registered", "not_late_registered"], + handler_dynamism: Literal["dynamic", "not_dynamic"], + handler_waiting: Literal[ + "wait_all_handlers_finish", "no_wait_all_handlers_finish" + ], ) -> NoReturn: - if wait_all_handlers_finished: + if handler_registration == "late_registered": + if handler_dynamism == "dynamic": + + async def my_late_registered_dynamic_update( + self, name: str, args: Sequence[RawValue] + ) -> str: + await workflow.wait_condition(lambda: self.handlers_may_finish) + return "my-late-registered-dynamic-update-result" + + async def my_late_registered_dynamic_signal( + self, name: str, args: Sequence[RawValue] + ) -> None: + await workflow.wait_condition(lambda: self.handlers_may_finish) + + workflow.set_dynamic_update_handler(my_late_registered_dynamic_update) + workflow.set_dynamic_signal_handler(my_late_registered_dynamic_signal) + else: + + async def my_late_registered_update(self) -> str: + await workflow.wait_condition(lambda: self.handlers_may_finish) + return "my-late-registered-update-result" + + async def my_late_registered_signal(self) -> None: + await workflow.wait_condition(lambda: self.handlers_may_finish) + + workflow.set_update_handler( + "my_late_registered_update", my_late_registered_update + ) + workflow.set_signal_handler( + "my_late_registered_signal", my_late_registered_signal + ) + + if handler_waiting == "wait_all_handlers_finish": self.handlers_may_finish = True await workflow.wait_condition(workflow.all_handlers_finished) if workflow_termination_type == "failure": @@ -5431,24 +5467,50 @@ async def run( elif workflow_termination_type == "continue-as-new": # Fail next run so that test terminates workflow.continue_as_new( - args=["fail-post-continue-as-new-run", wait_all_handlers_finished] + args=[ + "fail-post-continue-as-new-run", + handler_registration, + handler_dynamism, + handler_waiting, + ] ) else: await workflow.wait_condition(lambda: False) raise AssertionError("unreachable") @workflow.update - async def my_update(self) -> NoReturn: + async def my_update(self) -> str: await workflow.wait_condition(lambda: self.handlers_may_finish) + return "update-result" @workflow.signal - async def my_signal(self) -> NoReturn: + async def my_signal(self) -> None: await workflow.wait_condition(lambda: self.handlers_may_finish) + @workflow.update(dynamic=True) + async def my_dynamic_update(self, name: str, args: Sequence[RawValue]) -> str: + await workflow.wait_condition(lambda: self.handlers_may_finish) + return "my-dynamic-update-result" -@pytest.mark.parametrize("wait_all_handlers_finished", [True, False]) + @workflow.signal(dynamic=True) + async def my_dynamic_signal(self, name: str, args: Sequence[RawValue]) -> None: + await workflow.wait_condition(lambda: self.handlers_may_finish) + + +@pytest.mark.parametrize( + "handler_registration", ["late_registered", "not_late_registered"] +) +@pytest.mark.parametrize("handler_dynamism", ["dynamic", "not_dynamic"]) +@pytest.mark.parametrize( + "handler_waiting", + ["wait_all_handlers_finish", "no_wait_all_handlers_finish"], +) async def test_unfinished_update_handler_with_workflow_cancellation( - client: Client, env: WorkflowEnvironment, wait_all_handlers_finished: bool + client: Client, + env: WorkflowEnvironment, + handler_registration: Literal["late_registered", "not_late_registered"], + handler_dynamism: Literal["dynamic", "not_dynamic"], + handler_waiting: Literal["wait_all_handlers_finish", "no_wait_all_handlers_finish"], ): if env.supports_time_skipping: pytest.skip( @@ -5458,25 +5520,50 @@ async def test_unfinished_update_handler_with_workflow_cancellation( client, "update", "cancellation", - wait_all_handlers_finished, + handler_registration, + handler_dynamism, + handler_waiting, ).test_warning_is_issued_on_exit_with_unfinished_handler() -@pytest.mark.parametrize("wait_all_handlers_finished", [True, False]) +@pytest.mark.parametrize( + "handler_registration", ["late_registered", "not_late_registered"] +) +@pytest.mark.parametrize("handler_dynamism", ["dynamic", "not_dynamic"]) +@pytest.mark.parametrize( + "handler_waiting", + ["wait_all_handlers_finish", "no_wait_all_handlers_finish"], +) async def test_unfinished_signal_handler_with_workflow_cancellation( - client: Client, wait_all_handlers_finished: bool + client: Client, + handler_registration: Literal["late_registered", "not_late_registered"], + handler_dynamism: Literal["dynamic", "not_dynamic"], + handler_waiting: Literal["wait_all_handlers_finish", "no_wait_all_handlers_finish"], ): await _UnfinishedHandlersOnWorkflowTerminationTest( client, "signal", "cancellation", - wait_all_handlers_finished, + handler_registration, + handler_dynamism, + handler_waiting, ).test_warning_is_issued_on_exit_with_unfinished_handler() -@pytest.mark.parametrize("wait_all_handlers_finished", [True, False]) +@pytest.mark.parametrize( + "handler_registration", ["late_registered", "not_late_registered"] +) +@pytest.mark.parametrize("handler_dynamism", ["dynamic", "not_dynamic"]) +@pytest.mark.parametrize( + "handler_waiting", + ["wait_all_handlers_finish", "no_wait_all_handlers_finish"], +) async def test_unfinished_update_handler_with_workflow_failure( - client: Client, env: WorkflowEnvironment, wait_all_handlers_finished: bool + client: Client, + env: WorkflowEnvironment, + handler_registration: Literal["late_registered", "not_late_registered"], + handler_dynamism: Literal["dynamic", "not_dynamic"], + handler_waiting: Literal["wait_all_handlers_finish", "no_wait_all_handlers_finish"], ): if env.supports_time_skipping: pytest.skip( @@ -5486,13 +5573,26 @@ async def test_unfinished_update_handler_with_workflow_failure( client, "update", "failure", - wait_all_handlers_finished, + handler_registration, + handler_dynamism, + handler_waiting, ).test_warning_is_issued_on_exit_with_unfinished_handler() -@pytest.mark.parametrize("wait_all_handlers_finished", [True, False]) +@pytest.mark.parametrize( + "handler_registration", ["late_registered", "not_late_registered"] +) +@pytest.mark.parametrize("handler_dynamism", ["dynamic", "not_dynamic"]) +@pytest.mark.parametrize( + "handler_waiting", + ["wait_all_handlers_finish", "no_wait_all_handlers_finish"], +) async def test_unfinished_signal_handler_with_workflow_failure( - client: Client, env: WorkflowEnvironment, wait_all_handlers_finished: bool + client: Client, + env: WorkflowEnvironment, + handler_registration: Literal["late_registered", "not_late_registered"], + handler_dynamism: Literal["dynamic", "not_dynamic"], + handler_waiting: Literal["wait_all_handlers_finish", "no_wait_all_handlers_finish"], ): if env.supports_time_skipping: pytest.skip( @@ -5502,13 +5602,26 @@ async def test_unfinished_signal_handler_with_workflow_failure( client, "signal", "failure", - wait_all_handlers_finished, + handler_registration, + handler_dynamism, + handler_waiting, ).test_warning_is_issued_on_exit_with_unfinished_handler() -@pytest.mark.parametrize("wait_all_handlers_finished", [True, False]) +@pytest.mark.parametrize( + "handler_registration", ["late_registered", "not_late_registered"] +) +@pytest.mark.parametrize("handler_dynamism", ["dynamic", "not_dynamic"]) +@pytest.mark.parametrize( + "handler_waiting", + ["wait_all_handlers_finish", "no_wait_all_handlers_finish"], +) async def test_unfinished_update_handler_with_continue_as_new( - client: Client, env: WorkflowEnvironment, wait_all_handlers_finished: bool + client: Client, + env: WorkflowEnvironment, + handler_registration: Literal["late_registered", "not_late_registered"], + handler_dynamism: Literal["dynamic", "not_dynamic"], + handler_waiting: Literal["wait_all_handlers_finish", "no_wait_all_handlers_finish"], ): if env.supports_time_skipping: pytest.skip( @@ -5518,13 +5631,26 @@ async def test_unfinished_update_handler_with_continue_as_new( client, "update", "continue-as-new", - wait_all_handlers_finished, + handler_registration, + handler_dynamism, + handler_waiting, ).test_warning_is_issued_on_exit_with_unfinished_handler() -@pytest.mark.parametrize("wait_all_handlers_finished", [True, False]) +@pytest.mark.parametrize( + "handler_registration", ["late_registered", "not_late_registered"] +) +@pytest.mark.parametrize("handler_dynamism", ["dynamic", "not_dynamic"]) +@pytest.mark.parametrize( + "handler_waiting", + ["wait_all_handlers_finish", "no_wait_all_handlers_finish"], +) async def test_unfinished_signal_handler_with_continue_as_new( - client: Client, env: WorkflowEnvironment, wait_all_handlers_finished: bool + client: Client, + env: WorkflowEnvironment, + handler_registration: Literal["late_registered", "not_late_registered"], + handler_dynamism: Literal["dynamic", "not_dynamic"], + handler_waiting: Literal["wait_all_handlers_finish", "no_wait_all_handlers_finish"], ): if env.supports_time_skipping: pytest.skip( @@ -5534,7 +5660,9 @@ async def test_unfinished_signal_handler_with_continue_as_new( client, "signal", "continue-as-new", - wait_all_handlers_finished, + handler_registration, + handler_dynamism, + handler_waiting, ).test_warning_is_issued_on_exit_with_unfinished_handler() @@ -5543,13 +5671,15 @@ class _UnfinishedHandlersOnWorkflowTerminationTest: client: Client handler_type: Literal["update", "signal"] workflow_termination_type: Literal["cancellation", "failure", "continue-as-new"] - wait_all_handlers_finished: bool + handler_registration: Literal["late_registered", "not_late_registered"] + handler_dynamism: Literal["dynamic", "not_dynamic"] + handler_waiting: Literal["wait_all_handlers_finish", "no_wait_all_handlers_finish"] async def test_warning_is_issued_on_exit_with_unfinished_handler( self, ): assert await self._run_workflow_and_get_warning() == ( - not self.wait_all_handlers_finished + self.handler_waiting == "no_wait_all_handlers_finish" ) async def _run_workflow_and_get_warning(self) -> bool: @@ -5562,7 +5692,12 @@ async def _run_workflow_and_get_warning(self) -> bool: # server. handle = await self.client.start_workflow( UnfinishedHandlersOnWorkflowTerminationWorkflow.run, - args=[self.workflow_termination_type, self.wait_all_handlers_finished], + args=[ + self.workflow_termination_type, + self.handler_registration, + self.handler_dynamism, + self.handler_waiting, + ], id=workflow_id, task_queue=task_queue, ) @@ -5570,9 +5705,16 @@ async def _run_workflow_and_get_warning(self) -> bool: await handle.cancel() if self.handler_type == "update": + update_method = ( + "__does_not_exist__" + if self.handler_dynamism == "dynamic" + else "my_late_registered_update" + if self.handler_registration == "late_registered" + else UnfinishedHandlersOnWorkflowTerminationWorkflow.my_update + ) update_task = asyncio.create_task( handle.execute_update( - UnfinishedHandlersOnWorkflowTerminationWorkflow.my_update, + update_method, id=update_id, ) ) @@ -5581,9 +5723,14 @@ async def _run_workflow_and_get_warning(self) -> bool: lambda: workflow_update_exists(self.client, workflow_id, update_id), ) else: - await handle.signal( - UnfinishedHandlersOnWorkflowTerminationWorkflow.my_signal + signal_method = ( + "__does_not_exist__" + if self.handler_dynamism == "dynamic" + else "my_late_registered_signal" + if self.handler_registration == "late_registered" + else UnfinishedHandlersOnWorkflowTerminationWorkflow.my_signal ) + await handle.signal(signal_method) async with new_worker( self.client, @@ -5593,7 +5740,7 @@ async def _run_workflow_and_get_warning(self) -> bool: with pytest.WarningsRecorder() as warnings: if self.handler_type == "update": assert update_task - if self.wait_all_handlers_finished: + if self.handler_waiting == "wait_all_handlers_finish": await update_task else: with pytest.raises(RPCError) as update_err: From 5595afce86280d077c16d2009287bc06d52fae8b Mon Sep 17 00:00:00 2001 From: Dan Davison Date: Fri, 16 Aug 2024 22:26:45 -0400 Subject: [PATCH 6/7] Use parametrize for all tests --- tests/worker/test_workflow.py | 242 ++++++++-------------------------- 1 file changed, 58 insertions(+), 184 deletions(-) diff --git a/tests/worker/test_workflow.py b/tests/worker/test_workflow.py index 6bbcb6099..d922bafa5 100644 --- a/tests/worker/test_workflow.py +++ b/tests/worker/test_workflow.py @@ -5412,19 +5412,19 @@ def __init__(self) -> None: async def run( self, workflow_termination_type: Literal[ - "cancellation", - "failure", - "continue-as-new", - "fail-post-continue-as-new-run", + "-cancellation-", + "-failure-", + "-continue-as-new-", + "-fail-post-continue-as-new-run-", ], - handler_registration: Literal["late_registered", "not_late_registered"], - handler_dynamism: Literal["dynamic", "not_dynamic"], + handler_registration: Literal["-late-registered-", "-not-late-registered-"], + handler_dynamism: Literal["-dynamic-", "-not-dynamic-"], handler_waiting: Literal[ - "wait_all_handlers_finish", "no_wait_all_handlers_finish" + "-wait-all-handlers-finish-", "-no-wait-all-handlers-finish-" ], ) -> NoReturn: - if handler_registration == "late_registered": - if handler_dynamism == "dynamic": + if handler_registration == "-late-registered-": + if handler_dynamism == "-dynamic-": async def my_late_registered_dynamic_update( self, name: str, args: Sequence[RawValue] @@ -5455,20 +5455,20 @@ async def my_late_registered_signal(self) -> None: "my_late_registered_signal", my_late_registered_signal ) - if handler_waiting == "wait_all_handlers_finish": + if handler_waiting == "-wait-all-handlers-finish-": self.handlers_may_finish = True await workflow.wait_condition(workflow.all_handlers_finished) - if workflow_termination_type == "failure": + if workflow_termination_type == "-failure-": raise ApplicationError( "Deliberately failing workflow with an unfinished handler" ) - elif workflow_termination_type == "fail-post-continue-as-new-run": + elif workflow_termination_type == "-fail-post-continue-as-new-run-": raise ApplicationError("Deliberately failing post-ContinueAsNew run") - elif workflow_termination_type == "continue-as-new": + elif workflow_termination_type == "-continue-as-new-": # Fail next run so that test terminates workflow.continue_as_new( args=[ - "fail-post-continue-as-new-run", + "-fail-post-continue-as-new-run-", handler_registration, handler_dynamism, handler_waiting, @@ -5497,169 +5497,39 @@ async def my_dynamic_signal(self, name: str, args: Sequence[RawValue]) -> None: await workflow.wait_condition(lambda: self.handlers_may_finish) +@pytest.mark.parametrize("handler_type", ["-signal-", "-update-"]) @pytest.mark.parametrize( - "handler_registration", ["late_registered", "not_late_registered"] + "handler_registration", ["-late-registered-", "-not-late-registered-"] ) -@pytest.mark.parametrize("handler_dynamism", ["dynamic", "not_dynamic"]) +@pytest.mark.parametrize("handler_dynamism", ["-dynamic-", "-not-dynamic-"]) @pytest.mark.parametrize( "handler_waiting", - ["wait_all_handlers_finish", "no_wait_all_handlers_finish"], + ["-wait-all-handlers-finish-", "-no-wait-all-handlers-finish-"], ) -async def test_unfinished_update_handler_with_workflow_cancellation( - client: Client, - env: WorkflowEnvironment, - handler_registration: Literal["late_registered", "not_late_registered"], - handler_dynamism: Literal["dynamic", "not_dynamic"], - handler_waiting: Literal["wait_all_handlers_finish", "no_wait_all_handlers_finish"], -): - if env.supports_time_skipping: - pytest.skip( - "Java test server: https://github.com/temporalio/sdk-java/issues/1903" - ) - await _UnfinishedHandlersOnWorkflowTerminationTest( - client, - "update", - "cancellation", - handler_registration, - handler_dynamism, - handler_waiting, - ).test_warning_is_issued_on_exit_with_unfinished_handler() - - -@pytest.mark.parametrize( - "handler_registration", ["late_registered", "not_late_registered"] -) -@pytest.mark.parametrize("handler_dynamism", ["dynamic", "not_dynamic"]) -@pytest.mark.parametrize( - "handler_waiting", - ["wait_all_handlers_finish", "no_wait_all_handlers_finish"], -) -async def test_unfinished_signal_handler_with_workflow_cancellation( - client: Client, - handler_registration: Literal["late_registered", "not_late_registered"], - handler_dynamism: Literal["dynamic", "not_dynamic"], - handler_waiting: Literal["wait_all_handlers_finish", "no_wait_all_handlers_finish"], -): - await _UnfinishedHandlersOnWorkflowTerminationTest( - client, - "signal", - "cancellation", - handler_registration, - handler_dynamism, - handler_waiting, - ).test_warning_is_issued_on_exit_with_unfinished_handler() - - @pytest.mark.parametrize( - "handler_registration", ["late_registered", "not_late_registered"] + "workflow_termination_type", ["-cancellation-", "-failure-", "-continue-as-new-"] ) -@pytest.mark.parametrize("handler_dynamism", ["dynamic", "not_dynamic"]) -@pytest.mark.parametrize( - "handler_waiting", - ["wait_all_handlers_finish", "no_wait_all_handlers_finish"], -) -async def test_unfinished_update_handler_with_workflow_failure( +async def test_unfinished_handler_on_workflow_termination( client: Client, env: WorkflowEnvironment, - handler_registration: Literal["late_registered", "not_late_registered"], - handler_dynamism: Literal["dynamic", "not_dynamic"], - handler_waiting: Literal["wait_all_handlers_finish", "no_wait_all_handlers_finish"], + handler_type: Literal["-signal-", "-update-"], + handler_registration: Literal["-late-registered-", "-not-late-registered-"], + handler_dynamism: Literal["-dynamic-", "-not-dynamic-"], + handler_waiting: Literal[ + "-wait-all-handlers-finish-", "-no-wait-all-handlers-finish-" + ], + workflow_termination_type: Literal[ + "-cancellation-", "-failure-", "-continue-as-new-" + ], ): - if env.supports_time_skipping: + if handler_type == "-update-" and env.supports_time_skipping: pytest.skip( "Java test server: https://github.com/temporalio/sdk-java/issues/1903" ) await _UnfinishedHandlersOnWorkflowTerminationTest( client, - "update", - "failure", - handler_registration, - handler_dynamism, - handler_waiting, - ).test_warning_is_issued_on_exit_with_unfinished_handler() - - -@pytest.mark.parametrize( - "handler_registration", ["late_registered", "not_late_registered"] -) -@pytest.mark.parametrize("handler_dynamism", ["dynamic", "not_dynamic"]) -@pytest.mark.parametrize( - "handler_waiting", - ["wait_all_handlers_finish", "no_wait_all_handlers_finish"], -) -async def test_unfinished_signal_handler_with_workflow_failure( - client: Client, - env: WorkflowEnvironment, - handler_registration: Literal["late_registered", "not_late_registered"], - handler_dynamism: Literal["dynamic", "not_dynamic"], - handler_waiting: Literal["wait_all_handlers_finish", "no_wait_all_handlers_finish"], -): - if env.supports_time_skipping: - pytest.skip( - "Java test server: https://github.com/temporalio/sdk-java/issues/2127" - ) - await _UnfinishedHandlersOnWorkflowTerminationTest( - client, - "signal", - "failure", - handler_registration, - handler_dynamism, - handler_waiting, - ).test_warning_is_issued_on_exit_with_unfinished_handler() - - -@pytest.mark.parametrize( - "handler_registration", ["late_registered", "not_late_registered"] -) -@pytest.mark.parametrize("handler_dynamism", ["dynamic", "not_dynamic"]) -@pytest.mark.parametrize( - "handler_waiting", - ["wait_all_handlers_finish", "no_wait_all_handlers_finish"], -) -async def test_unfinished_update_handler_with_continue_as_new( - client: Client, - env: WorkflowEnvironment, - handler_registration: Literal["late_registered", "not_late_registered"], - handler_dynamism: Literal["dynamic", "not_dynamic"], - handler_waiting: Literal["wait_all_handlers_finish", "no_wait_all_handlers_finish"], -): - if env.supports_time_skipping: - pytest.skip( - "Java test server: https://github.com/temporalio/sdk-java/issues/1903" - ) - await _UnfinishedHandlersOnWorkflowTerminationTest( - client, - "update", - "continue-as-new", - handler_registration, - handler_dynamism, - handler_waiting, - ).test_warning_is_issued_on_exit_with_unfinished_handler() - - -@pytest.mark.parametrize( - "handler_registration", ["late_registered", "not_late_registered"] -) -@pytest.mark.parametrize("handler_dynamism", ["dynamic", "not_dynamic"]) -@pytest.mark.parametrize( - "handler_waiting", - ["wait_all_handlers_finish", "no_wait_all_handlers_finish"], -) -async def test_unfinished_signal_handler_with_continue_as_new( - client: Client, - env: WorkflowEnvironment, - handler_registration: Literal["late_registered", "not_late_registered"], - handler_dynamism: Literal["dynamic", "not_dynamic"], - handler_waiting: Literal["wait_all_handlers_finish", "no_wait_all_handlers_finish"], -): - if env.supports_time_skipping: - pytest.skip( - "Java test server: https://github.com/temporalio/sdk-java/issues/2127" - ) - await _UnfinishedHandlersOnWorkflowTerminationTest( - client, - "signal", - "continue-as-new", + handler_type, + workflow_termination_type, handler_registration, handler_dynamism, handler_waiting, @@ -5669,17 +5539,21 @@ async def test_unfinished_signal_handler_with_continue_as_new( @dataclass class _UnfinishedHandlersOnWorkflowTerminationTest: client: Client - handler_type: Literal["update", "signal"] - workflow_termination_type: Literal["cancellation", "failure", "continue-as-new"] - handler_registration: Literal["late_registered", "not_late_registered"] - handler_dynamism: Literal["dynamic", "not_dynamic"] - handler_waiting: Literal["wait_all_handlers_finish", "no_wait_all_handlers_finish"] + handler_type: Literal["-signal-", "-update-"] + workflow_termination_type: Literal[ + "-cancellation-", "-failure-", "-continue-as-new-" + ] + handler_registration: Literal["-late-registered-", "-not-late-registered-"] + handler_dynamism: Literal["-dynamic-", "-not-dynamic-"] + handler_waiting: Literal[ + "-wait-all-handlers-finish-", "-no-wait-all-handlers-finish-" + ] async def test_warning_is_issued_on_exit_with_unfinished_handler( self, ): assert await self._run_workflow_and_get_warning() == ( - self.handler_waiting == "no_wait_all_handlers_finish" + self.handler_waiting == "-no-wait-all-handlers-finish-" ) async def _run_workflow_and_get_warning(self) -> bool: @@ -5701,20 +5575,20 @@ async def _run_workflow_and_get_warning(self) -> bool: id=workflow_id, task_queue=task_queue, ) - if self.workflow_termination_type == "cancellation": + if self.workflow_termination_type == "-cancellation-": await handle.cancel() - if self.handler_type == "update": + if self.handler_type == "-update-": update_method = ( "__does_not_exist__" - if self.handler_dynamism == "dynamic" + if self.handler_dynamism == "-dynamic-" else "my_late_registered_update" - if self.handler_registration == "late_registered" + if self.handler_registration == "-late-registered-" else UnfinishedHandlersOnWorkflowTerminationWorkflow.my_update ) update_task = asyncio.create_task( handle.execute_update( - update_method, + update_method, # type: ignore id=update_id, ) ) @@ -5725,12 +5599,12 @@ async def _run_workflow_and_get_warning(self) -> bool: else: signal_method = ( "__does_not_exist__" - if self.handler_dynamism == "dynamic" + if self.handler_dynamism == "-dynamic-" else "my_late_registered_signal" - if self.handler_registration == "late_registered" + if self.handler_registration == "-late-registered-" else UnfinishedHandlersOnWorkflowTerminationWorkflow.my_signal ) - await handle.signal(signal_method) + await handle.signal(signal_method) # type: ignore async with new_worker( self.client, @@ -5738,9 +5612,9 @@ async def _run_workflow_and_get_warning(self) -> bool: task_queue=task_queue, ): with pytest.WarningsRecorder() as warnings: - if self.handler_type == "update": + if self.handler_type == "-update-": assert update_task - if self.handler_waiting == "wait_all_handlers_finish": + if self.handler_waiting == "-wait-all-handlers-finish-": await update_task else: with pytest.raises(RPCError) as update_err: @@ -5755,12 +5629,12 @@ async def _run_workflow_and_get_warning(self) -> bool: assert isinstance( err.value.cause, { - "cancellation": CancelledError, - "continue-as-new": ApplicationError, - "failure": ApplicationError, + "-cancellation-": CancelledError, + "-continue-as-new-": ApplicationError, + "-failure-": ApplicationError, }[self.workflow_termination_type], ) - if self.workflow_termination_type == "continue-as-new": + if self.workflow_termination_type == "-continue-as-new-": assert ( str(err.value.cause) == "Deliberately failing post-ContinueAsNew run" @@ -5775,8 +5649,8 @@ async def _run_workflow_and_get_warning(self) -> bool: @property def _unfinished_handler_warning_cls(self) -> Type: return { - "update": workflow.UnfinishedUpdateHandlersWarning, - "signal": workflow.UnfinishedSignalHandlersWarning, + "-update-": workflow.UnfinishedUpdateHandlersWarning, + "-signal-": workflow.UnfinishedSignalHandlersWarning, }[self.handler_type] From 5144f056ca0e40fa29798c10afb2d010fb77f114 Mon Sep 17 00:00:00 2001 From: Dan Davison Date: Mon, 19 Aug 2024 07:14:40 -0400 Subject: [PATCH 7/7] Include #596 add-rule-to-warning --- temporalio/worker/_workflow_instance.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/temporalio/worker/_workflow_instance.py b/temporalio/worker/_workflow_instance.py index 7fa3cded5..a5e5b63ed 100644 --- a/temporalio/worker/_workflow_instance.py +++ b/temporalio/worker/_workflow_instance.py @@ -2801,8 +2801,8 @@ def _make_unfinished_signal_handler_message( signal handler was doing. You can wait for all update and signal handlers to complete by using `await workflow.wait_condition(lambda: workflow.all_handlers_finished())`. Alternatively, if both you and the clients sending the signal are okay with interrupting running handlers when the workflow -finishes, and causing clients to receive errors, then you can disable this warning via the signal -handler decorator: `@workflow.signal(unfinished_policy=workflow.HandlerUnfinishedPolicy.ABANDON)`. +finishes, then you can disable this warning via the signal handler decorator: +`@workflow.signal(unfinished_policy=workflow.HandlerUnfinishedPolicy.ABANDON)`. """.replace("\n", " ").strip() names = collections.Counter(ex.name for ex in handler_executions) return (