diff --git a/temporalio/worker/_workflow.py b/temporalio/worker/_workflow.py index 914ce1946..6606c6b60 100644 --- a/temporalio/worker/_workflow.py +++ b/temporalio/worker/_workflow.py @@ -136,13 +136,18 @@ def __init__( if defn.name in self._workflows: raise ValueError(f"More than one workflow named {defn.name}") if should_enforce_versioning_behavior: - if defn.versioning_behavior in [ - None, - temporalio.common.VersioningBehavior.UNSPECIFIED, - ]: + if ( + defn.versioning_behavior + in [ + None, + temporalio.common.VersioningBehavior.UNSPECIFIED, + ] + and not defn.dynamic_config_fn + ): raise ValueError( f"Workflow {defn.name} must specify a versioning behavior using " - "the `versioning_behavior` argument to `@workflow.defn`." + "the `versioning_behavior` argument to `@workflow.defn` or by " + "defining a function decorated with `@workflow.dynamic_config`." ) # Prepare the workflow with the runner (this will error in the diff --git a/temporalio/worker/_workflow_instance.py b/temporalio/worker/_workflow_instance.py index 877737355..4c9e69dd3 100644 --- a/temporalio/worker/_workflow_instance.py +++ b/temporalio/worker/_workflow_instance.py @@ -332,6 +332,15 @@ def __init__(self, det: WorkflowInstanceDetails) -> None: # metadata query self._current_details = "" + # The versioning behavior of this workflow, as established by annotation or by the dynamic + # config function. Is only set once upon initialization. + self._versioning_behavior: Optional[temporalio.common.VersioningBehavior] = None + + # Dynamic failure exception types as overridden by the dynamic config function + self._dynamic_failure_exception_types: Optional[ + Sequence[type[BaseException]] + ] = None + def get_thread_id(self) -> Optional[int]: return self._current_thread_id @@ -348,11 +357,7 @@ def activate( temporalio.bridge.proto.workflow_completion.WorkflowActivationCompletion() ) self._current_completion.successful.SetInParent() - self._current_completion.successful.versioning_behavior = ( - self._defn.versioning_behavior.value - if self._defn.versioning_behavior - else temporalio.api.enums.v1.VersioningBehavior.VERSIONING_BEHAVIOR_UNSPECIFIED - ) + self._current_activation_error: Optional[Exception] = None self._deployment_version_for_current_task = ( act.deployment_version_for_current_task @@ -419,6 +424,12 @@ def activate( ) activation_err = None + # Apply versioning behavior if one was established + if self._versioning_behavior: + self._current_completion.successful.versioning_behavior = ( + self._versioning_behavior.value + ) + # If we're deleting, there better be no more tasks. It is important for # the integrity of the system that we check this. If there are tasks # remaining, they and any associated coroutines will get garbage @@ -439,7 +450,6 @@ def activate( ) # Set completion failure self._current_completion.failed.failure.SetInParent() - # TODO: Review - odd that we don't un-set success here? try: self._failure_converter.to_failure( activation_err, @@ -1728,19 +1738,53 @@ def _convert_payloads( def _instantiate_workflow_object(self) -> Any: if not self._workflow_input: raise RuntimeError("Expected workflow input. This is a Python SDK bug.") + if hasattr(self._defn.cls.__init__, "__temporal_workflow_init"): - return self._defn.cls(*self._workflow_input.args) + workflow_instance = self._defn.cls(*self._workflow_input.args) else: - return self._defn.cls() + workflow_instance = self._defn.cls() + + if self._defn.versioning_behavior: + self._versioning_behavior = self._defn.versioning_behavior + # If there's a dynamic config function, call it now after we've instantiated the object + # but before we start executing the workflow + if self._defn.name is None and self._defn.dynamic_config_fn is not None: + dynamic_config = None + try: + with self._as_read_only(): + dynamic_config = self._defn.dynamic_config_fn(workflow_instance) + except Exception as err: + logger.exception( + f"Failed to run dynamic config function in workflow {self._info.workflow_type}" + ) + # Treat as a task failure + self._current_activation_error = err + raise self._current_activation_error + + if dynamic_config: + if dynamic_config.failure_exception_types is not None: + self._dynamic_failure_exception_types = ( + dynamic_config.failure_exception_types + ) + if ( + dynamic_config.versioning_behavior + != temporalio.common.VersioningBehavior.UNSPECIFIED + ): + self._versioning_behavior = dynamic_config.versioning_behavior + + return workflow_instance def _is_workflow_failure_exception(self, err: BaseException) -> bool: # An exception is a failure instead of a task fail if it's already a # failure error or if it is a timeout error or if it is an instance of # any of the failure types in the worker or workflow-level setting + wf_failure_exception_types = self._defn.failure_exception_types + if self._dynamic_failure_exception_types is not None: + wf_failure_exception_types = self._dynamic_failure_exception_types return ( isinstance(err, temporalio.exceptions.FailureError) or isinstance(err, asyncio.TimeoutError) - or any(isinstance(err, typ) for typ in self._defn.failure_exception_types) + or any(isinstance(err, typ) for typ in wf_failure_exception_types) or any( isinstance(err, typ) for typ in self._worker_level_failure_exception_types diff --git a/temporalio/workflow.py b/temporalio/workflow.py index 3d6b55006..1a56be00d 100644 --- a/temporalio/workflow.py +++ b/temporalio/workflow.py @@ -4,6 +4,7 @@ import asyncio import contextvars +import dataclasses import inspect import logging import threading @@ -415,6 +416,61 @@ def decorator( return decorator(fn.__name__, description, fn, bypass_async_check=True) +@dataclass(frozen=True) +class DynamicWorkflowConfig: + """Returned by functions using the :py:func:`dynamic_config` decorator, see it for more.""" + + failure_exception_types: Optional[Sequence[Type[BaseException]]] = None + """The types of exceptions that, if a workflow-thrown exception extends, will cause the + workflow/update to fail instead of suspending the workflow via task failure. These are applied + in addition to ones set on the worker constructor. If ``Exception`` is set, it effectively will + fail a workflow/update in all user exception cases. + + Always overrides the equivalent parameter on :py:func:`defn` if set not-None. + + WARNING: This setting is experimental. + """ + versioning_behavior: temporalio.common.VersioningBehavior = ( + temporalio.common.VersioningBehavior.UNSPECIFIED + ) + """Specifies the versioning behavior to use for this workflow. + + Always overrides the equivalent parameter on :py:func:`defn`. + + WARNING: This setting is experimental. + """ + + +def dynamic_config( + fn: MethodSyncNoParam[SelfType, DynamicWorkflowConfig], +) -> MethodSyncNoParam[SelfType, DynamicWorkflowConfig]: + """Decorator to allow configuring a dynamic workflow's behavior. + + Because dynamic workflows may conceptually represent more than one workflow type, it may be + desirable to have different settings for fields that would normally be passed to + :py:func:`defn`, but vary based on the workflow type name or other information available in + the workflow's context. This function will be called after the workflow's :py:func:`init`, + if it has one, but before the workflow's :py:func:`run` method. + + The method must only take self as a parameter, and any values set in the class it returns will + override those provided to :py:func:`defn`. + + Cannot be specified on non-dynamic workflows. + + Args: + fn: The function to decorate. + """ + if inspect.iscoroutinefunction(fn): + raise ValueError("Workflow dynamic_config method must be synchronous") + params = list(inspect.signature(fn).parameters.values()) + if len(params) != 1: + raise ValueError("Workflow dynamic_config method must only take self parameter") + + # Add marker attribute + setattr(fn, "__temporal_workflow_dynamic_config", True) + return fn + + @dataclass(frozen=True) class Info: """Information about the running workflow. @@ -1449,6 +1505,7 @@ class _Definition: arg_types: Optional[List[Type]] = None ret_type: Optional[Type] = None versioning_behavior: Optional[temporalio.common.VersioningBehavior] = None + dynamic_config_fn: Optional[Callable[..., DynamicWorkflowConfig]] = None @staticmethod def from_class(cls: Type) -> Optional[_Definition]: @@ -1513,6 +1570,7 @@ def _apply_to_class( # Collect run fn and all signal/query/update fns init_fn: Optional[Callable[..., None]] = None run_fn: Optional[Callable[..., Awaitable[Any]]] = None + dynamic_config_fn: Optional[Callable[..., DynamicWorkflowConfig]] = None seen_run_attr = False signals: Dict[Optional[str], _SignalDefinition] = {} queries: Dict[Optional[str], _QueryDefinition] = {} @@ -1560,6 +1618,17 @@ def _apply_to_class( queries[query_defn.name] = query_defn elif name == "__init__" and hasattr(member, "__temporal_workflow_init"): init_fn = member + elif hasattr(member, "__temporal_workflow_dynamic_config"): + if workflow_name: + issues.append( + "@workflow.dynamic_config can only be used in dynamic workflows, but " + f"workflow class {workflow_name} ({cls.__name__}) is not dynamic" + ) + if dynamic_config_fn: + issues.append( + "@workflow.dynamic_config can only be defined once per workflow" + ) + dynamic_config_fn = member elif isinstance(member, UpdateMethodMultiParam): update_defn = member._defn if update_defn.name in updates: @@ -1643,6 +1712,7 @@ def _apply_to_class( sandboxed=sandboxed, failure_exception_types=failure_exception_types, versioning_behavior=versioning_behavior, + dynamic_config_fn=dynamic_config_fn, ) setattr(cls, "__temporal_workflow_definition", defn) setattr(run_fn, "__temporal_workflow_definition", defn) diff --git a/tests/worker/test_worker.py b/tests/worker/test_worker.py index c66f3b145..aaf85cb5a 100644 --- a/tests/worker/test_worker.py +++ b/tests/worker/test_worker.py @@ -43,7 +43,7 @@ WorkerTuner, WorkflowSlotInfo, ) -from temporalio.workflow import VersioningIntent +from temporalio.workflow import DynamicWorkflowConfig, VersioningIntent from tests.helpers import ( assert_eventually, find_free_port, @@ -795,8 +795,24 @@ async def run(self, args: Sequence[RawValue]) -> str: return "dynamic" -async def test_worker_deployment_dynamic_workflow_on_run( - client: Client, env: WorkflowEnvironment +@workflow.defn(dynamic=True, versioning_behavior=VersioningBehavior.PINNED) +class DynamicWorkflowVersioningOnConfigMethod: + @workflow.dynamic_config + def dynamic_config(self) -> DynamicWorkflowConfig: + return DynamicWorkflowConfig( + versioning_behavior=VersioningBehavior.AUTO_UPGRADE + ) + + @workflow.run + async def run(self, args: Sequence[RawValue]) -> str: + return "dynamic" + + +async def _test_worker_deployment_dynamic_workflow( + client: Client, + env: WorkflowEnvironment, + workflow_class, + expected_versioning_behavior: temporalio.api.enums.v1.VersioningBehavior.ValueType, ): if env.supports_time_skipping: pytest.skip("Test Server doesn't support worker deployments") @@ -806,7 +822,7 @@ async def test_worker_deployment_dynamic_workflow_on_run( async with new_worker( client, - DynamicWorkflowVersioningOnDefn, + workflow_class, deployment_config=WorkerDeploymentConfig( version=worker_v1, use_worker_versioning=True, @@ -832,11 +848,33 @@ async def test_worker_deployment_dynamic_workflow_on_run( assert any( event.HasField("workflow_task_completed_event_attributes") and event.workflow_task_completed_event_attributes.versioning_behavior - == temporalio.api.enums.v1.VersioningBehavior.VERSIONING_BEHAVIOR_PINNED + == expected_versioning_behavior for event in history.events ) +async def test_worker_deployment_dynamic_workflow_with_pinned_versioning( + client: Client, env: WorkflowEnvironment +): + await _test_worker_deployment_dynamic_workflow( + client, + env, + DynamicWorkflowVersioningOnDefn, + temporalio.api.enums.v1.VersioningBehavior.VERSIONING_BEHAVIOR_PINNED, + ) + + +async def test_worker_deployment_dynamic_workflow_with_auto_upgrade_versioning( + client: Client, env: WorkflowEnvironment +): + await _test_worker_deployment_dynamic_workflow( + client, + env, + DynamicWorkflowVersioningOnConfigMethod, + temporalio.api.enums.v1.VersioningBehavior.VERSIONING_BEHAVIOR_AUTO_UPGRADE, + ) + + @workflow.defn class NoVersioningAnnotationWorkflow: @workflow.run diff --git a/tests/worker/test_workflow.py b/tests/worker/test_workflow.py index 9490b0d0b..98e2bba27 100644 --- a/tests/worker/test_workflow.py +++ b/tests/worker/test_workflow.py @@ -4878,6 +4878,18 @@ async def run(self, scenario: FailureTypesScenario) -> None: await super().run(scenario) +class FailureTypesConfiguredDynamicConfig(FailureTypesWorkflowBase): + @workflow.dynamic_config + def dynamic_config(self) -> temporalio.workflow.DynamicWorkflowConfig: + return temporalio.workflow.DynamicWorkflowConfig( + failure_exception_types=[Exception] + ) + + @workflow.run + async def run(self, scenario: FailureTypesScenario) -> None: + await super().run(scenario) + + async def test_workflow_failure_types_configured(client: Client): # Asserter for a single scenario async def assert_scenario( @@ -5047,6 +5059,15 @@ async def run_scenario( FailureTypesConfiguredInheritedWorkflow, FailureTypesScenario.CAUSE_NON_DETERMINISM, ) + # When configured at the workflow level dynamically + await run_scenario( + FailureTypesConfiguredDynamicConfig, + FailureTypesScenario.THROW_CUSTOM_EXCEPTION, + ) + await run_scenario( + FailureTypesConfiguredDynamicConfig, + FailureTypesScenario.CAUSE_NON_DETERMINISM, + ) @workflow.defn(failure_exception_types=[Exception]) @@ -7374,3 +7395,29 @@ async def test_expose_root_execution(client: Client, env: WorkflowEnvironment): assert child_wf_info_root is not None assert child_wf_info_root.workflow_id == parent_desc.id assert child_wf_info_root.run_id == parent_desc.run_id + + +@workflow.defn(dynamic=True) +class WorkflowDynamicConfigFnFailure: + @workflow.dynamic_config + def dynamic_config(self) -> temporalio.workflow.DynamicWorkflowConfig: + raise Exception("Dynamic config failure") + + @workflow.run + async def run(self, args: Sequence[RawValue]) -> None: + raise RuntimeError("Should never actually run") + + +async def test_workflow_dynamic_config_failure(client: Client): + async with new_worker(client, WorkflowDynamicConfigFnFailure) as worker: + handle = await client.start_workflow( + "verycooldynamicworkflow", + id=f"dynamic-config-failure-{uuid.uuid4()}", + task_queue=worker.task_queue, + execution_timeout=timedelta(seconds=5), + ) + + # Assert workflow task fails with our expected error message + await assert_task_fail_eventually( + handle, message_contains="Dynamic config failure" + )