Skip to content

Commit 2bc0718

Browse files
committed
Add deterministic alternatives for asyncio.wait and asyncio.as_completed
Fixes temporalio#429 Fixes temporalio#518
1 parent 11a97d1 commit 2bc0718

File tree

3 files changed

+271
-6
lines changed

3 files changed

+271
-6
lines changed

README.md

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,8 @@ informal introduction to the features and their implementation.
6161
- [Invoking Child Workflows](#invoking-child-workflows)
6262
- [Timers](#timers)
6363
- [Conditions](#conditions)
64-
- [Asyncio and Cancellation](#asyncio-and-cancellation)
64+
- [Asyncio and Determinism](#asyncio-and-determinism)
65+
- [Asyncio Cancellation](#asyncio-cancellation)
6566
- [Workflow Utilities](#workflow-utilities)
6667
- [Exceptions](#exceptions)
6768
- [External Workflows](#external-workflows)
@@ -550,8 +551,9 @@ Some things to note about the above workflow code:
550551
* This workflow continually updates the queryable current greeting when signalled and can complete with the greeting on
551552
a different signal
552553
* Workflows are always classes and must have a single `@workflow.run` which is an `async def` function
553-
* Workflow code must be deterministic. This means no threading, no randomness, no external calls to processes, no
554-
network IO, and no global state mutation. All code must run in the implicit `asyncio` event loop and be deterministic.
554+
* Workflow code must be deterministic. This means no `set` iteration, threading, no randomness, no external calls to
555+
processes, no network IO, and no global state mutation. All code must run in the implicit `asyncio` event loop and be
556+
deterministic. Also see the [Asyncio and Determinism](#asyncio-and-determinism) section later.
555557
* `@activity.defn` is explained in a later section. For normal simple string concatenation, this would just be done in
556558
the workflow. The activity is for demonstration purposes only.
557559
* `workflow.execute_activity(create_greeting_activity, ...` is actually a typed signature, and MyPy will fail if the
@@ -678,16 +680,26 @@ Some things to note about the above code:
678680
* A `timeout` can optionally be provided which will throw a `asyncio.TimeoutError` if reached (internally backed by
679681
`asyncio.wait_for` which uses a timer)
680682

681-
#### Asyncio and Cancellation
683+
#### Asyncio and Determinism
682684

683-
Workflows are backed by a custom [asyncio](https://docs.python.org/3/library/asyncio.html) event loop. This means many
684-
of the common `asyncio` calls work as normal. Some asyncio features are disabled such as:
685+
Workflows must be deterministic. Workflows are backed by a custom
686+
[asyncio](https://docs.python.org/3/library/asyncio.html) event loop. This means many of the common `asyncio` calls work
687+
as normal. Some asyncio features are disabled such as:
685688

686689
* Thread related calls such as `to_thread()`, `run_coroutine_threadsafe()`, `loop.run_in_executor()`, etc
687690
* Calls that alter the event loop such as `loop.close()`, `loop.stop()`, `loop.run_forever()`,
688691
`loop.set_task_factory()`, etc
689692
* Calls that use anything external such as networking, subprocesses, disk IO, etc
690693

694+
Also, there are some `asyncio` utilities that internally use `set()` which can make them non-deterministic from one
695+
worker to the next. Therefore the following `asyncio` functions have `workflow`-module alternatives that are
696+
deterministic:
697+
698+
* `asyncio.as_completed()` - use `workflow.as_completed()`
699+
* `asyncio.wait()` - use `workflow.wait()`
700+
701+
#### Asyncio Cancellation
702+
691703
Cancellation is done the same way as `asyncio`. Specifically, a task can be requested to be cancelled but does not
692704
necessarily have to respect that cancellation immediately. This also means that `asyncio.shield()` can be used to
693705
protect against cancellation. The following tasks, when cancelled, perform a Temporal cancellation:

temporalio/workflow.py

Lines changed: 172 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
Callable,
2323
Dict,
2424
Generic,
25+
Iterable,
2526
Iterator,
2627
List,
2728
Mapping,
@@ -31,6 +32,7 @@
3132
Sequence,
3233
Tuple,
3334
Type,
35+
TypeVar,
3436
Union,
3537
cast,
3638
overload,
@@ -4361,6 +4363,176 @@ def set_dynamic_update_handler(
43614363
_Runtime.current().workflow_set_update_handler(None, handler, validator)
43624364

43634365

4366+
def as_completed(
4367+
fs: Iterable[Awaitable[AnyType]], *, timeout: Optional[float] = None
4368+
) -> Iterator[Awaitable[AnyType]]:
4369+
"""Return an iterator whose values are coroutines.
4370+
4371+
This is a deterministic version of :py:func:`asyncio.as_completed`. This
4372+
function should be used instead of that one in workflows.
4373+
"""
4374+
# Taken almost verbatim from
4375+
# https://github.com/python/cpython/blob/v3.12.3/Lib/asyncio/tasks.py#L584
4376+
# but the "set" is changed out for a "list" and fixed up some typing/format
4377+
4378+
if asyncio.isfuture(fs) or asyncio.iscoroutine(fs):
4379+
raise TypeError(f"expect an iterable of futures, not {type(fs).__name__}")
4380+
4381+
done = asyncio.Queue[Optional[asyncio.Future]]()
4382+
4383+
loop = asyncio.get_event_loop()
4384+
todo: List[asyncio.Future] = [asyncio.ensure_future(f, loop=loop) for f in list(fs)]
4385+
timeout_handle = None
4386+
4387+
def _on_timeout():
4388+
for f in todo:
4389+
f.remove_done_callback(_on_completion)
4390+
done.put_nowait(None) # Queue a dummy value for _wait_for_one().
4391+
todo.clear() # Can't do todo.remove(f) in the loop.
4392+
4393+
def _on_completion(f):
4394+
if not todo:
4395+
return # _on_timeout() was here first.
4396+
todo.remove(f)
4397+
done.put_nowait(f)
4398+
if not todo and timeout_handle is not None:
4399+
timeout_handle.cancel()
4400+
4401+
async def _wait_for_one():
4402+
f = await done.get()
4403+
if f is None:
4404+
# Dummy value from _on_timeout().
4405+
raise asyncio.TimeoutError
4406+
return f.result() # May raise f.exception().
4407+
4408+
for f in todo:
4409+
f.add_done_callback(_on_completion)
4410+
if todo and timeout is not None:
4411+
timeout_handle = loop.call_later(timeout, _on_timeout)
4412+
for _ in range(len(todo)):
4413+
yield _wait_for_one()
4414+
4415+
4416+
if TYPE_CHECKING:
4417+
_FT = TypeVar("_FT", bound=asyncio.Future[Any])
4418+
else:
4419+
_FT = TypeVar("_FT", bound=asyncio.Future)
4420+
4421+
4422+
@overload
4423+
async def wait( # type: ignore[misc]
4424+
fs: Iterable[_FT],
4425+
*,
4426+
timeout: Optional[float] = None,
4427+
return_when: str = asyncio.ALL_COMPLETED,
4428+
) -> Tuple[List[_FT], List[_FT]]:
4429+
...
4430+
4431+
4432+
@overload
4433+
async def wait(
4434+
fs: Iterable[asyncio.Task[AnyType]],
4435+
*,
4436+
timeout: Optional[float] = None,
4437+
return_when: str = asyncio.ALL_COMPLETED,
4438+
) -> Tuple[List[asyncio.Task[AnyType]], set[asyncio.Task[AnyType]]]:
4439+
...
4440+
4441+
4442+
async def wait(
4443+
fs: Iterable,
4444+
*,
4445+
timeout: Optional[float] = None,
4446+
return_when: str = asyncio.ALL_COMPLETED,
4447+
) -> Tuple:
4448+
"""Wait for the Futures or Tasks given by fs to complete.
4449+
4450+
This is a deterministic version of :py:func:`asyncio.wait`. This function
4451+
should be used instead of that one in workflows.
4452+
"""
4453+
# Taken almost verbatim from
4454+
# https://github.com/python/cpython/blob/v3.12.3/Lib/asyncio/tasks.py#L435
4455+
# but the "set" is changed out for a "list" and fixed up some typing/format
4456+
4457+
if asyncio.isfuture(fs) or asyncio.iscoroutine(fs):
4458+
raise TypeError(f"expect a list of futures, not {type(fs).__name__}")
4459+
if not fs:
4460+
raise ValueError("Set of Tasks/Futures is empty.")
4461+
if return_when not in (
4462+
asyncio.FIRST_COMPLETED,
4463+
asyncio.FIRST_EXCEPTION,
4464+
asyncio.ALL_COMPLETED,
4465+
):
4466+
raise ValueError(f"Invalid return_when value: {return_when}")
4467+
4468+
fs = list(fs)
4469+
4470+
if any(asyncio.iscoroutine(f) for f in fs):
4471+
raise TypeError("Passing coroutines is forbidden, use tasks explicitly.")
4472+
4473+
loop = asyncio.get_running_loop()
4474+
return await _wait(fs, timeout, return_when, loop)
4475+
4476+
4477+
async def _wait(
4478+
fs: Iterable[Union[asyncio.Future, asyncio.Task]],
4479+
timeout: Optional[float],
4480+
return_when: str,
4481+
loop: asyncio.AbstractEventLoop,
4482+
) -> Tuple[List, List]:
4483+
# Taken almost verbatim from
4484+
# https://github.com/python/cpython/blob/v3.12.3/Lib/asyncio/tasks.py#L522
4485+
# but the "set" is changed out for a "list" and fixed up some typing/format
4486+
4487+
assert fs, "Set of Futures is empty."
4488+
waiter = loop.create_future()
4489+
timeout_handle = None
4490+
if timeout is not None:
4491+
timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
4492+
counter = len(fs) # type: ignore[arg-type]
4493+
4494+
def _on_completion(f):
4495+
nonlocal counter
4496+
counter -= 1
4497+
if (
4498+
counter <= 0
4499+
or return_when == asyncio.FIRST_COMPLETED
4500+
or return_when == asyncio.FIRST_EXCEPTION
4501+
and (not f.cancelled() and f.exception() is not None)
4502+
):
4503+
if timeout_handle is not None:
4504+
timeout_handle.cancel()
4505+
if not waiter.done():
4506+
waiter.set_result(None)
4507+
4508+
for f in fs:
4509+
f.add_done_callback(_on_completion)
4510+
4511+
try:
4512+
await waiter
4513+
finally:
4514+
if timeout_handle is not None:
4515+
timeout_handle.cancel()
4516+
for f in fs:
4517+
f.remove_done_callback(_on_completion)
4518+
4519+
done, pending = [], []
4520+
for f in fs:
4521+
if f.done():
4522+
done.append(f)
4523+
else:
4524+
pending.append(f)
4525+
return done, pending
4526+
4527+
4528+
def _release_waiter(waiter: asyncio.Future[Any], *args) -> None:
4529+
# Taken almost verbatim from
4530+
# https://github.com/python/cpython/blob/v3.12.3/Lib/asyncio/tasks.py#L467
4531+
4532+
if not waiter.done():
4533+
waiter.set_result(None)
4534+
4535+
43644536
def _is_unbound_method_on_cls(fn: Callable[..., Any], cls: Type) -> bool:
43654537
# Python 3 does not make this easy, ref https://stackoverflow.com/questions/3589311
43664538
return (

tests/worker/test_workflow.py

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4759,3 +4759,84 @@ async def any_task_completed(handle: WorkflowHandle) -> bool:
47594759
# Terminate both
47604760
await handle1.terminate()
47614761
await handle2.terminate()
4762+
4763+
4764+
@activity.defn(dynamic=True)
4765+
async def return_name_activity(args: Sequence[RawValue]) -> str:
4766+
return activity.info().activity_type
4767+
4768+
4769+
@workflow.defn
4770+
class AsCompletedWorkflow:
4771+
@workflow.run
4772+
async def run(self) -> List[str]:
4773+
# Lazily start 10 different activities and wait for each completed
4774+
tasks = [
4775+
workflow.execute_activity(
4776+
f"my-activity-{i}", start_to_close_timeout=timedelta(seconds=1)
4777+
)
4778+
for i in range(10)
4779+
]
4780+
4781+
# Using asyncio.as_completed like below almost always fails with
4782+
# non-determinism error because it uses sets internally, but we can't
4783+
# assert on that because it could _technically_ pass though unlikely:
4784+
# return [await task for task in asyncio.as_completed(tasks)]
4785+
4786+
return [await task for task in workflow.as_completed(tasks)]
4787+
4788+
4789+
async def test_workflow_as_completed_utility(client: Client):
4790+
# Disable cache to force replay
4791+
async with new_worker(
4792+
client,
4793+
AsCompletedWorkflow,
4794+
activities=[return_name_activity],
4795+
max_cached_workflows=0,
4796+
) as worker:
4797+
# This would fail if we used asyncio.as_completed in the workflow
4798+
result = await client.execute_workflow(
4799+
AsCompletedWorkflow.run,
4800+
id=f"wf-{uuid.uuid4()}",
4801+
task_queue=worker.task_queue,
4802+
)
4803+
assert len(result) == 10
4804+
4805+
4806+
@workflow.defn
4807+
class WaitWorkflow:
4808+
@workflow.run
4809+
async def run(self) -> List[str]:
4810+
# Create 10 tasks that return activity names, wait on them, then execute
4811+
# the activities
4812+
async def new_activity_name(index: int) -> str:
4813+
return f"my-activity-{index}"
4814+
4815+
name_tasks = [asyncio.create_task(new_activity_name(i)) for i in range(10)]
4816+
4817+
# Using asyncio.wait like below almost always fails with non-determinism
4818+
# error because it returns sets, but we can't assert on that because it
4819+
# could _technically_ pass though unlikely:
4820+
# done, _ = await asyncio.wait(name_tasks)
4821+
4822+
done, _ = await workflow.wait(name_tasks)
4823+
return [
4824+
await workflow.execute_activity(
4825+
await activity_name, start_to_close_timeout=timedelta(seconds=1)
4826+
)
4827+
for activity_name in done
4828+
]
4829+
4830+
4831+
async def test_workflow_wait_utility(client: Client):
4832+
# Disable cache to force replay
4833+
async with new_worker(
4834+
client, WaitWorkflow, activities=[return_name_activity], max_cached_workflows=0
4835+
) as worker:
4836+
# This would fail if we used asyncio.wait in the workflow
4837+
result = await client.execute_workflow(
4838+
WaitWorkflow.run,
4839+
id=f"wf-{uuid.uuid4()}",
4840+
task_queue=worker.task_queue,
4841+
)
4842+
assert len(result) == 10

0 commit comments

Comments
 (0)