Skip to content

Commit 83bbc36

Browse files
authored
Dynamic workflows, activities, signals, and queries (#346)
Fixes #242
1 parent 34681ca commit 83bbc36

File tree

16 files changed

+748
-171
lines changed

16 files changed

+748
-171
lines changed

.github/workflows/ci.yml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,12 @@ jobs:
2424
- os: ubuntu-arm
2525
runsOn: buildjet-4vcpu-ubuntu-2204-arm
2626
runs-on: ${{ matrix.runsOn || matrix.os }}
27+
# For Windows there is currently a bug with Windows + pytest + warnings +
28+
# importlib + Python < 3.12. Based on others' investigations, disabling
29+
# bytecode fixes it.
30+
# See https://github.com/temporalio/sdk-python/pull/346#issuecomment-1636108747
31+
env:
32+
PYTHONDONTWRITEBYTECODE: "${{ matrix.os == 'windows-latest' && '1' || '' }}"
2733
steps:
2834
- uses: actions/checkout@v2
2935
with:

README.md

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -539,6 +539,9 @@ Here are the decorators that can be applied:
539539
* `@workflow.defn` - Defines a workflow class
540540
* Must be defined on the class given to the worker (ignored if present on a base class)
541541
* Can have a `name` param to customize the workflow name, otherwise it defaults to the unqualified class name
542+
* Can have `dynamic=True` which means all otherwise unhandled workflows fall through to this. If present, cannot have
543+
`name` argument, and run method must accept a single parameter of `Sequence[temporalio.common.RawValue]` type. The
544+
payload of the raw value can be converted via `workflow.payload_converter().from_payload`.
542545
* `@workflow.run` - Defines the primary workflow run method
543546
* Must be defined on the same class as `@workflow.defn`, not a base class (but can _also_ be defined on the same
544547
method of a base class)
@@ -553,7 +556,8 @@ Here are the decorators that can be applied:
553556
* The method's arguments are the signal's arguments
554557
* Can have a `name` param to customize the signal name, otherwise it defaults to the unqualified method name
555558
* Can have `dynamic=True` which means all otherwise unhandled signals fall through to this. If present, cannot have
556-
`name` argument, and method parameters must be `self`, a string signal name, and a `*args` varargs param.
559+
`name` argument, and method parameters must be `self`, a string signal name, and a
560+
`Sequence[temporalio.common.RawValue]`.
557561
* Non-dynamic method can only have positional arguments. Best practice is to only take a single argument that is an
558562
object/dataclass of fields that can be added to as needed.
559563
* Return value is ignored
@@ -1051,6 +1055,10 @@ Some things to note about activity definitions:
10511055
activity may need (e.g. a DB connection). The instance method should be what is registered with the worker.
10521056
* Activities can also be defined on callable classes (i.e. classes with `__call__`). An instance of the class should be
10531057
what is registered with the worker.
1058+
* The `@activity.defn` can have `dynamic=True` set which means all otherwise unhandled activities fall through to this.
1059+
If present, cannot have `name` argument, and the activity function must accept a single parameter of
1060+
`Sequence[temporalio.common.RawValue]`. The payload of the raw value can be converted via
1061+
`activity.payload_converter().from_payload`.
10541062

10551063
#### Types of Activities
10561064

temporalio/activity.py

Lines changed: 55 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
)
3636

3737
import temporalio.common
38+
import temporalio.converter
3839

3940
from .types import CallableType
4041

