
[Bug] asyncio.wait is non-deterministic when used with coroutines instead of tasks #429


Closed
ntessman-capsule opened this issue Nov 15, 2023 · 4 comments · Fixed by #533
Labels
bug Something isn't working


ntessman-capsule commented Nov 15, 2023

What are you really trying to do?

Split a list of incoming events into concurrently executed activity streams. The repro below mimics a stripped down version of the original code structure.

Describe the bug

When asyncio.wait() is passed a list of async method calls (coroutines), each of which runs a series of activity executions, an error can occur in which later activities receive the return values of other concurrent executions.

Here is a table to illustrate what I mean. Given an activity that executes f(x) -> x, if we pass the numbers 1-3 concurrently, we might see this:

Activity Input | Expected Output | Actual Output
1              | 1               | 1
2              | 2               | 3
3              | 3               | 2

This issue is random, and rare. The reproduction below is designed to maximize the chance of running into it.

Minimal Reproduction

Below is a self-contained reproduction of the issue. With the following input, the error occurs fairly consistently, a little less than once per run:

{"execution_iterations": 20,"activity_iterations": 5,"concurrency": 100,"wait_time": 0.1}

The Temporal setup I used to reproduce this issue is a vanilla Temporal server started with temporal server start-dev, plus two worker instances each running the Python file below via python -m main. I primarily tested with Python 3.8.

When there is a mismatch, it is printed to the console.

import asyncio
from datetime import timedelta
from typing import List
from temporalio import workflow, activity
from temporalio.worker import Worker
from temporalio.client import Client

from dataclasses import dataclass

@dataclass
class EchoRequest:
    execution_iteration: int
    activity_iteration: int
    input: str
    wait_time: float

@dataclass
class EchoResponse:
    output: str

@activity.defn(name="echo")
async def echo(
    echo: EchoRequest
) -> EchoResponse:
    await asyncio.sleep(echo.wait_time)
    return EchoResponse(output=echo.input)


@dataclass
class RaceConditionIteration:
    execution_id: int
    iterations: int
    wait_time: float


@dataclass
class RaceConditionTestInput:
    execution_iterations: int
    activity_iterations: int
    concurrency: int
    wait_time: float


@activity.defn(name="race_condition_get_input_events")
async def race_condition_get_input_events(
    input: RaceConditionTestInput
) -> List[RaceConditionIteration]:
    return [
        RaceConditionIteration(
            execution_id=id,
            iterations=input.activity_iterations,
            wait_time=input.wait_time,
        )
        for id in range(input.concurrency)
    ]


@workflow.defn(name="RaceConditionTestWorkflow")
class RaceConditionTestWorkflow:
    @workflow.run
    async def run(self, event: RaceConditionTestInput) -> str:
        print('Starting workflow...')
        
        # Simulate a list of input events
        generated_input_data = await workflow.execute_activity(
            race_condition_get_input_events,
            event,
            start_to_close_timeout=timedelta(seconds=10),
            schedule_to_close_timeout=timedelta(seconds=60)
        )

        for iteration in range(event.execution_iterations):
            _, _ = await asyncio.wait(
                [
                    self.execute_activities(
                        iteration=iteration,
                        input=input
                    )
                    for input in generated_input_data
                ],
                return_when=asyncio.ALL_COMPLETED
            )
        
        return "Done."

    async def execute_activities(
        self,
        iteration: int,
        input: RaceConditionIteration,
    ) -> None:
        for iter in range(input.iterations):
            iter_id = f"{iteration}.{input.execution_id}.{iter}"
            result = await workflow.execute_activity(
                echo,
                EchoRequest(
                    execution_iteration=iteration,
                    activity_iteration=iter,
                    input=iter_id,
                    wait_time=input.wait_time
                ),
                start_to_close_timeout=timedelta(seconds=10),
                schedule_to_close_timeout=timedelta(seconds=60)
            )

            if iter_id != result.output:
                print(f"Expected: {iter_id}, Actual: {result.output}")
            

async def main():
    client = await Client.connect("localhost:7233")

    worker = Worker(
        client,
        task_queue="race-condition",
        workflows=[RaceConditionTestWorkflow],
        activities=[echo, race_condition_get_input_events],
    )

    print('Starting worker...')

    await worker.run()

if __name__ == '__main__':
    asyncio.run(main())

Environment/Versions

  • OS and processor: Tested on M2 Mac, Linux
  • Temporal SDK Versions: Tested on 1.13, 1.14
  • Python: <3.11 (mitigated in 3.11 due to disallowing coroutines, see below)
  • Occurs within Kubernetes deployment as well as local install

Additional context

It appears that this bug occurs after this warning is printed to the worker console:

2023-11-15T05:38:09.387580Z  WARN temporal_sdk_core::worker::workflow: Task not found when completing error=status: NotFound, message: "Workflow task not found.", details: [8, 5, 18, 24, 87, 111, 114, 107, 102, 108, 111, 119, 32, 116, 97, 115, 107, 32, 110, 111, 116, 32, 102, 111, 117, 110, 100, 46, 26, 66, 10, 64, 116, 121, 112, 101, 46, 103, 111, 111, 103, 108, 101, 97, 112, 105, 115, 46, 99, 111, 109, 47, 116, 101, 109, 112, 111, 114, 97, 108, 46, 97, 112, 105, 46, 101, 114, 114, 111, 114, 100, 101, 116, 97, 105, 108, 115, 46, 118, 49, 46, 78, 111, 116, 70, 111, 117, 110, 100, 70, 97, 105, 108, 117, 114, 101], metadata: MetadataMap { headers: {"content-type": "application/grpc"} } run_id="eb9b8aed-c730-4fe6-a7e5-a772f6757be2"

I can't confirm whether this warning also appeared when the issue occurred in the real code. The warning also appears to cause the workflow task to restart, which seems correlated with the determinism breakdown.

Additionally, it appears that this issue has indirectly been mitigated in 3.11, as asyncio.wait() no longer allows passing coroutines directly. When the method is wrapped in asyncio.create_task(), this issue disappears. The above warning is still printed, but doesn't result in disordered activity results.

This code change in the workflow removes the issue:

_, _ = await asyncio.wait(
    [
-      self.execute_activities(
+      asyncio.create_task(self.execute_activities(
            iteration=iteration,
            input=input
-       )
+       ))
        for input in generated_input_data
    ],
    return_when=asyncio.ALL_COMPLETED
)
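A quick standalone way to see which behavior a given interpreter has (plain asyncio, no Temporal involved). noop and pass_bare_coroutine are illustrative names. On Python 3.8 through 3.10 the call should succeed with a DeprecationWarning; on 3.11+ asyncio.wait should raise TypeError before scheduling anything.

```python
import asyncio
import sys
import warnings

async def noop() -> None:
    pass

async def pass_bare_coroutine() -> str:
    coro = noop()
    try:
        with warnings.catch_warnings(record=True) as caught:
            warnings.simplefilter("always")
            await asyncio.wait([coro])
    except TypeError:
        # Python 3.11+: bare coroutines are rejected before anything is scheduled.
        coro.close()  # avoid a "coroutine was never awaited" RuntimeWarning
        return "rejected"
    # Python 3.8-3.10: the coroutine is wrapped in a task and run, with a warning.
    warned = any(issubclass(w.category, DeprecationWarning) for w in caught)
    return "deprecated" if warned else "accepted"

outcome = asyncio.run(pass_bare_coroutine())
print(sys.version_info[:2], outcome)
```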
@ntessman-capsule ntessman-capsule added the bug Something isn't working label Nov 15, 2023
@HillaShx

Judging by the logs, I seem to be facing the same issue. I also use asyncio for concurrent activity execution (with multiple workers running in separate threads).

Environment/Versions

  • OS and processor: M2 Mac, macOS Ventura 13.5.1
  • Temporal SDK Versions: 1.3.0
  • Python: 3.9.6
  • Occurs within a Temporal Cluster service deployment, with the workers deployed to an Azure Web App.

Let me know what other context might help resolve this.

@cretz
Member

cretz commented Nov 27, 2023

Hrmm, I was only able to get the mismatch after replay (I think replay was being forced because the history size grew too large). I am still investigating here...

@cretz
Member

cretz commented Nov 27, 2023

Turns out asyncio.wait is non-deterministic. Here is the problem: https://github.com/python/cpython/blob/967f2a3052c2d22e31564b428a9aa8cc63dc2a9f/Lib/asyncio/tasks.py#L443

This is not a Temporal-specific issue. What is the output of the following?

import asyncio

async def do_something(name: str) -> None:
    print(f"Run {name}")

async def main() -> None:
    # Bare coroutines, not tasks: deprecated since 3.8, a TypeError in 3.11+.
    await asyncio.wait([
        do_something("something1"),
        do_something("something2"),
        do_something("something3"),
    ])
    await asyncio.wait([
        do_something("something4"),
        do_something("something5"),
        do_something("something6"),
    ])

asyncio.run(main())

In 3.10, ignoring deprecation warnings, it may be something like (but it can change with only slight code alterations):

Run something1
Run something2
Run something3
Run something6
Run something4
Run something5

Why was that output not predictable?

Unfortunately, asyncio.wait converts its input into a set, which (unlike even a dict) has no reliable iteration order. Coroutine objects hash by identity, so the set's order depends on where they happen to sit in memory, and asyncio.wait wraps each coroutine in a task in that set order. This means the coroutines are not even started in a consistent order. This problem does not exist with start_activity, because that is a sync function that returns an awaitable: the activity is scheduled when you call it, not when you await it. execute_activity, on the other hand, is an async function, so its body runs only when it is awaited, not when it is called. This means that foo = workflow.execute_activity(...) does nothing until await foo is run.
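A plain-asyncio sketch of that calling-convention difference, with no Temporal involved. execute_like and start_like are hypothetical stand-ins that only mimic how an async function and a sync function returning a task behave; they are not the SDK's implementations of execute_activity and start_activity.

```python
import asyncio
from typing import List

log: List[str] = []

async def execute_like(name: str) -> str:
    # Async function: calling it only builds a coroutine; the body runs later.
    log.append(f"run {name}")
    return name

def start_like(name: str) -> "asyncio.Task[str]":
    # Sync function: schedules the work right now and hands back an awaitable.
    return asyncio.create_task(execute_like(name))

async def main() -> None:
    coro = execute_like("coroutine")  # nothing scheduled yet
    task = start_like("task")         # scheduled immediately
    assert log == []                  # neither has run; the task waits for the loop
    await asyncio.sleep(0)            # yield once so the scheduled task runs
    assert log == ["run task"]        # the bare coroutine still has not started
    await coro
    await task

asyncio.run(main())
print(log)  # ['run task', 'run coroutine']
```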

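The reason this works with create_task is that create_task schedules the coroutine as soon as create_task is called (it begins on the next pass of the event loop), so the tasks start running in the order they were created, regardless of how asyncio.wait later orders them in its set.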
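A minimal plain-asyncio check of that create_task behavior. worker and started are illustrative names. Each task is scheduled at creation, so the start order matches list order even though asyncio.wait stores the tasks in a set internally.

```python
import asyncio
from typing import List

started: List[int] = []

async def worker(n: int) -> None:
    # Record the order in which coroutine bodies actually begin executing.
    started.append(n)
    await asyncio.sleep(0)

async def main() -> None:
    # Tasks are queued on the loop in list order here, before asyncio.wait
    # ever sees them, so its internal set cannot change their start order.
    tasks = [asyncio.create_task(worker(n)) for n in range(10)]
    done, pending = await asyncio.wait(tasks, return_when=asyncio.ALL_COMPLETED)
    assert not pending and len(done) == 10

asyncio.run(main())
print(started)  # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
```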

So, basically, we need to educate people that asyncio.wait is non-deterministic when used with coroutines instead of tasks (passing coroutines isn't allowed at all in 3.11+). I was originally going to open a CPython issue to stop converting to a set (digging through git blame, this goes back to at least 3.4), but that would only apply to new versions (I doubt they'd backport it), so you'd be forced to use tasks anyway.

Does this make sense? I will add a README note/warning. We could offer our own deterministic form of this function, but we'd probably require tasks anyways like Python does now. Any alternative suggestions or ways we can approach this?
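One possible shape for such a deterministic variant is a thin wrapper that, like Python 3.11+, refuses bare coroutines and returns the done/pending results as lists in input order instead of sets. ordered_wait is a hypothetical name and this is only a sketch, not an SDK API; it assumes every element is already a task or future, so the caller fixes the start order.

```python
import asyncio
from typing import Iterable, List, Optional, Tuple

async def ordered_wait(
    fs: Iterable["asyncio.Future[object]"],
    *,
    timeout: Optional[float] = None,
    return_when: str = asyncio.ALL_COMPLETED,
) -> Tuple[List["asyncio.Future[object]"], List["asyncio.Future[object]"]]:
    """Hypothetical helper: asyncio.wait semantics, tasks only, ordered output."""
    futures = list(fs)
    if any(asyncio.iscoroutine(f) for f in futures):
        raise TypeError("pass tasks, not coroutines; wrap them with asyncio.create_task")
    done, pending = await asyncio.wait(futures, timeout=timeout, return_when=return_when)
    # Rebuild both results in input order so iterating over them is deterministic.
    return [f for f in futures if f in done], [f for f in futures if f in pending]

async def square_after_delay(n: int) -> int:
    # Larger n finishes first, so completion order is the reverse of input order.
    await asyncio.sleep(0.01 * (5 - n))
    return n * n

async def demo() -> List[int]:
    tasks = [asyncio.create_task(square_after_delay(n)) for n in range(5)]
    done, pending = await ordered_wait(tasks)
    assert not pending
    return [t.result() for t in done]

results = asyncio.run(demo())
print(results)  # [0, 1, 4, 9, 16]
```

Delegating to asyncio.wait keeps its timeout and return_when behavior; the only change is rejecting coroutines up front and re-sorting the output by input position.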

@cretz cretz changed the title [Bug] Concurrent activity execution rarely results in mismatched activity inputs and outputs [Bug] asyncio.wait is non-deterministic when used with coroutines instead of tasks Nov 27, 2023
@ntessman-capsule
Author

Thanks for your investigation @cretz. I'll leave this open for comment in case anyone has a better idea, but for my purposes this issue is resolved through the use of tasks. Personally, I think the only thing that can be done is provide a warning. It's a gotcha that is on Python's shoulders, not the SDK, and as you said it's unlikely that a fix will be applied retroactively.
