Skip to content

Commit c57df81

Browse files
authored
Support for workflow ID conflict policy (#579)
Fixes #504
1 parent bcbacc2 commit c57df81

File tree

3 files changed

+108
-0
lines changed

3 files changed

+108
-0
lines changed

temporalio/client.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -298,6 +298,7 @@ async def start_workflow(
298298
run_timeout: Optional[timedelta] = None,
299299
task_timeout: Optional[timedelta] = None,
300300
id_reuse_policy: temporalio.common.WorkflowIDReusePolicy = temporalio.common.WorkflowIDReusePolicy.ALLOW_DUPLICATE,
301+
id_conflict_policy: temporalio.common.WorkflowIDConflictPolicy = temporalio.common.WorkflowIDConflictPolicy.UNSPECIFIED,
301302
retry_policy: Optional[temporalio.common.RetryPolicy] = None,
302303
cron_schedule: str = "",
303304
memo: Optional[Mapping[str, Any]] = None,
@@ -328,6 +329,7 @@ async def start_workflow(
328329
run_timeout: Optional[timedelta] = None,
329330
task_timeout: Optional[timedelta] = None,
330331
id_reuse_policy: temporalio.common.WorkflowIDReusePolicy = temporalio.common.WorkflowIDReusePolicy.ALLOW_DUPLICATE,
332+
id_conflict_policy: temporalio.common.WorkflowIDConflictPolicy = temporalio.common.WorkflowIDConflictPolicy.UNSPECIFIED,
331333
retry_policy: Optional[temporalio.common.RetryPolicy] = None,
332334
cron_schedule: str = "",
333335
memo: Optional[Mapping[str, Any]] = None,
@@ -360,6 +362,7 @@ async def start_workflow(
360362
run_timeout: Optional[timedelta] = None,
361363
task_timeout: Optional[timedelta] = None,
362364
id_reuse_policy: temporalio.common.WorkflowIDReusePolicy = temporalio.common.WorkflowIDReusePolicy.ALLOW_DUPLICATE,
365+
id_conflict_policy: temporalio.common.WorkflowIDConflictPolicy = temporalio.common.WorkflowIDConflictPolicy.UNSPECIFIED,
363366
retry_policy: Optional[temporalio.common.RetryPolicy] = None,
364367
cron_schedule: str = "",
365368
memo: Optional[Mapping[str, Any]] = None,
@@ -392,6 +395,7 @@ async def start_workflow(
392395
run_timeout: Optional[timedelta] = None,
393396
task_timeout: Optional[timedelta] = None,
394397
id_reuse_policy: temporalio.common.WorkflowIDReusePolicy = temporalio.common.WorkflowIDReusePolicy.ALLOW_DUPLICATE,
398+
id_conflict_policy: temporalio.common.WorkflowIDConflictPolicy = temporalio.common.WorkflowIDConflictPolicy.UNSPECIFIED,
395399
retry_policy: Optional[temporalio.common.RetryPolicy] = None,
396400
cron_schedule: str = "",
397401
memo: Optional[Mapping[str, Any]] = None,
@@ -422,6 +426,7 @@ async def start_workflow(
422426
run_timeout: Optional[timedelta] = None,
423427
task_timeout: Optional[timedelta] = None,
424428
id_reuse_policy: temporalio.common.WorkflowIDReusePolicy = temporalio.common.WorkflowIDReusePolicy.ALLOW_DUPLICATE,
429+
id_conflict_policy: temporalio.common.WorkflowIDConflictPolicy = temporalio.common.WorkflowIDConflictPolicy.UNSPECIFIED,
425430
retry_policy: Optional[temporalio.common.RetryPolicy] = None,
426431
cron_schedule: str = "",
427432
memo: Optional[Mapping[str, Any]] = None,
@@ -455,6 +460,10 @@ async def start_workflow(
455460
run_timeout: Timeout of a single workflow run.
456461
task_timeout: Timeout of a single workflow task.
457462
id_reuse_policy: How already-existing IDs are treated.
463+
id_conflict_policy: How already-running workflows of the same ID are
464+
treated. Default is unspecified which effectively means fail the
465+
start attempt. This cannot be set if ``id_reuse_policy`` is set
466+
to terminate if running.
458467
retry_policy: Retry policy for the workflow.
459468
cron_schedule: See https://docs.temporal.io/docs/content/what-is-a-temporal-cron-job/
460469
memo: Memo for the workflow.
@@ -510,6 +519,7 @@ async def start_workflow(
510519
run_timeout=run_timeout,
511520
task_timeout=task_timeout,
512521
id_reuse_policy=id_reuse_policy,
522+
id_conflict_policy=id_conflict_policy,
513523
retry_policy=retry_policy,
514524
cron_schedule=cron_schedule,
515525
memo=memo,
@@ -537,6 +547,7 @@ async def execute_workflow(
537547
run_timeout: Optional[timedelta] = None,
538548
task_timeout: Optional[timedelta] = None,
539549
id_reuse_policy: temporalio.common.WorkflowIDReusePolicy = temporalio.common.WorkflowIDReusePolicy.ALLOW_DUPLICATE,
550+
id_conflict_policy: temporalio.common.WorkflowIDConflictPolicy = temporalio.common.WorkflowIDConflictPolicy.UNSPECIFIED,
540551
retry_policy: Optional[temporalio.common.RetryPolicy] = None,
541552
cron_schedule: str = "",
542553
memo: Optional[Mapping[str, Any]] = None,
@@ -567,6 +578,7 @@ async def execute_workflow(
567578
run_timeout: Optional[timedelta] = None,
568579
task_timeout: Optional[timedelta] = None,
569580
id_reuse_policy: temporalio.common.WorkflowIDReusePolicy = temporalio.common.WorkflowIDReusePolicy.ALLOW_DUPLICATE,
581+
id_conflict_policy: temporalio.common.WorkflowIDConflictPolicy = temporalio.common.WorkflowIDConflictPolicy.UNSPECIFIED,
570582
retry_policy: Optional[temporalio.common.RetryPolicy] = None,
571583
cron_schedule: str = "",
572584
memo: Optional[Mapping[str, Any]] = None,
@@ -599,6 +611,7 @@ async def execute_workflow(
599611
run_timeout: Optional[timedelta] = None,
600612
task_timeout: Optional[timedelta] = None,
601613
id_reuse_policy: temporalio.common.WorkflowIDReusePolicy = temporalio.common.WorkflowIDReusePolicy.ALLOW_DUPLICATE,
614+
id_conflict_policy: temporalio.common.WorkflowIDConflictPolicy = temporalio.common.WorkflowIDConflictPolicy.UNSPECIFIED,
602615
retry_policy: Optional[temporalio.common.RetryPolicy] = None,
603616
cron_schedule: str = "",
604617
memo: Optional[Mapping[str, Any]] = None,
@@ -631,6 +644,7 @@ async def execute_workflow(
631644
run_timeout: Optional[timedelta] = None,
632645
task_timeout: Optional[timedelta] = None,
633646
id_reuse_policy: temporalio.common.WorkflowIDReusePolicy = temporalio.common.WorkflowIDReusePolicy.ALLOW_DUPLICATE,
647+
id_conflict_policy: temporalio.common.WorkflowIDConflictPolicy = temporalio.common.WorkflowIDConflictPolicy.UNSPECIFIED,
634648
retry_policy: Optional[temporalio.common.RetryPolicy] = None,
635649
cron_schedule: str = "",
636650
memo: Optional[Mapping[str, Any]] = None,
@@ -661,6 +675,7 @@ async def execute_workflow(
661675
run_timeout: Optional[timedelta] = None,
662676
task_timeout: Optional[timedelta] = None,
663677
id_reuse_policy: temporalio.common.WorkflowIDReusePolicy = temporalio.common.WorkflowIDReusePolicy.ALLOW_DUPLICATE,
678+
id_conflict_policy: temporalio.common.WorkflowIDConflictPolicy = temporalio.common.WorkflowIDConflictPolicy.UNSPECIFIED,
664679
retry_policy: Optional[temporalio.common.RetryPolicy] = None,
665680
cron_schedule: str = "",
666681
memo: Optional[Mapping[str, Any]] = None,
@@ -696,6 +711,7 @@ async def execute_workflow(
696711
run_timeout=run_timeout,
697712
task_timeout=task_timeout,
698713
id_reuse_policy=id_reuse_policy,
714+
id_conflict_policy=id_conflict_policy,
699715
retry_policy=retry_policy,
700716
cron_schedule=cron_schedule,
701717
memo=memo,
@@ -4487,6 +4503,7 @@ class StartWorkflowInput:
44874503
run_timeout: Optional[timedelta]
44884504
task_timeout: Optional[timedelta]
44894505
id_reuse_policy: temporalio.common.WorkflowIDReusePolicy
4506+
id_conflict_policy: temporalio.common.WorkflowIDConflictPolicy
44904507
retry_policy: Optional[temporalio.common.RetryPolicy]
44914508
cron_schedule: str
44924509
memo: Optional[Mapping[str, Any]]
@@ -5008,6 +5025,10 @@ async def start_workflow(
50085025
"temporalio.api.enums.v1.WorkflowIdReusePolicy.ValueType",
50095026
int(input.id_reuse_policy),
50105027
)
5028+
req.workflow_id_conflict_policy = cast(
5029+
"temporalio.api.enums.v1.WorkflowIdConflictPolicy.ValueType",
5030+
int(input.id_conflict_policy),
5031+
)
50115032
if input.retry_policy is not None:
50125033
input.retry_policy.apply_to_proto(req.retry_policy)
50135034
req.cron_schedule = input.cron_schedule

temporalio/common.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,26 @@ class WorkflowIDReusePolicy(IntEnum):
132132
)
133133

134134

135+
class WorkflowIDConflictPolicy(IntEnum):
136+
"""How already-running workflows of the same ID are handled on start.
137+
138+
See :py:class:`temporalio.api.enums.v1.WorkflowIdConflictPolicy`.
139+
"""
140+
141+
UNSPECIFIED = int(
142+
temporalio.api.enums.v1.WorkflowIdConflictPolicy.WORKFLOW_ID_CONFLICT_POLICY_UNSPECIFIED
143+
)
144+
FAIL = int(
145+
temporalio.api.enums.v1.WorkflowIdConflictPolicy.WORKFLOW_ID_CONFLICT_POLICY_FAIL
146+
)
147+
USE_EXISTING = int(
148+
temporalio.api.enums.v1.WorkflowIdConflictPolicy.WORKFLOW_ID_CONFLICT_POLICY_USE_EXISTING
149+
)
150+
TERMINATE_EXISTING = int(
151+
temporalio.api.enums.v1.WorkflowIdConflictPolicy.WORKFLOW_ID_CONFLICT_POLICY_TERMINATE_EXISTING
152+
)
153+
154+
135155
class QueryRejectCondition(IntEnum):
136156
"""Whether a query should be rejected in certain conditions.
137157

tests/worker/test_workflow.py

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@
6969
SearchAttributes,
7070
SearchAttributeValues,
7171
TypedSearchAttributes,
72+
WorkflowIDConflictPolicy,
7273
)
7374
from temporalio.converter import (
7475
DataConverter,
@@ -5505,3 +5506,69 @@ def _unfinished_handler_warning_cls(self) -> Type:
55055506
"update": workflow.UnfinishedUpdateHandlersWarning,
55065507
"signal": workflow.UnfinishedSignalHandlersWarning,
55075508
}[self.handler_type]
5509+
5510+
5511+
@workflow.defn
5512+
class IDConflictWorkflow:
5513+
# Just run forever
5514+
@workflow.run
5515+
async def run(self) -> None:
5516+
await workflow.wait_condition(lambda: False)
5517+
5518+
5519+
async def test_workflow_id_conflict(client: Client):
5520+
async with new_worker(client, IDConflictWorkflow) as worker:
5521+
# Start a workflow
5522+
handle = await client.start_workflow(
5523+
IDConflictWorkflow.run,
5524+
id=f"wf-{uuid.uuid4()}",
5525+
task_queue=worker.task_queue,
5526+
)
5527+
handle = client.get_workflow_handle_for(
5528+
IDConflictWorkflow.run, handle.id, run_id=handle.result_run_id
5529+
)
5530+
5531+
# Confirm another fails by default
5532+
with pytest.raises(WorkflowAlreadyStartedError):
5533+
await client.start_workflow(
5534+
IDConflictWorkflow.run,
5535+
id=handle.id,
5536+
task_queue=worker.task_queue,
5537+
)
5538+
5539+
# Confirm fails if explicitly given that option
5540+
with pytest.raises(WorkflowAlreadyStartedError):
5541+
await client.start_workflow(
5542+
IDConflictWorkflow.run,
5543+
id=handle.id,
5544+
task_queue=worker.task_queue,
5545+
id_conflict_policy=WorkflowIDConflictPolicy.FAIL,
5546+
)
5547+
5548+
# Confirm gives back same handle if requested
5549+
new_handle = await client.start_workflow(
5550+
IDConflictWorkflow.run,
5551+
id=handle.id,
5552+
task_queue=worker.task_queue,
5553+
id_conflict_policy=WorkflowIDConflictPolicy.USE_EXISTING,
5554+
)
5555+
new_handle = client.get_workflow_handle_for(
5556+
IDConflictWorkflow.run, new_handle.id, run_id=new_handle.result_run_id
5557+
)
5558+
assert new_handle.run_id == handle.run_id
5559+
assert (await handle.describe()).status == WorkflowExecutionStatus.RUNNING
5560+
assert (await new_handle.describe()).status == WorkflowExecutionStatus.RUNNING
5561+
5562+
# Confirm terminates and starts new if requested
5563+
new_handle = await client.start_workflow(
5564+
IDConflictWorkflow.run,
5565+
id=handle.id,
5566+
task_queue=worker.task_queue,
5567+
id_conflict_policy=WorkflowIDConflictPolicy.TERMINATE_EXISTING,
5568+
)
5569+
new_handle = client.get_workflow_handle_for(
5570+
IDConflictWorkflow.run, new_handle.id, run_id=new_handle.result_run_id
5571+
)
5572+
assert new_handle.run_id != handle.run_id
5573+
assert (await handle.describe()).status == WorkflowExecutionStatus.TERMINATED
5574+
assert (await new_handle.describe()).status == WorkflowExecutionStatus.RUNNING

0 commit comments

Comments
 (0)