Skip to content

Add WorkflowUpdateRPCTimeoutOrCancelledError #548

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jun 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ jobs:
# Build and test the project
build-lint-test:
strategy:
fail-fast: true
fail-fast: false
matrix:
python: ["3.8", "3.12"]
os: [ubuntu-latest, ubuntu-arm, macos-intel, macos-arm, windows-latest]
Expand Down
82 changes: 68 additions & 14 deletions temporalio/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1852,7 +1852,10 @@ async def execute_update(
rpc_timeout: Optional RPC deadline to set for the RPC call.

Raises:
WorkflowUpdateFailedError: If the update failed
WorkflowUpdateFailedError: If the update failed.
WorkflowUpdateRPCTimeoutOrCancelledError: This update call timed out
or was cancelled. This doesn't mean the update itself was timed
out or cancelled.
RPCError: There was some issue sending the update to the workflow.
"""
handle = await self._start_update(
Expand Down Expand Up @@ -1968,6 +1971,9 @@ async def start_update(
rpc_timeout: Optional RPC deadline to set for the RPC call.

Raises:
WorkflowUpdateRPCTimeoutOrCancelledError: This update call timed out
or was cancelled. This doesn't mean the update itself was timed
out or cancelled.
RPCError: There was some issue sending the update to the workflow.
"""
return await self._start_update(
Expand Down Expand Up @@ -4305,7 +4311,10 @@ async def result(
it will be retried until the result is available.

Raises:
WorkflowUpdateFailedError: If the update failed
WorkflowUpdateFailedError: If the update failed.
WorkflowUpdateRPCTimeoutOrCancelledError: This update call timed out
or was cancelled. This doesn't mean the update itself was timed
out or cancelled.
RPCError: Update result could not be fetched for some other reason.
"""
# Poll until outcome reached
Expand Down Expand Up @@ -4357,15 +4366,28 @@ async def _poll_until_outcome(

# Continue polling as long as we have no outcome
while True:
res = await self._client.workflow_service.poll_workflow_execution_update(
req,
retry=True,
metadata=rpc_metadata,
timeout=rpc_timeout,
)
if res.HasField("outcome"):
self._known_outcome = res.outcome
return
try:
res = (
await self._client.workflow_service.poll_workflow_execution_update(
req,
retry=True,
metadata=rpc_metadata,
timeout=rpc_timeout,
)
)
if res.HasField("outcome"):
self._known_outcome = res.outcome
return
except RPCError as err:
if (
err.status == RPCStatusCode.DEADLINE_EXCEEDED
or err.status == RPCStatusCode.CANCELLED
):
raise WorkflowUpdateRPCTimeoutOrCancelledError() from err
else:
raise
except asyncio.CancelledError as err:
raise WorkflowUpdateRPCTimeoutOrCancelledError() from err


class WorkflowUpdateStage(IntEnum):
Expand Down Expand Up @@ -4456,6 +4478,24 @@ def cause(self) -> BaseException:
return self.__cause__


class RPCTimeoutOrCancelledError(temporalio.exceptions.TemporalError):
"""Error that occurs on some client calls that timeout or get cancelled."""
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
"""Error that occurs on some client calls that timeout or get cancelled."""
"""Error that occurs on some client calls when they time out or get cancelled."""


pass


class WorkflowUpdateRPCTimeoutOrCancelledError(RPCTimeoutOrCancelledError):
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would like feedback on whether this is what we want to call this. Options:

  • WorkflowUpdateRPCTimeoutOrCancelledError
  • WorkflowUpdateRPCTimedOutOrCancelledError
  • WorkflowUpdateRPCTimeoutOrCancellationError

There may be some other options, but it's important that we include "rpc" in here so users know it's about the RPC call and not some potential future concept of update timeout/cancellation.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd voted TimedOut, since that consistent with cancelled

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 Will leave open a bit to get more opinions (and confer w/ team before decision)

Copy link

@drewhoskins-temporal drewhoskins-temporal Jun 10, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think Timeout for search optimization despite grammatical oddness. It's what people in python expect.

it's important that we include "rpc" in here so users know it's about the RPC call and not some potential future concept of update timeout/cancellation

👎 For me, the main point of this exercise is to reduce the size and complexity of the handler block and the number of times users need to add logic. Thus we want to capture all timeouts into one exception. If we need to disambiguate in the future for power users, let's add fields.

Taking this idea further, please consider one more option. Given that we have the GRPC::Canceled inner exception, that also lets us keep the name simple. For example, we can just do WorkflowUpdateTimeoutError leaving us the ability to have a Canceled error when we add update cancels. But I don't feel strongly about this one.

Copy link

@drewhoskins-temporal drewhoskins-temporal Jun 11, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Chad and I chatted offline. He's thinking of when we add a "run timeout" to the update, and I'm convinced that we should distinguish "the client gave up" from "the workflow gave up".

Lemme clarify my thoughts. The thing that snags me about "RPC" specifically is it's a leaky abstraction, at least the way I read it. You could easily implement these patterns by polling or by having two RPC calls (start and then wait vs execute), in which case the meaning of "RPC" changes and could be ambiguous. If I put an rpc_timeout=30 but I have client retries, what does that mean? Does each RPC get 30 seconds, or do I as the user want to wait 30 seconds total?

What we really mean, semantically, is "the client gave up" so in the abstract I would prefer terms like ClientTimeoutError. (And maybe that's also general enough to encompass the "client canceled" meaning?) But since we already expose fields like rpc_timeout, we've already crossed that bridge, so I'm fine with the naming you chose if you like it better or feel it's more consistent with other parts of the product.

When we add durable-on-admitted, it'd be nice if we could add a field on this exception like last_wait_stage_confirmed={None, Admitted, Accepted}. (But perhaps people who want that should use start_update(wait_stage=Admitted) and then poll on the handle.)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we've already crossed that bridge

Not necessarily. This is a brand new thing to wrap these kinds of gRPC errors, so we can change. We can have ClientTimeoutError and WorkflowUpdateClientTimeoutError, but need to confirm that that also includes Cancelled status codes and asyncio.Cancelled cancellation or if we just want it as DeadlineExceeded (I am not sure that "timeout" is a good name for something that includes cancellation).

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would leave off the RPC prefix if the exceptions are already in the client package. Otherwise I have no strong opinion

"""Error that occurs when update RPC call times out or is cancelled.

Note, this is not related to any general concept of timing out or cancelling
a running update, this is only related to the client call itself.
"""

def __init__(self) -> None:
"""Create workflow update timeout or cancelled error."""
super().__init__("Timeout or cancellation waiting for update")


class AsyncActivityCancelledError(temporalio.exceptions.TemporalError):
"""Error that occurs when async activity attempted heartbeat but was cancelled."""

Expand Down Expand Up @@ -5261,9 +5301,23 @@ async def start_workflow_update(
# the user cannot specify sooner than ACCEPTED)
resp: temporalio.api.workflowservice.v1.UpdateWorkflowExecutionResponse
while True:
resp = await self._client.workflow_service.update_workflow_execution(
req, retry=True, metadata=input.rpc_metadata, timeout=input.rpc_timeout
)
try:
resp = await self._client.workflow_service.update_workflow_execution(
req,
retry=True,
metadata=input.rpc_metadata,
timeout=input.rpc_timeout,
)
except RPCError as err:
if (
err.status == RPCStatusCode.DEADLINE_EXCEEDED
or err.status == RPCStatusCode.CANCELLED
):
raise WorkflowUpdateRPCTimeoutOrCancelledError() from err
else:
raise
except asyncio.CancelledError as err:
raise WorkflowUpdateRPCTimeoutOrCancelledError() from err
if (
resp.stage >= req.wait_policy.lifecycle_stage
or resp.stage
Expand Down
23 changes: 23 additions & 0 deletions tests/helpers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,14 @@
from datetime import timedelta
from typing import Awaitable, Callable, Optional, Sequence, Type, TypeVar

from temporalio.api.common.v1 import WorkflowExecution
from temporalio.api.enums.v1 import IndexedValueType
from temporalio.api.operatorservice.v1 import (
AddSearchAttributesRequest,
ListSearchAttributesRequest,
)
from temporalio.api.update.v1 import UpdateRef
from temporalio.api.workflowservice.v1 import PollWorkflowExecutionUpdateRequest
from temporalio.client import BuildIdOpAddNewDefault, Client
from temporalio.common import SearchAttributeKey
from temporalio.service import RPCError, RPCStatusCode
Expand Down Expand Up @@ -105,3 +108,23 @@ def find_free_port() -> int:
s.bind(("", 0))
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
return s.getsockname()[1]


async def workflow_update_exists(
client: Client, workflow_id: str, update_id: str
) -> bool:
try:
await client.workflow_service.poll_workflow_execution_update(
PollWorkflowExecutionUpdateRequest(
namespace=client.namespace,
update_ref=UpdateRef(
workflow_execution=WorkflowExecution(workflow_id=workflow_id),
update_id=update_id,
),
)
)
return True
except RPCError as err:
if err.status != RPCStatusCode.NOT_FOUND:
raise
return False
128 changes: 108 additions & 20 deletions tests/worker/test_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
from temporalio.api.update.v1 import UpdateRef
from temporalio.api.workflowservice.v1 import (
GetWorkflowExecutionHistoryRequest,
PollWorkflowExecutionUpdateRequest,
ResetStickyTaskQueueRequest,
)
from temporalio.bridge.proto.workflow_activation import WorkflowActivation
Expand All @@ -57,6 +56,7 @@
WorkflowQueryFailedError,
WorkflowUpdateFailedError,
WorkflowUpdateHandle,
WorkflowUpdateRPCTimeoutOrCancelledError,
WorkflowUpdateStage,
)
from temporalio.common import (
Expand Down Expand Up @@ -107,6 +107,7 @@
ensure_search_attributes_present,
find_free_port,
new_worker,
workflow_update_exists,
)
from tests.helpers.external_coroutine import wait_on_timer
from tests.helpers.external_stack_trace import (
Expand Down Expand Up @@ -4364,25 +4365,8 @@ async def test_workflow_update_before_worker_start(
task_queue=task_queue,
)

async def update_exists() -> bool:
try:
await client.workflow_service.poll_workflow_execution_update(
PollWorkflowExecutionUpdateRequest(
namespace=client.namespace,
update_ref=UpdateRef(
workflow_execution=WorkflowExecution(workflow_id=handle.id),
update_id="my-update",
),
)
)
return True
except RPCError as err:
if err.status != RPCStatusCode.NOT_FOUND:
raise
return False

# Confirm update not there
assert not await update_exists()
assert not await workflow_update_exists(client, handle.id, "my-update")

# Execute update in background
update_task = asyncio.create_task(
Expand All @@ -4392,7 +4376,9 @@ async def update_exists() -> bool:
)

# Wait until update exists
await assert_eq_eventually(True, update_exists)
await assert_eq_eventually(
True, lambda: workflow_update_exists(client, handle.id, "my-update")
)

# Start no-cache worker on the task queue
async with new_worker(
Expand Down Expand Up @@ -4466,6 +4452,108 @@ async def test_workflow_update_separate_handle(
assert "workflow-done" == await handle.result()


@workflow.defn
class UpdateTimeoutOrCancelWorkflow:
@workflow.run
async def run(self) -> None:
await workflow.wait_condition(lambda: False)

@workflow.update
async def do_update(self, sleep: float) -> None:
await asyncio.sleep(sleep)


async def test_workflow_update_timeout_or_cancel(
client: Client, env: WorkflowEnvironment
):
if env.supports_time_skipping:
pytest.skip(
"Java test server: https://github.com/temporalio/sdk-java/issues/1903"
)

# Confirm start timeout via short timeout on update w/ no worker running
handle = await client.start_workflow(
UpdateTimeoutOrCancelWorkflow.run,
id=f"wf-{uuid.uuid4()}",
task_queue="does-not-exist",
)
with pytest.raises(WorkflowUpdateRPCTimeoutOrCancelledError):
await handle.start_update(
UpdateTimeoutOrCancelWorkflow.do_update,
1000,
wait_for_stage=WorkflowUpdateStage.ACCEPTED,
rpc_timeout=timedelta(milliseconds=1),
)

# Confirm start cancel via cancel on update w/ no worker running
handle = await client.start_workflow(
UpdateTimeoutOrCancelWorkflow.run,
id=f"wf-{uuid.uuid4()}",
task_queue="does-not-exist",
)
task = asyncio.create_task(
handle.start_update(
UpdateTimeoutOrCancelWorkflow.do_update,
1000,
wait_for_stage=WorkflowUpdateStage.ACCEPTED,
id="my-update",
)
)
# Have to wait for update to exist before cancelling to capture
await assert_eq_eventually(
True, lambda: workflow_update_exists(client, handle.id, "my-update")
)
task.cancel()
with pytest.raises(WorkflowUpdateRPCTimeoutOrCancelledError):
await task

# Start worker
async with new_worker(client, UpdateTimeoutOrCancelWorkflow) as worker:
# Start the workflow
handle = await client.start_workflow(
UpdateTimeoutOrCancelWorkflow.run,
id=f"wf-{uuid.uuid4()}",
task_queue=worker.task_queue,
)
# Start an update
update_handle = await handle.start_update(
UpdateTimeoutOrCancelWorkflow.do_update,
1000,
wait_for_stage=WorkflowUpdateStage.ACCEPTED,
)
# Timeout a poll call
with pytest.raises(WorkflowUpdateRPCTimeoutOrCancelledError):
await update_handle.result(rpc_timeout=timedelta(milliseconds=1))

# Cancel a poll call
update_handle = await handle.start_update(
UpdateTimeoutOrCancelWorkflow.do_update,
1000,
wait_for_stage=WorkflowUpdateStage.ACCEPTED,
)
result_task = asyncio.create_task(update_handle.result())
# Unfortunately there is not a way for us to confirm this is actually
# pending the server call and if you cancel early you get an asyncio
# cancelled error because it never even reached the gRPC client. We
# considered sleeping, but that makes for flaky tests. So what we are
# going to do is patch the poll call to notify us when it was called.
called = asyncio.Event()
unpatched_call = client.workflow_service.poll_workflow_execution_update

async def patched_call(*args, **kwargs):
called.set()
return await unpatched_call(*args, **kwargs)

client.workflow_service.poll_workflow_execution_update = patched_call # type: ignore
try:
await called.wait()
finally:
client.workflow_service.poll_workflow_execution_update = unpatched_call
result_task.cancel()
with pytest.raises(WorkflowUpdateRPCTimeoutOrCancelledError):
await result_task


@workflow.defn
class TimeoutSupportWorkflow:
@workflow.run
Expand Down
Loading