Skip to content

Commit 365cead

Browse files
authored
Add deterministic alternatives for asyncio.wait and asyncio.as_completed (#533)
Fixes #429 Fixes #518
1 parent afadc15 commit 365cead

File tree

6 files changed

+423
-26
lines changed

6 files changed

+423
-26
lines changed

README.md

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,8 @@ informal introduction to the features and their implementation.
6161
- [Invoking Child Workflows](#invoking-child-workflows)
6262
- [Timers](#timers)
6363
- [Conditions](#conditions)
64-
- [Asyncio and Cancellation](#asyncio-and-cancellation)
64+
- [Asyncio and Determinism](#asyncio-and-determinism)
65+
- [Asyncio Cancellation](#asyncio-cancellation)
6566
- [Workflow Utilities](#workflow-utilities)
6667
- [Exceptions](#exceptions)
6768
- [External Workflows](#external-workflows)
@@ -550,8 +551,9 @@ Some things to note about the above workflow code:
550551
* This workflow continually updates the queryable current greeting when signalled and can complete with the greeting on
551552
a different signal
552553
* Workflows are always classes and must have a single `@workflow.run` which is an `async def` function
553-
* Workflow code must be deterministic. This means no threading, no randomness, no external calls to processes, no
554-
network IO, and no global state mutation. All code must run in the implicit `asyncio` event loop and be deterministic.
554+
* Workflow code must be deterministic. This means no `set` iteration, threading, no randomness, no external calls to
555+
processes, no network IO, and no global state mutation. All code must run in the implicit `asyncio` event loop and be
556+
deterministic. Also see the [Asyncio and Determinism](#asyncio-and-determinism) section later.
555557
* `@activity.defn` is explained in a later section. For normal simple string concatenation, this would just be done in
556558
the workflow. The activity is for demonstration purposes only.
557559
* `workflow.execute_activity(create_greeting_activity, ...` is actually a typed signature, and MyPy will fail if the
@@ -678,16 +680,26 @@ Some things to note about the above code:
678680
* A `timeout` can optionally be provided which will throw a `asyncio.TimeoutError` if reached (internally backed by
679681
`asyncio.wait_for` which uses a timer)
680682

681-
#### Asyncio and Cancellation
683+
#### Asyncio and Determinism
682684

683-
Workflows are backed by a custom [asyncio](https://docs.python.org/3/library/asyncio.html) event loop. This means many
684-
of the common `asyncio` calls work as normal. Some asyncio features are disabled such as:
685+
Workflows must be deterministic. Workflows are backed by a custom
686+
[asyncio](https://docs.python.org/3/library/asyncio.html) event loop. This means many of the common `asyncio` calls work
687+
as normal. Some asyncio features are disabled such as:
685688

686689
* Thread related calls such as `to_thread()`, `run_coroutine_threadsafe()`, `loop.run_in_executor()`, etc
687690
* Calls that alter the event loop such as `loop.close()`, `loop.stop()`, `loop.run_forever()`,
688691
`loop.set_task_factory()`, etc
689692
* Calls that use anything external such as networking, subprocesses, disk IO, etc
690693

694+
Also, there are some `asyncio` utilities that internally use `set()` which can make them non-deterministic from one
695+
worker to the next. Therefore the following `asyncio` functions have `workflow`-module alternatives that are
696+
deterministic:
697+
698+
* `asyncio.as_completed()` - use `workflow.as_completed()`
699+
* `asyncio.wait()` - use `workflow.wait()`
700+
701+
#### Asyncio Cancellation
702+
691703
Cancellation is done the same way as `asyncio`. Specifically, a task can be requested to be cancelled but does not
692704
necessarily have to respect that cancellation immediately. This also means that `asyncio.shield()` can be used to
693705
protect against cancellation. The following tasks, when cancelled, perform a Temporal cancellation:

temporalio/worker/workflow_sandbox/_restrictions.py

Lines changed: 109 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import operator
1515
import random
1616
import types
17+
import warnings
1718
from copy import copy, deepcopy
1819
from dataclasses import dataclass
1920
from typing import (
@@ -49,15 +50,25 @@ class RestrictedWorkflowAccessError(temporalio.workflow.NondeterminismError):
4950
qualified_name: Fully qualified name of what was accessed.
5051
"""
5152

52-
def __init__(self, qualified_name: str) -> None:
53+
def __init__(
54+
self, qualified_name: str, *, override_message: Optional[str] = None
55+
) -> None:
5356
"""Create restricted workflow access error."""
5457
super().__init__(
58+
override_message
59+
or RestrictedWorkflowAccessError.default_message(qualified_name)
60+
)
61+
self.qualified_name = qualified_name
62+
63+
@staticmethod
64+
def default_message(qualified_name: str) -> str:
65+
"""Get default message for restricted access."""
66+
return (
5567
f"Cannot access {qualified_name} from inside a workflow. "
5668
"If this is code from a module not used in a workflow or known to "
5769
"only be used deterministically from a workflow, mark the import "
5870
"as pass through."
5971
)
60-
self.qualified_name = qualified_name
6172

6273

6374
@dataclass(frozen=True)
@@ -182,6 +193,20 @@ def nested_child(path: Sequence[str], child: SandboxMatcher) -> SandboxMatcher:
182193
time.
183194
"""
184195

196+
leaf_message: Optional[str] = None
197+
"""
198+
Override message to use in error/warning. Defaults to a common message.
199+
This is only applicable to leafs, so this must only be set when
200+
``match_self`` is ``True`` and this matcher is on ``children`` of a parent.
201+
"""
202+
203+
leaf_warning: Optional[Type[Warning]] = None
204+
"""
205+
If set, issues a warning instead of raising an error. This is only
206+
applicable to leafs, so this must only be set when ``match_self`` is
207+
``True`` and this matcher is on ``children`` of a parent.
208+
"""
209+
185210
all: ClassVar[SandboxMatcher]
186211
"""Shortcut for an always-matched matcher."""
187212

@@ -197,40 +222,67 @@ def nested_child(path: Sequence[str], child: SandboxMatcher) -> SandboxMatcher:
197222
all_uses_runtime: ClassVar[SandboxMatcher]
198223
"""Shortcut for a matcher that matches any :py:attr:`use` at runtime."""
199224

200-
def match_access(
225+
def __post_init__(self):
226+
"""Post initialization validations."""
227+
if self.leaf_message and not self.match_self:
228+
raise ValueError("Cannot set leaf_message without match_self")
229+
if self.leaf_warning and not self.match_self:
230+
raise ValueError("Cannot set leaf_warning without match_self")
231+
232+
def access_matcher(
201233
self, context: RestrictionContext, *child_path: str, include_use: bool = False
202-
) -> bool:
203-
"""Perform a match check.
234+
) -> Optional[SandboxMatcher]:
235+
"""Perform a match check and return matcher.
204236
205237
Args:
206238
context: Current restriction context.
207239
child_path: Full path to the child being accessed.
208240
include_use: Whether to include the :py:attr:`use` set in the check.
209241
210242
Returns:
211-
``True`` if matched.
243+
The matcher if matched.
212244
"""
213245
# We prefer to avoid recursion
214246
matcher = self
215247
for v in child_path:
216248
# Does not match if this is runtime only and we're not runtime
217249
if not context.is_runtime and matcher.only_runtime:
218-
return False
250+
return None
219251

220252
# Considered matched if self matches or access matches. Note, "use"
221253
# does not match by default because we allow it to be accessed but
222254
# not used.
223255
if matcher.match_self or v in matcher.access or "*" in matcher.access:
224-
return True
256+
return matcher
225257
if include_use and (v in matcher.use or "*" in matcher.use):
226-
return True
258+
return matcher
227259
child_matcher = matcher.children.get(v) or matcher.children.get("*")
228260
if not child_matcher:
229-
return False
261+
return None
230262
matcher = child_matcher
231263
if not context.is_runtime and matcher.only_runtime:
232-
return False
233-
return matcher.match_self
264+
return None
265+
if not matcher.match_self:
266+
return None
267+
return matcher
268+
269+
def match_access(
270+
self, context: RestrictionContext, *child_path: str, include_use: bool = False
271+
) -> bool:
272+
"""Perform a match check.
273+
274+
Args:
275+
context: Current restriction context.
276+
child_path: Full path to the child being accessed.
277+
include_use: Whether to include the :py:attr:`use` set in the check.
278+
279+
Returns:
280+
``True`` if matched.
281+
"""
282+
return (
283+
self.access_matcher(context, *child_path, include_use=include_use)
284+
is not None
285+
)
234286

235287
def child_matcher(self, *child_path: str) -> Optional[SandboxMatcher]:
236288
"""Return a child matcher for the given path.
@@ -273,6 +325,10 @@ def __or__(self, other: SandboxMatcher) -> SandboxMatcher:
273325
"""Combine this matcher with another."""
274326
if self.only_runtime != other.only_runtime:
275327
raise ValueError("Cannot combine only-runtime and non-only-runtime")
328+
if self.leaf_message != other.leaf_message:
329+
raise ValueError("Cannot combine different messages")
330+
if self.leaf_warning != other.leaf_warning:
331+
raise ValueError("Cannot combine different warning values")
276332
if self.match_self or other.match_self:
277333
return SandboxMatcher.all
278334
new_children = dict(self.children) if self.children else {}
@@ -287,6 +343,8 @@ def __or__(self, other: SandboxMatcher) -> SandboxMatcher:
287343
use=self.use | other.use,
288344
children=new_children,
289345
only_runtime=self.only_runtime,
346+
leaf_message=self.leaf_message,
347+
leaf_warning=self.leaf_warning,
290348
)
291349

292350
def with_child_unrestricted(self, *child_path: str) -> SandboxMatcher:
@@ -457,6 +515,28 @@ def _public_callables(parent: Any, *, exclude: Set[str] = set()) -> Set[str]:
457515
# rewriter
458516
only_runtime=True,
459517
),
518+
"asyncio": SandboxMatcher(
519+
children={
520+
"as_completed": SandboxMatcher(
521+
children={
522+
"__call__": SandboxMatcher(
523+
match_self=True,
524+
leaf_warning=UserWarning,
525+
leaf_message="asyncio.as_completed() is non-deterministic, use workflow.as_completed() instead",
526+
)
527+
},
528+
),
529+
"wait": SandboxMatcher(
530+
children={
531+
"__call__": SandboxMatcher(
532+
match_self=True,
533+
leaf_warning=UserWarning,
534+
leaf_message="asyncio.wait() is non-deterministic, use workflow.wait() instead",
535+
)
536+
},
537+
),
538+
}
539+
),
460540
# TODO(cretz): Fix issues with class extensions on restricted proxy
461541
# "argparse": SandboxMatcher.all_uses_runtime,
462542
"bz2": SandboxMatcher(use={"open"}),
@@ -689,12 +769,23 @@ def from_proxy(v: _RestrictedProxy) -> _RestrictionState:
689769
matcher: SandboxMatcher
690770

691771
def assert_child_not_restricted(self, name: str) -> None:
692-
if (
693-
self.matcher.match_access(self.context, name)
694-
and not temporalio.workflow.unsafe.is_sandbox_unrestricted()
695-
):
696-
logger.warning("%s on %s restricted", name, self.name)
697-
raise RestrictedWorkflowAccessError(f"{self.name}.{name}")
772+
if temporalio.workflow.unsafe.is_sandbox_unrestricted():
773+
return
774+
matcher = self.matcher.access_matcher(self.context, name)
775+
if not matcher:
776+
return
777+
logger.warning("%s on %s restricted", name, self.name)
778+
# Issue warning instead of error if configured to do so
779+
if matcher.leaf_warning:
780+
warnings.warn(
781+
matcher.leaf_message
782+
or RestrictedWorkflowAccessError.default_message(f"{self.name}.{name}"),
783+
matcher.leaf_warning,
784+
)
785+
else:
786+
raise RestrictedWorkflowAccessError(
787+
f"{self.name}.{name}", override_message=matcher.leaf_message
788+
)
698789

699790
def set_on_proxy(self, v: _RestrictedProxy) -> None:
700791
# To prevent recursion, must use __setattr__ on object to set the

0 commit comments

Comments
 (0)