diff --git a/temporalio/client.py b/temporalio/client.py index e6c66ddfb..8b203207a 100644 --- a/temporalio/client.py +++ b/temporalio/client.py @@ -298,6 +298,7 @@ async def start_workflow( run_timeout: Optional[timedelta] = None, task_timeout: Optional[timedelta] = None, id_reuse_policy: temporalio.common.WorkflowIDReusePolicy = temporalio.common.WorkflowIDReusePolicy.ALLOW_DUPLICATE, + id_conflict_policy: temporalio.common.WorkflowIDConflictPolicy = temporalio.common.WorkflowIDConflictPolicy.UNSPECIFIED, retry_policy: Optional[temporalio.common.RetryPolicy] = None, cron_schedule: str = "", memo: Optional[Mapping[str, Any]] = None, @@ -328,6 +329,7 @@ async def start_workflow( run_timeout: Optional[timedelta] = None, task_timeout: Optional[timedelta] = None, id_reuse_policy: temporalio.common.WorkflowIDReusePolicy = temporalio.common.WorkflowIDReusePolicy.ALLOW_DUPLICATE, + id_conflict_policy: temporalio.common.WorkflowIDConflictPolicy = temporalio.common.WorkflowIDConflictPolicy.UNSPECIFIED, retry_policy: Optional[temporalio.common.RetryPolicy] = None, cron_schedule: str = "", memo: Optional[Mapping[str, Any]] = None, @@ -360,6 +362,7 @@ async def start_workflow( run_timeout: Optional[timedelta] = None, task_timeout: Optional[timedelta] = None, id_reuse_policy: temporalio.common.WorkflowIDReusePolicy = temporalio.common.WorkflowIDReusePolicy.ALLOW_DUPLICATE, + id_conflict_policy: temporalio.common.WorkflowIDConflictPolicy = temporalio.common.WorkflowIDConflictPolicy.UNSPECIFIED, retry_policy: Optional[temporalio.common.RetryPolicy] = None, cron_schedule: str = "", memo: Optional[Mapping[str, Any]] = None, @@ -392,6 +395,7 @@ async def start_workflow( run_timeout: Optional[timedelta] = None, task_timeout: Optional[timedelta] = None, id_reuse_policy: temporalio.common.WorkflowIDReusePolicy = temporalio.common.WorkflowIDReusePolicy.ALLOW_DUPLICATE, + id_conflict_policy: temporalio.common.WorkflowIDConflictPolicy = temporalio.common.WorkflowIDConflictPolicy.UNSPECIFIED, retry_policy: Optional[temporalio.common.RetryPolicy] = None, cron_schedule: str = "", memo: Optional[Mapping[str, Any]] = None, @@ -422,6 +426,7 @@ async def start_workflow( run_timeout: Optional[timedelta] = None, task_timeout: Optional[timedelta] = None, id_reuse_policy: temporalio.common.WorkflowIDReusePolicy = temporalio.common.WorkflowIDReusePolicy.ALLOW_DUPLICATE, + id_conflict_policy: temporalio.common.WorkflowIDConflictPolicy = temporalio.common.WorkflowIDConflictPolicy.UNSPECIFIED, retry_policy: Optional[temporalio.common.RetryPolicy] = None, cron_schedule: str = "", memo: Optional[Mapping[str, Any]] = None, @@ -455,6 +460,10 @@ async def start_workflow( run_timeout: Timeout of a single workflow run. task_timeout: Timeout of a single workflow task. id_reuse_policy: How already-existing IDs are treated. + id_conflict_policy: How already-running workflows of the same ID are + treated. Default is unspecified which effectively means fail the + start attempt. This cannot be set if ``id_reuse_policy`` is set + to terminate if running. retry_policy: Retry policy for the workflow. cron_schedule: See https://docs.temporal.io/docs/content/what-is-a-temporal-cron-job/ memo: Memo for the workflow. @@ -510,6 +519,7 @@ async def start_workflow( run_timeout=run_timeout, task_timeout=task_timeout, id_reuse_policy=id_reuse_policy, + id_conflict_policy=id_conflict_policy, retry_policy=retry_policy, cron_schedule=cron_schedule, memo=memo, @@ -537,6 +547,7 @@ async def execute_workflow( run_timeout: Optional[timedelta] = None, task_timeout: Optional[timedelta] = None, id_reuse_policy: temporalio.common.WorkflowIDReusePolicy = temporalio.common.WorkflowIDReusePolicy.ALLOW_DUPLICATE, + id_conflict_policy: temporalio.common.WorkflowIDConflictPolicy = temporalio.common.WorkflowIDConflictPolicy.UNSPECIFIED, retry_policy: Optional[temporalio.common.RetryPolicy] = None, cron_schedule: str = "", memo: Optional[Mapping[str, Any]] = None, @@ -567,6 +578,7 @@ async def execute_workflow( run_timeout: Optional[timedelta] = None, task_timeout: Optional[timedelta] = None, id_reuse_policy: temporalio.common.WorkflowIDReusePolicy = temporalio.common.WorkflowIDReusePolicy.ALLOW_DUPLICATE, + id_conflict_policy: temporalio.common.WorkflowIDConflictPolicy = temporalio.common.WorkflowIDConflictPolicy.UNSPECIFIED, retry_policy: Optional[temporalio.common.RetryPolicy] = None, cron_schedule: str = "", memo: Optional[Mapping[str, Any]] = None, @@ -599,6 +611,7 @@ async def execute_workflow( run_timeout: Optional[timedelta] = None, task_timeout: Optional[timedelta] = None, id_reuse_policy: temporalio.common.WorkflowIDReusePolicy = temporalio.common.WorkflowIDReusePolicy.ALLOW_DUPLICATE, + id_conflict_policy: temporalio.common.WorkflowIDConflictPolicy = temporalio.common.WorkflowIDConflictPolicy.UNSPECIFIED, retry_policy: Optional[temporalio.common.RetryPolicy] = None, cron_schedule: str = "", memo: Optional[Mapping[str, Any]] = None, @@ -631,6 +644,7 @@ async def execute_workflow( run_timeout: Optional[timedelta] = None, task_timeout: Optional[timedelta] = None, id_reuse_policy: temporalio.common.WorkflowIDReusePolicy = temporalio.common.WorkflowIDReusePolicy.ALLOW_DUPLICATE, + id_conflict_policy: temporalio.common.WorkflowIDConflictPolicy = temporalio.common.WorkflowIDConflictPolicy.UNSPECIFIED, retry_policy: Optional[temporalio.common.RetryPolicy] = None, cron_schedule: str = "", memo: Optional[Mapping[str, Any]] = None, @@ -661,6 +675,7 @@ async def execute_workflow( run_timeout: Optional[timedelta] = None, task_timeout: Optional[timedelta] = None, id_reuse_policy: temporalio.common.WorkflowIDReusePolicy = temporalio.common.WorkflowIDReusePolicy.ALLOW_DUPLICATE, + id_conflict_policy: temporalio.common.WorkflowIDConflictPolicy = temporalio.common.WorkflowIDConflictPolicy.UNSPECIFIED, retry_policy: Optional[temporalio.common.RetryPolicy] = None, cron_schedule: str = "", memo: Optional[Mapping[str, Any]] = None, @@ -696,6 +711,7 @@ async def execute_workflow( run_timeout=run_timeout, task_timeout=task_timeout, id_reuse_policy=id_reuse_policy, + id_conflict_policy=id_conflict_policy, retry_policy=retry_policy, cron_schedule=cron_schedule, memo=memo, @@ -4487,6 +4503,7 @@ class StartWorkflowInput: run_timeout: Optional[timedelta] task_timeout: Optional[timedelta] id_reuse_policy: temporalio.common.WorkflowIDReusePolicy + id_conflict_policy: temporalio.common.WorkflowIDConflictPolicy retry_policy: Optional[temporalio.common.RetryPolicy] cron_schedule: str memo: Optional[Mapping[str, Any]] @@ -5008,6 +5025,10 @@ async def start_workflow( "temporalio.api.enums.v1.WorkflowIdReusePolicy.ValueType", int(input.id_reuse_policy), ) + req.workflow_id_conflict_policy = cast( + "temporalio.api.enums.v1.WorkflowIdConflictPolicy.ValueType", + int(input.id_conflict_policy), + ) if input.retry_policy is not None: input.retry_policy.apply_to_proto(req.retry_policy) req.cron_schedule = input.cron_schedule diff --git a/temporalio/common.py b/temporalio/common.py index 428966e72..e073aaecd 100644 --- a/temporalio/common.py +++ b/temporalio/common.py @@ -132,6 +132,26 @@ class WorkflowIDReusePolicy(IntEnum): ) +class WorkflowIDConflictPolicy(IntEnum): + """How already-running workflows of the same ID are handled on start. + + See :py:class:`temporalio.api.enums.v1.WorkflowIdConflictPolicy`. + """ + + UNSPECIFIED = int( + temporalio.api.enums.v1.WorkflowIdConflictPolicy.WORKFLOW_ID_CONFLICT_POLICY_UNSPECIFIED + ) + FAIL = int( + temporalio.api.enums.v1.WorkflowIdConflictPolicy.WORKFLOW_ID_CONFLICT_POLICY_FAIL + ) + USE_EXISTING = int( + temporalio.api.enums.v1.WorkflowIdConflictPolicy.WORKFLOW_ID_CONFLICT_POLICY_USE_EXISTING + ) + TERMINATE_EXISTING = int( + temporalio.api.enums.v1.WorkflowIdConflictPolicy.WORKFLOW_ID_CONFLICT_POLICY_TERMINATE_EXISTING + ) + + class QueryRejectCondition(IntEnum): """Whether a query should be rejected in certain conditions. diff --git a/tests/worker/test_workflow.py b/tests/worker/test_workflow.py index fcd957ae1..30b52fbec 100644 --- a/tests/worker/test_workflow.py +++ b/tests/worker/test_workflow.py @@ -69,6 +69,7 @@ SearchAttributes, SearchAttributeValues, TypedSearchAttributes, + WorkflowIDConflictPolicy, ) from temporalio.converter import ( DataConverter, @@ -5505,3 +5506,69 @@ def _unfinished_handler_warning_cls(self) -> Type: "update": workflow.UnfinishedUpdateHandlersWarning, "signal": workflow.UnfinishedSignalHandlersWarning, }[self.handler_type] + + +@workflow.defn +class IDConflictWorkflow: + # Just run forever + @workflow.run + async def run(self) -> None: + await workflow.wait_condition(lambda: False) + + +async def test_workflow_id_conflict(client: Client): + async with new_worker(client, IDConflictWorkflow) as worker: + # Start a workflow + handle = await client.start_workflow( + IDConflictWorkflow.run, + id=f"wf-{uuid.uuid4()}", + task_queue=worker.task_queue, + ) + handle = client.get_workflow_handle_for( + IDConflictWorkflow.run, handle.id, run_id=handle.result_run_id + ) + + # Confirm another fails by default + with pytest.raises(WorkflowAlreadyStartedError): + await client.start_workflow( + IDConflictWorkflow.run, + id=handle.id, + task_queue=worker.task_queue, + ) + + # Confirm fails if explicitly given that option + with pytest.raises(WorkflowAlreadyStartedError): + await client.start_workflow( + IDConflictWorkflow.run, + id=handle.id, + task_queue=worker.task_queue, + id_conflict_policy=WorkflowIDConflictPolicy.FAIL, + ) + + # Confirm gives back same handle if requested + new_handle = await client.start_workflow( + IDConflictWorkflow.run, + id=handle.id, + task_queue=worker.task_queue, + id_conflict_policy=WorkflowIDConflictPolicy.USE_EXISTING, + ) + new_handle = client.get_workflow_handle_for( + IDConflictWorkflow.run, new_handle.id, run_id=new_handle.result_run_id + ) + assert new_handle.run_id == handle.run_id + assert (await handle.describe()).status == WorkflowExecutionStatus.RUNNING + assert (await new_handle.describe()).status == WorkflowExecutionStatus.RUNNING + + # Confirm terminates and starts new if requested + new_handle = await client.start_workflow( + IDConflictWorkflow.run, + id=handle.id, + task_queue=worker.task_queue, + id_conflict_policy=WorkflowIDConflictPolicy.TERMINATE_EXISTING, + ) + new_handle = client.get_workflow_handle_for( + IDConflictWorkflow.run, new_handle.id, run_id=new_handle.result_run_id + ) + assert new_handle.run_id != handle.run_id + assert (await handle.describe()).status == WorkflowExecutionStatus.TERMINATED + assert (await new_handle.describe()).status == WorkflowExecutionStatus.RUNNING