@@ -51,11 +52,19 @@ def defn(
5152
...
5253

5354

55+
@overload
56+
def defn(
57+
*, no_thread_cancel_exception: bool = False, dynamic: bool = False
58+
) -> Callable[[CallableType], CallableType]:
59+
...
60+
61+
5462
def defn(
5563
fn: Optional[CallableType] = None,
5664
*,
5765
name: Optional[str] = None,
5866
no_thread_cancel_exception: bool = False,
67+
dynamic: bool = False,
5968
):
6069
"""Decorator for activity functions.
6170
@@ -64,15 +73,19 @@ def defn(
6473
Args:
6574
fn: The function to decorate.
6675
name: Name to use for the activity. Defaults to function ``__name__``.
76+
This cannot be set if dynamic is set.
6777
no_thread_cancel_exception: If set to true, an exception will not be
6878
raised in synchronous, threaded activities upon cancellation.
79+
dynamic: If true, this activity will be dynamic. Dynamic activities have
80+
to accept a single 'Sequence[RawValue]' parameter. This cannot be
81+
set to true if name is present.
6982
"""
7083

7184
def decorator(fn: CallableType) -> CallableType:
7285
# This performs validation
7386
_Definition._apply_to_callable(
7487
fn,
75-
activity_name=name or fn.__name__,
88+
activity_name=name or fn.__name__ if not dynamic else None,
7689
no_thread_cancel_exception=no_thread_cancel_exception,
7790
)
7891
return fn
@@ -132,7 +145,12 @@ class _Context:
132145
cancelled_event: _CompositeEvent
133146
worker_shutdown_event: _CompositeEvent
134147
shield_thread_cancel_exception: Optional[Callable[[], AbstractContextManager]]
148+
payload_converter_class_or_instance: Union[
149+
Type[temporalio.converter.PayloadConverter],
150+
temporalio.converter.PayloadConverter,
151+
]
135152
_logger_details: Optional[Mapping[str, Any]] = None
153+
_payload_converter: Optional[temporalio.converter.PayloadConverter] = None
136154

137155
@staticmethod
138156
def current() -> _Context:
@@ -155,6 +173,18 @@ def logger_details(self) -> Mapping[str, Any]:
155173
self._logger_details = self.info()._logger_details()
156174
return self._logger_details
157175

176+
@property
177+
def payload_converter(self) -> temporalio.converter.PayloadConverter:
178+
if not self._payload_converter:
179+
if isinstance(
180+
self.payload_converter_class_or_instance,
181+
temporalio.converter.PayloadConverter,
182+
):
183+
self._payload_converter = self.payload_converter_class_or_instance
184+
else:
185+
self._payload_converter = self.payload_converter_class_or_instance()
186+
return self._payload_converter
187+
158188

159189
@dataclass
160190
class _CompositeEvent:
@@ -339,6 +369,14 @@ class _CompleteAsyncError(BaseException):
339369
pass
340370

341371

372+
def payload_converter() -> temporalio.converter.PayloadConverter:
373+
"""Get the payload converter for the current activity.
374+
375+
This is often used for dynamic activities to convert payloads.
376+
"""
377+
return _Context.current().payload_converter
378+
379+
342380
class LoggerAdapter(logging.LoggerAdapter):
343381
"""Adapter that adds details to the log about the running activity.
344382
@@ -387,9 +425,9 @@ def base_logger(self) -> logging.Logger:
387425
"""Logger that will have contextual activity details embedded."""
388426

389427

390-
@dataclass
428+
@dataclass(frozen=True)
391429
class _Definition:
392-
name: str
430+
name: Optional[str]
393431
fn: Callable
394432
is_async: bool
395433
no_thread_cancel_exception: bool
@@ -420,7 +458,10 @@ def must_from_callable(fn: Callable) -> _Definition:
420458

421459
@staticmethod
422460
def _apply_to_callable(
423-
fn: Callable, *, activity_name: str, no_thread_cancel_exception: bool = False
461+
fn: Callable,
462+
*,
463+
activity_name: Optional[str],
464+
no_thread_cancel_exception: bool = False,
424465
) -> None:
425466
# Validate the activity
426467
if hasattr(fn, "__temporal_activity_definition"):
@@ -447,6 +488,16 @@ def _apply_to_callable(
447488

448489
def __post_init__(self) -> None:
449490
if self.arg_types is None and self.ret_type is None:
491+
dynamic = self.name is None
450492
arg_types, ret_type = temporalio.common._type_hints_from_func(self.fn)
493+
# If dynamic, must be a sequence of raw values
494+
if dynamic and (
495+
not arg_types
496+
or len(arg_types) != 1
497+
or arg_types[0] != Sequence[temporalio.common.RawValue]
498+
):
499+
raise TypeError(
500+
"Dynamic activity must accept a single Sequence[temporalio.common.RawValue]"
501+
)
451502
object.__setattr__(self, "arg_types", arg_types)
452503
object.__setattr__(self, "ret_type", ret_type)

temporalio/client.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -408,6 +408,8 @@ async def start_workflow(
408408
name = workflow
409409
elif callable(workflow):
410410
defn = temporalio.workflow._Definition.must_from_run_fn(workflow)
411+
if not defn.name:
412+
raise ValueError("Cannot invoke dynamic workflow explicitly")
411413
name = defn.name
412414
if result_type is None:
413415
result_type = defn.ret_type
@@ -2827,6 +2829,8 @@ def __init__(
28272829
# Use definition if callable
28282830
if callable(workflow):
28292831
defn = temporalio.workflow._Definition.must_from_run_fn(workflow)
2832+
if not defn.name:
2833+
raise ValueError("Cannot schedule dynamic workflow explicitly")
28302834
workflow = defn.name
28312835
elif not isinstance(workflow, str):
28322836
raise TypeError("Workflow must be a string or callable")

temporalio/common.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,34 @@ class QueryRejectCondition(IntEnum):
138138
)
139139

140140

141+
@dataclass(frozen=True)
142+
class RawValue:
143+
"""Representation of an unconverted, raw payload.
144+
145+
This type can be used as a parameter or return type in workflows,
146+
activities, signals, and queries to pass through a raw payload.
147+
Encoding/decoding of the payload is still done by the system.
148+
"""
149+
150+
payload: temporalio.api.common.v1.Payload
151+
152+
def __getstate__(self) -> object:
153+
"""Pickle support."""
154+
# We'll convert payload to bytes and prepend a version number just in
155+
# case we want to extend in the future
156+
return b"1" + self.payload.SerializeToString()
157+
158+
def __setstate__(self, state: object) -> None:
159+
"""Pickle support."""
160+
if not isinstance(state, bytes):
161+
raise TypeError(f"Expected bytes state, got {type(state)}")
162+
if not state[:1] == b"1":
163+
raise ValueError("Bad version prefix")
164+
object.__setattr__(
165+
self, "payload", temporalio.api.common.v1.Payload.FromString(state[1:])
166+
)
167+
168+
141169
# We choose to make this a list instead of an sequence so we can catch if people
142170
# are not sending lists each time but maybe accidentally sending a string (which
143171
# is a sequence)

temporalio/converter.py

Lines changed: 68 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
TypeVar,
3232
Union,
3333
get_type_hints,
34+
overload,
3435
)
3536

3637
import google.protobuf.json_format
@@ -43,6 +44,7 @@
4344
import temporalio.api.failure.v1
4445
import temporalio.common
4546
import temporalio.exceptions
47+
import temporalio.types
4648

4749
if sys.version_info < (3, 11):
4850
# Python's datetime.fromisoformat doesn't support certain formats pre-3.11
@@ -67,6 +69,9 @@ def to_payloads(
6769
) -> List[temporalio.api.common.v1.Payload]:
6870
"""Encode values into payloads.
6971
72+
Implementers are expected to just return the payload for
73+
:py:class:`temporalio.common.RawValue`.
74+
7075
Args:
7176
values: Values to be converted.
7277
@@ -88,6 +93,9 @@ def from_payloads(
8893
) -> List[Any]:
8994
"""Decode payloads into values.
9095
96+
Implementers are expected to treat a type hint of
97+
:py:class:`temporalio.common.RawValue` as just the raw value.
98+
9199
Args:
92100
payloads: Payloads to convert to Python values.
93101
type_hints: Types that are expected if any. This may not have any
@@ -122,6 +130,51 @@ def from_payloads_wrapper(
122130
return []
123131
return self.from_payloads(payloads.payloads)
124132

133+
def to_payload(self, value: Any) -> temporalio.api.common.v1.Payload:
134+
"""Convert a single value to a payload.
135+
136+
This is a shortcut for :py:meth:`to_payloads` with a single-item list
137+
and result.
138+
139+
Args:
140+
value: Value to convert to a single payload.
141+
142+
Returns:
143+
Single converted payload.
144+
"""
145+
return self.to_payloads([value])[0]
146+
147+
@overload
148+
def from_payload(self, payload: temporalio.api.common.v1.Payload) -> Any:
149+
...
150+
151+
@overload
152+
def from_payload(
153+
self,
154+
payload: temporalio.api.common.v1.Payload,
155+
type_hint: Type[temporalio.types.AnyType],
156+
) -> temporalio.types.AnyType:
157+
...
158+
159+
def from_payload(
160+
self,
161+
payload: temporalio.api.common.v1.Payload,
162+
type_hint: Optional[Type] = None,
163+
) -> Any:
164+
"""Convert a single payload to a value.
165+
166+
This is a shortcut for :py:meth:`from_payloads` with a single-item list
167+
and result.
168+
169+
Args:
170+
payload: Payload to convert to value.
171+
type_hint: Optional type hint to say which type to convert to.
172+
173+
Returns:
174+
Single converted value.
175+
"""
176+
return self.from_payloads([payload], [type_hint] if type_hint else None)[0]
177+
125178

126179
class EncodingPayloadConverter(ABC):
127180
"""Base converter to/from single payload/value with a known encoding for use in CompositePayloadConverter."""
@@ -209,10 +262,14 @@ def to_payloads(
209262
# We intentionally attempt these serially just in case a stateful
210263
# converter may rely on the previous values
211264
payload = None
212-
for converter in self.converters.values():
213-
payload = converter.to_payload(value)
214-
if payload is not None:
215-
break
265+
# RawValue should just pass through
266+
if isinstance(value, temporalio.common.RawValue):
267+
payload = value.payload
268+
else:
269+
for converter in self.converters.values():
270+
payload = converter.to_payload(value)
271+
if payload is not None:
272+
break
216273
if payload is None:
217274
raise RuntimeError(
218275
f"Value at index {index} of type {type(value)} has no known converter"
@@ -235,13 +292,17 @@ def from_payloads(
235292
"""
236293
values = []
237294
for index, payload in enumerate(payloads):
295+
type_hint = None
296+
if type_hints and len(type_hints) > index:
297+
type_hint = type_hints[index]
298+
# Raw value should just wrap
299+
if type_hint == temporalio.common.RawValue:
300+
values.append(temporalio.common.RawValue(payload))
301+
continue
238302
encoding = payload.metadata.get("encoding", b"<unknown>")
239303
converter = self.converters.get(encoding)
240304
if converter is None:
241305
raise KeyError(f"Unknown payload encoding {encoding.decode()}")
242-
type_hint = None
243-
if type_hints is not None:
244-
type_hint = type_hints[index]
245306
try:
246307
values.append(converter.from_payload(payload, type_hint))
247308
except RuntimeError as err:

temporalio/testing/_activity.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
from typing_extensions import ParamSpec
1313

1414
import temporalio.activity
15+
import temporalio.converter
1516
import temporalio.exceptions
1617
import temporalio.worker._activity
1718

@@ -52,12 +53,18 @@ class ActivityEnvironment:
5253
function.
5354
on_heartbeat: Function called on each heartbeat invocation by the
5455
activity.
56+
payload_converter: Payload converter set on the activity context. This
57+
must be set before :py:meth:`run`. Changes after the activity has
58+
started do not take effect.
5559
"""
5660

5761
def __init__(self) -> None:
5862
"""Create an ActivityEnvironment for running activity code."""
5963
self.info = _default_info
6064
self.on_heartbeat: Callable[..., None] = lambda *args: None
65+
self.payload_converter = (
66+
temporalio.converter.DataConverter.default.payload_converter
67+
)
6168
self._cancelled = False
6269
self._worker_shutdown = False
6370
self._activities: Set[_Activity] = set()
@@ -139,6 +146,7 @@ def __init__(
139146
shield_thread_cancel_exception=None
140147
if not self.cancel_thread_raiser
141148
else self.cancel_thread_raiser.shielded,
149+
payload_converter_class_or_instance=env.payload_converter,
142150
)
143151
self.task: Optional[asyncio.Task] = None
144152

0 commit comments

Comments
 (0)