Skip to content

Commit 466da16

Browse files
authored
Safe Eviction (#499)
1 parent b07e75e commit 466da16

File tree

5 files changed

+235
-117
lines changed

5 files changed

+235
-117
lines changed

temporalio/worker/_replayer.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ def __init__(
4545
identity: Optional[str] = None,
4646
debug_mode: bool = False,
4747
runtime: Optional[temporalio.runtime.Runtime] = None,
48+
disable_safe_workflow_eviction: bool = False,
4849
) -> None:
4950
"""Create a replayer to replay workflows from history.
5051
@@ -67,6 +68,7 @@ def __init__(
6768
identity=identity,
6869
debug_mode=debug_mode,
6970
runtime=runtime,
71+
disable_safe_workflow_eviction=disable_safe_workflow_eviction,
7072
)
7173

7274
def config(self) -> ReplayerConfig:
@@ -228,6 +230,9 @@ def on_eviction_hook(
228230
metric_meter=runtime.metric_meter,
229231
on_eviction_hook=on_eviction_hook,
230232
disable_eager_activity_execution=False,
233+
disable_safe_eviction=self._config[
234+
"disable_safe_workflow_eviction"
235+
],
231236
).run()
232237
)
233238

@@ -298,6 +303,7 @@ class ReplayerConfig(TypedDict, total=False):
298303
identity: Optional[str]
299304
debug_mode: bool
300305
runtime: Optional[temporalio.runtime.Runtime]
306+
disable_safe_workflow_eviction: bool
301307

302308

303309
@dataclass(frozen=True)

temporalio/worker/_worker.py

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ def __init__(
7878
disable_eager_activity_execution: bool = False,
7979
on_fatal_error: Optional[Callable[[BaseException], Awaitable[None]]] = None,
8080
use_worker_versioning: bool = False,
81+
disable_safe_workflow_eviction: bool = False,
8182
) -> None:
8283
"""Create a worker to process workflows and/or activities.
8384
@@ -183,11 +184,19 @@ def __init__(
183184
on_fatal_error: An async function that can handle a failure before
184185
the worker shutdown commences. This cannot stop the shutdown and
185186
any exception raised is logged and ignored.
186-
use_worker_versioning: If true, the `build_id` argument must be specified, and this
187-
worker opts into the worker versioning feature. This ensures it only receives
188-
workflow tasks for workflows which it claims to be compatible with.
189-
190-
For more information, see https://docs.temporal.io/workers#worker-versioning
187+
use_worker_versioning: If true, the `build_id` argument must be
188+
specified, and this worker opts into the worker versioning
189+
feature. This ensures it only receives workflow tasks for
190+
workflows which it claims to be compatible with. For more
191+
information, see
192+
https://docs.temporal.io/workers#worker-versioning.
193+
disable_safe_workflow_eviction: If true, instead of letting the
194+
workflow collect its tasks properly, the worker will simply let
195+
the Python garbage collector collect the tasks. WARNING: Users
196+
should not set this value to true. The garbage collector will
197+
throw ``GeneratorExit`` in coroutines causing them to wake up
198+
in different threads and run ``finally`` and other code in the
199+
wrong workflow environment.
191200
"""
192201
if not activities and not workflows:
193202
raise ValueError("At least one activity or workflow must be specified")
@@ -254,6 +263,7 @@ def __init__(
254263
disable_eager_activity_execution=disable_eager_activity_execution,
255264
on_fatal_error=on_fatal_error,
256265
use_worker_versioning=use_worker_versioning,
266+
disable_safe_workflow_eviction=disable_safe_workflow_eviction,
257267
)
258268
self._started = False
259269
self._shutdown_event = asyncio.Event()
@@ -303,6 +313,7 @@ def __init__(
303313
disable_eager_activity_execution=disable_eager_activity_execution,
304314
metric_meter=runtime.metric_meter,
305315
on_eviction_hook=None,
316+
disable_safe_eviction=disable_safe_workflow_eviction,
306317
)
307318

308319
# We need an already connected client
@@ -473,9 +484,11 @@ async def raise_on_shutdown():
473484
assert self._workflow_worker
474485
tasks[2] = asyncio.create_task(self._workflow_worker.drain_poll_queue())
475486

476-
# Set worker-shutdown event
487+
# Notify shutdown occurring
477488
if self._activity_worker:
478489
self._activity_worker.notify_shutdown()
490+
if self._workflow_worker:
491+
self._workflow_worker.notify_shutdown()
479492

480493
# Wait for all tasks to complete (i.e. for poller loops to stop)
481494
await asyncio.wait(tasks)
@@ -597,6 +610,7 @@ class WorkerConfig(TypedDict, total=False):
597610
disable_eager_activity_execution: bool
598611
on_fatal_error: Optional[Callable[[BaseException], Awaitable[None]]]
599612
use_worker_versioning: bool
613+
disable_safe_workflow_eviction: bool
600614

601615

602616
_default_build_id: Optional[str] = None

temporalio/worker/_workflow.py

Lines changed: 50 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ def __init__(
6060
[str, temporalio.bridge.proto.workflow_activation.RemoveFromCache], None
6161
]
6262
],
63+
disable_safe_eviction: bool,
6364
) -> None:
6465
self._bridge_worker = bridge_worker
6566
self._namespace = namespace
@@ -91,6 +92,7 @@ def __init__(
9192
self._running_workflows: Dict[str, WorkflowInstance] = {}
9293
self._disable_eager_activity_execution = disable_eager_activity_execution
9394
self._on_eviction_hook = on_eviction_hook
95+
self._disable_safe_eviction = disable_safe_eviction
9496
self._throw_after_activation: Optional[Exception] = None
9597

9698
# If there's a debug mode or a truthy TEMPORAL_DEBUG env var, disable
@@ -99,6 +101,9 @@ def __init__(
99101
None if debug_mode or os.environ.get("TEMPORAL_DEBUG") else 2
100102
)
101103

104+
# Keep track of workflows that could not be evicted
105+
self._could_not_evict_count = 0
106+
102107
# Validate and build workflow dict
103108
self._workflows: Dict[str, temporalio.workflow._Definition] = {}
104109
self._dynamic_workflow: Optional[temporalio.workflow._Definition] = None
@@ -155,6 +160,13 @@ async def run(self) -> None:
155160
if self._throw_after_activation:
156161
raise self._throw_after_activation
157162

163+
def notify_shutdown(self) -> None:
164+
if self._could_not_evict_count:
165+
logger.warn(
166+
f"Shutting down workflow worker, but {self._could_not_evict_count} "
167+
+ "workflow(s) could not be evicted previously, so the shutdown will hang"
168+
)
169+
158170
# Only call this if run() raised an error
159171
async def drain_poll_queue(self) -> None:
160172
while True:
@@ -182,42 +194,43 @@ async def _handle_activation(
182194
cache_remove_job = job.remove_from_cache
183195
elif job.HasField("start_workflow"):
184196
start_job = job.start_workflow
185-
cache_remove_only_activation = len(act.jobs) == 1 and cache_remove_job
186197

187198
# Build default success completion (e.g. remove-job-only activations)
188199
completion = (
189200
temporalio.bridge.proto.workflow_completion.WorkflowActivationCompletion()
190201
)
191202
completion.successful.SetInParent()
192203
try:
193-
# Decode the activation if there's a codec and it's not a
194-
# cache-remove-only activation
195-
if self._data_converter.payload_codec and not cache_remove_only_activation:
204+
# Decode the activation if there's a codec and not cache remove job
205+
if self._data_converter.payload_codec and not cache_remove_job:
196206
await temporalio.bridge.worker.decode_activation(
197207
act, self._data_converter.payload_codec
198208
)
199209

200210
if LOG_PROTOS:
201211
logger.debug("Received workflow activation:\n%s", act)
202212

203-
# We only have to run if there are any non-remove-from-cache jobs
204-
if not cache_remove_only_activation:
205-
# If the workflow is not running yet, create it
213+
# If the workflow is not running yet and this isn't a cache remove
214+
# job, create it. We do not even fetch a workflow if it's a cache
215+
# remove job and safe evictions are disabled
216+
workflow = None
217+
if not cache_remove_job or not self._disable_safe_eviction:
206218
workflow = self._running_workflows.get(act.run_id)
207-
if not workflow:
208-
# Must have a start job to create instance
209-
if not start_job:
210-
raise RuntimeError(
211-
"Missing start workflow, workflow could have unexpectedly been removed from cache"
212-
)
213-
workflow = self._create_workflow_instance(act, start_job)
214-
self._running_workflows[act.run_id] = workflow
215-
elif start_job:
216-
# This should never happen
217-
logger.warn("Cache already exists for activation with start job")
218-
219-
# Run activation in separate thread so we can check if it's
220-
# deadlocked
219+
if not workflow and not cache_remove_job:
220+
# Must have a start job to create instance
221+
if not start_job:
222+
raise RuntimeError(
223+
"Missing start workflow, workflow could have unexpectedly been removed from cache"
224+
)
225+
workflow = self._create_workflow_instance(act, start_job)
226+
self._running_workflows[act.run_id] = workflow
227+
elif start_job:
228+
# This should never happen
229+
logger.warn("Cache already exists for activation with start job")
230+
231+
# Run activation in separate thread so we can check if it's
232+
# deadlocked
233+
if workflow:
221234
activate_task = asyncio.get_running_loop().run_in_executor(
222235
self._workflow_task_executor,
223236
workflow.activate,
@@ -234,6 +247,17 @@ async def _handle_activation(
234247
f"[TMPRL1101] Potential deadlock detected, workflow didn't yield within {self._deadlock_timeout_seconds} second(s)"
235248
)
236249
except Exception as err:
250+
# We cannot fail a cache eviction, we must just log and not complete
251+
# the activation (failed or otherwise). This should only happen in
252+
# cases of deadlock or tasks not properly completing, and yes this
253+
# means that a slot is forever taken.
254+
# TODO(cretz): Should we build a complex mechanism to continually
255+
# try the eviction until it succeeds?
256+
if cache_remove_job:
257+
logger.exception("Failed running eviction job, not evicting")
258+
self._could_not_evict_count += 1
259+
return
260+
237261
logger.exception(
238262
"Failed handling activation on workflow with run ID %s", act.run_id
239263
)
@@ -257,7 +281,9 @@ async def _handle_activation(
257281
# Always set the run ID on the completion
258282
completion.run_id = act.run_id
259283

260-
# If there is a remove-from-cache job, do so
284+
# If there is a remove-from-cache job, do so. We don't need to log a
285+
# warning if the workflow is not cached, because create workflow failing for
286+
# unregistered workflow still triggers cache remove job
261287
if cache_remove_job:
262288
if act.run_id in self._running_workflows:
263289
logger.debug(
@@ -266,16 +292,9 @@ async def _handle_activation(
266292
cache_remove_job.message,
267293
)
268294
del self._running_workflows[act.run_id]
269-
else:
270-
logger.warn(
271-
"Eviction request on unknown workflow with run ID %s, message: %s",
272-
act.run_id,
273-
cache_remove_job.message,
274-
)
275295

276-
# Encode the completion if there's a codec and it's not a
277-
# cache-remove-only activation
278-
if self._data_converter.payload_codec and not cache_remove_only_activation:
296+
# Encode the completion if there's a codec and not cache remove job
297+
if self._data_converter.payload_codec and not cache_remove_job:
279298
try:
280299
await temporalio.bridge.worker.encode_completion(
281300
completion, self._data_converter.payload_codec

0 commit comments

Comments
 (0)