-
Notifications
You must be signed in to change notification settings - Fork 103
Add WorkflowUpdateRPCTimeoutOrCancelledError #548
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -1852,7 +1852,10 @@ async def execute_update( | |
rpc_timeout: Optional RPC deadline to set for the RPC call. | ||
|
||
Raises: | ||
WorkflowUpdateFailedError: If the update failed | ||
WorkflowUpdateFailedError: If the update failed. | ||
WorkflowUpdateRPCTimeoutOrCancelledError: This update call timed out | ||
or was cancelled. This doesn't mean the update itself was timed | ||
out or cancelled. | ||
RPCError: There was some issue sending the update to the workflow. | ||
""" | ||
handle = await self._start_update( | ||
|
@@ -1968,6 +1971,9 @@ async def start_update( | |
rpc_timeout: Optional RPC deadline to set for the RPC call. | ||
|
||
Raises: | ||
WorkflowUpdateRPCTimeoutOrCancelledError: This update call timed out | ||
or was cancelled. This doesn't mean the update itself was timed | ||
out or cancelled. | ||
RPCError: There was some issue sending the update to the workflow. | ||
""" | ||
return await self._start_update( | ||
|
@@ -4305,7 +4311,10 @@ async def result( | |
it will be retried until the result is available. | ||
|
||
Raises: | ||
WorkflowUpdateFailedError: If the update failed | ||
WorkflowUpdateFailedError: If the update failed. | ||
WorkflowUpdateRPCTimeoutOrCancelledError: This update call timed out | ||
or was cancelled. This doesn't mean the update itself was timed | ||
out or cancelled. | ||
RPCError: Update result could not be fetched for some other reason. | ||
""" | ||
# Poll until outcome reached | ||
|
@@ -4357,15 +4366,28 @@ async def _poll_until_outcome( | |
|
||
# Continue polling as long as we have no outcome | ||
while True: | ||
res = await self._client.workflow_service.poll_workflow_execution_update( | ||
req, | ||
retry=True, | ||
metadata=rpc_metadata, | ||
timeout=rpc_timeout, | ||
) | ||
if res.HasField("outcome"): | ||
self._known_outcome = res.outcome | ||
return | ||
try: | ||
res = ( | ||
await self._client.workflow_service.poll_workflow_execution_update( | ||
req, | ||
retry=True, | ||
metadata=rpc_metadata, | ||
timeout=rpc_timeout, | ||
) | ||
) | ||
if res.HasField("outcome"): | ||
self._known_outcome = res.outcome | ||
return | ||
except RPCError as err: | ||
if ( | ||
err.status == RPCStatusCode.DEADLINE_EXCEEDED | ||
or err.status == RPCStatusCode.CANCELLED | ||
): | ||
raise WorkflowUpdateRPCTimeoutOrCancelledError() from err | ||
else: | ||
raise | ||
except asyncio.CancelledError as err: | ||
raise WorkflowUpdateRPCTimeoutOrCancelledError() from err | ||
|
||
|
||
class WorkflowUpdateStage(IntEnum): | ||
|
@@ -4456,6 +4478,24 @@ def cause(self) -> BaseException: | |
return self.__cause__ | ||
|
||
|
||
class RPCTimeoutOrCancelledError(temporalio.exceptions.TemporalError): | ||
"""Error that occurs on some client calls that timeout or get cancelled.""" | ||
|
||
pass | ||
|
||
|
||
class WorkflowUpdateRPCTimeoutOrCancelledError(RPCTimeoutOrCancelledError): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would like feedback on whether this is what we want to call this. Options:
There may be some other options, but it's important that we include "rpc" in here so users know it's about the RPC call and not some potential future concept of update timeout/cancellation. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'd voted There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 👍 Will leave open a bit to get more opinions (and confer w/ team before decision) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think
👎 For me, the main point of this exercise is to reduce the size and complexity of the handler block and the number of times users need to add logic. Thus we want to capture all timeouts into one exception. If we need to disambiguate in the future for power users, let's add fields. Taking this idea further, please consider one more option. Given that we have the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Chad and I chatted offline. He's thinking of when we add a "run timeout" to the update, and I'm convinced that we should distinguish "the client gave up" from "the workflow gave up". Lemme clarify my thoughts. The thing that snags me about "RPC" specifically is it's a leaky abstraction, at least the way I read it. You could easily implement these patterns by polling or by having two RPC calls (start and then wait vs execute), in which case the meaning of "RPC" changes and could be ambiguous. If I put an What we really mean, semantically, is "the client gave up" so in the abstract I would prefer terms like When we add durable-on-admitted, it'd be nice if we could add a field on this exception like There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Not necessarily. This is a brand new thing to wrap these kinds of gRPC errors, so we can change. We can have There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would leave off the |
||
"""Error that occurs when update RPC call times out or is cancelled. | ||
|
||
Note, this is not related to any general concept of timing out or cancelling | ||
a running update, this is only related to the client call itself. | ||
""" | ||
|
||
def __init__(self) -> None: | ||
"""Create workflow update timeout or cancelled error.""" | ||
super().__init__("Timeout or cancellation waiting for update") | ||
|
||
|
||
class AsyncActivityCancelledError(temporalio.exceptions.TemporalError): | ||
"""Error that occurs when async activity attempted heartbeat but was cancelled.""" | ||
|
||
|
@@ -5261,9 +5301,23 @@ async def start_workflow_update( | |
# the user cannot specify sooner than ACCEPTED) | ||
resp: temporalio.api.workflowservice.v1.UpdateWorkflowExecutionResponse | ||
while True: | ||
resp = await self._client.workflow_service.update_workflow_execution( | ||
req, retry=True, metadata=input.rpc_metadata, timeout=input.rpc_timeout | ||
) | ||
try: | ||
resp = await self._client.workflow_service.update_workflow_execution( | ||
req, | ||
retry=True, | ||
metadata=input.rpc_metadata, | ||
timeout=input.rpc_timeout, | ||
) | ||
except RPCError as err: | ||
if ( | ||
err.status == RPCStatusCode.DEADLINE_EXCEEDED | ||
or err.status == RPCStatusCode.CANCELLED | ||
): | ||
raise WorkflowUpdateRPCTimeoutOrCancelledError() from err | ||
else: | ||
raise | ||
except asyncio.CancelledError as err: | ||
raise WorkflowUpdateRPCTimeoutOrCancelledError() from err | ||
if ( | ||
resp.stage >= req.wait_policy.lifecycle_stage | ||
or resp.stage | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.