Skip to content

Commit b902ec8

Browse files
authored
Swallow Python exceptions better on workflow GC from eviction (#341)
Fixes #325
1 parent 317dd9b commit b902ec8

File tree

3 files changed

+116
-9
lines changed

3 files changed

+116
-9
lines changed

temporalio/worker/_workflow_instance.py

Lines changed: 40 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -242,6 +242,16 @@ def __init__(self, det: WorkflowInstanceDetails) -> None:
242242
# Set ourselves on our own loop
243243
temporalio.workflow._Runtime.set_on_loop(self, self)
244244

245+
# After GC, Python raises GeneratorExit calls from all awaiting tasks.
246+
# Then in a finally of such an await, another exception can swallow
247+
# these causing even more issues. We will set ourselves as deleted so we
248+
# can check in some places to swallow these errors on tear down.
249+
self._deleting = False
250+
251+
def __del__(self) -> None:
252+
# We have confirmed there are no super() versions of __del__
253+
self._deleting = True
254+
245255
#### Activation functions ####
246256
# These are in alphabetical order and besides "activate", all other calls
247257
# are "_apply_" + the job field name.
@@ -629,14 +639,26 @@ def _apply_start_workflow(
629639
# Async call to run on the scheduler thread. This will be wrapped in
630640
# another function which applies exception handling.
631641
async def run_workflow(input: ExecuteWorkflowInput) -> None:
632-
result = await self._inbound.execute_workflow(input)
633-
result_payloads = self._payload_converter.to_payloads([result])
634-
if len(result_payloads) != 1:
635-
raise ValueError(
636-
f"Expected 1 result payload, got {len(result_payloads)}"
637-
)
638-
command = self._add_command()
639-
command.complete_workflow_execution.result.CopyFrom(result_payloads[0])
642+
try:
643+
result = await self._inbound.execute_workflow(input)
644+
result_payloads = self._payload_converter.to_payloads([result])
645+
if len(result_payloads) != 1:
646+
raise ValueError(
647+
f"Expected 1 result payload, got {len(result_payloads)}"
648+
)
649+
command = self._add_command()
650+
command.complete_workflow_execution.result.CopyFrom(result_payloads[0])
651+
except BaseException as err:
652+
# During tear down, generator exit and event loop exceptions can occur
653+
if not self._deleting:
654+
raise
655+
if not isinstance(
656+
err,
657+
(GeneratorExit, temporalio.workflow._NotInWorkflowEventLoopError),
658+
):
659+
logger.debug(
660+
"Ignoring exception while deleting workflow", exc_info=True
661+
)
640662

641663
# Schedule it
642664
input = ExecuteWorkflowInput(
@@ -1260,6 +1282,16 @@ async def _run_top_level_workflow_function(self, coro: Awaitable[None]) -> None:
12601282
else:
12611283
# All other exceptions fail the task
12621284
self._current_activation_error = err
1285+
except BaseException as err:
1286+
# During tear down, generator exit and no-runtime exceptions can appear
1287+
if not self._deleting:
1288+
raise
1289+
if not isinstance(
1290+
err, (GeneratorExit, temporalio.workflow._NotInWorkflowEventLoopError)
1291+
):
1292+
logger.debug(
1293+
"Ignoring exception while deleting workflow", exc_info=True
1294+
)
12631295

12641296
def _set_workflow_failure(self, err: temporalio.exceptions.FailureError) -> None:
12651297
# All other failure errors fail the workflow

temporalio/workflow.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -340,7 +340,7 @@ class _Runtime(ABC):
340340
def current() -> _Runtime:
341341
loop = _Runtime.maybe_current()
342342
if not loop:
343-
raise RuntimeError("Not in workflow event loop")
343+
raise _NotInWorkflowEventLoopError("Not in workflow event loop")
344344
return loop
345345

346346
@staticmethod
@@ -3843,6 +3843,12 @@ def __init__(self, message: str) -> None:
38433843
self.message = message
38443844

38453845

3846+
class _NotInWorkflowEventLoopError(temporalio.exceptions.TemporalError):
3847+
def __init__(self, *args: object) -> None:
3848+
super().__init__("Not in workflow event loop")
3849+
self.message = "Not in workflow event loop"
3850+
3851+
38463852
class VersioningIntent(Enum):
38473853
"""Indicates whether the user intends certain commands to be run on a compatible worker Build
38483854
Id version or not.

tests/worker/test_workflow.py

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import logging.handlers
66
import pickle
77
import queue
8+
import sys
89
import threading
910
import uuid
1011
from abc import ABC, abstractmethod
@@ -2843,3 +2844,71 @@ async def test_manual_result_type(client: Client):
28432844
assert res3 == {"some_string": "from-query"}
28442845
res4 = await handle.query("some_query", result_type=ManualResultType)
28452846
assert res4 == ManualResultType(some_string="from-query")
2847+
2848+
2849+
@workflow.defn
2850+
class SwallowGeneratorExitWorkflow:
2851+
def __init__(self) -> None:
2852+
self._signal_count = 0
2853+
2854+
@workflow.run
2855+
async def run(self) -> None:
2856+
try:
2857+
# Wait for signal count to reach 2
2858+
await workflow.wait_condition(lambda: self._signal_count > 1)
2859+
finally:
2860+
# This finally, on eviction, is actually called because the above
2861+
# await raises GeneratorExit. Then this will raise a
2862+
# _NotInWorkflowEventLoopError swallowing that.
2863+
await workflow.wait_condition(lambda: self._signal_count > 2)
2864+
2865+
@workflow.signal
2866+
async def signal(self) -> None:
2867+
self._signal_count += 1
2868+
2869+
@workflow.query
2870+
async def signal_count(self) -> int:
2871+
return self._signal_count
2872+
2873+
2874+
async def test_swallow_generator_exit(client: Client):
2875+
if sys.version_info < (3, 8):
2876+
pytest.skip("sys.unraisablehook not in 3.7")
2877+
# This test simulates GeneratorExit and GC issues by forcing eviction on
2878+
# each step
2879+
async with new_worker(
2880+
client, SwallowGeneratorExitWorkflow, max_cached_workflows=0
2881+
) as worker:
2882+
# Put a hook to catch unraisable exceptions
2883+
old_hook = sys.unraisablehook
2884+
hook_calls: List[Any] = []
2885+
sys.unraisablehook = hook_calls.append
2886+
try:
2887+
handle = await client.start_workflow(
2888+
SwallowGeneratorExitWorkflow.run,
2889+
id=f"wf-{uuid.uuid4()}",
2890+
task_queue=worker.task_queue,
2891+
)
2892+
2893+
async def signal_count() -> int:
2894+
return await handle.query(SwallowGeneratorExitWorkflow.signal_count)
2895+
2896+
# Confirm signal count as 0
2897+
await assert_eq_eventually(0, signal_count)
2898+
2899+
# Send signal and confirm it's at 1
2900+
await handle.signal(SwallowGeneratorExitWorkflow.signal)
2901+
await assert_eq_eventually(1, signal_count)
2902+
2903+
await handle.signal(SwallowGeneratorExitWorkflow.signal)
2904+
await assert_eq_eventually(2, signal_count)
2905+
2906+
await handle.signal(SwallowGeneratorExitWorkflow.signal)
2907+
await assert_eq_eventually(3, signal_count)
2908+
2909+
await handle.result()
2910+
finally:
2911+
sys.unraisablehook = old_hook
2912+
2913+
# Confirm no unraisable exceptions
2914+
assert not hook_calls

0 commit comments

Comments
 (0)