This repository was archived by the owner on Jun 10, 2025. It is now read-only.
File tree Expand file tree Collapse file tree 2 files changed +10
-0
lines changed
src/saturn_engine/worker/executors Expand file tree Collapse file tree 2 files changed +10
-0
lines changed Original file line number Diff line number Diff line change @@ -49,6 +49,7 @@ def __init__(
49
49
self .output = output
50
50
self .resources : dict [str , ResourceContext ] = {}
51
51
self .queue = queue
52
+ self .is_cancelled = False
52
53
53
54
@property
54
55
def id (self ) -> str :
@@ -93,6 +94,9 @@ def saturn_context(self) -> t.Iterator[None]:
93
94
with job_context (self .queue .definition ), message_context (self .message .message ):
94
95
yield
95
96
97
+ def cancel (self ) -> None :
98
+ self .is_cancelled = True
99
+
96
100
97
101
class ExecutableQueue :
98
102
def __init__ (
Original file line number Diff line number Diff line change 22
22
from .executable import ExecutableMessage
23
23
24
24
25
+ class MessageCancelled (Exception ):
26
+ pass
27
+
28
+
25
29
class ExecutorQueue :
26
30
CLOSE_TIMEOUT = datetime .timedelta (seconds = 60 )
27
31
@@ -68,6 +72,8 @@ async def scope(
68
72
xmsg : ExecutableMessage ,
69
73
) -> PipelineResults :
70
74
try :
75
+ if xmsg .is_cancelled :
76
+ raise MessageCancelled
71
77
return await self .executor .process_message (xmsg )
72
78
except Exception :
73
79
exc_type , exc_value , exc_traceback = sys .exc_info ()
You can’t perform that action at this time.
0 commit comments