Skip to content

Commit c69aa32

Browse files
Use ImplicitCancellation feature from the shared core
1 parent 7ab80ff commit c69aa32

File tree

9 files changed

+1393
-1172
lines changed

9 files changed

+1393
-1172
lines changed

packages/restate-sdk-cloudflare-workers/patches/vm/sdk_shared_core_wasm_bindings.d.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ export interface WasmAwakeable {
3636

3737
export type WasmAsyncResultValue = "NotReady" | "Empty" | { Success: Uint8Array } | { Failure: WasmFailure } | { StateKeys: string[] } | { InvocationId: string };
3838

39-
export type WasmDoProgressResult = "AnyCompleted" | "ReadFromInput" | "WaitingPendingRun" | { ExecuteRun: number };
39+
export type WasmDoProgressResult = "AnyCompleted" | "ReadFromInput" | "WaitingPendingRun" | { ExecuteRun: number } | "CancelSignalReceived";
4040

4141
export interface WasmCallHandle {
4242
invocation_id_completion_id: number;

packages/restate-sdk-examples/src/greeter.ts

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -9,21 +9,12 @@
99
* https://github.com/restatedev/sdk-typescript/blob/main/LICENSE
1010
*/
1111

12-
import {
13-
service,
14-
endpoint,
15-
type Context,
16-
CombineablePromise,
17-
} from "@restatedev/restate-sdk";
18-
import { setTimeout } from "timers/promises";
12+
import { service, endpoint, type Context } from "@restatedev/restate-sdk";
1913

2014
const greeter = service({
2115
name: "greeter",
2216
handlers: {
2317
greet: async (ctx: Context, name: string) => {
24-
const p1 = ctx.sleep(110000);
25-
const p2 = ctx.run("stuff-2", async () => setTimeout(10000));
26-
await CombineablePromise.allSettled([p1, p2]);
2718
return `Hello ${name}`;
2819
},
2920
},

packages/restate-sdk/src/context_impl.ts

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -189,10 +189,9 @@ export class ContextImpl implements ObjectContext, WorkflowContext {
189189
);
190190

191191
// TODO eventually we return this promise back to the user
192-
const invocationIdPromise = this.createInvocationIdPromise(
193-
call_handles.invocation_id_completion_id
194-
);
195-
this.promisesExecutor.registerInvocationIdToCancel(invocationIdPromise);
192+
// const invocationIdPromise = this.createInvocationIdPromise(
193+
// call_handles.invocation_id_completion_id
194+
// );
196195

197196
return call_handles.call_completion_id;
198197
}, completeUsing(SuccessWithSerde(responseSerde), Failure));
@@ -208,7 +207,7 @@ export class ContextImpl implements ObjectContext, WorkflowContext {
208207
delay = BigInt(send.delay);
209208
}
210209

211-
const invocation_id_handle = vm.sys_send(
210+
vm.sys_send(
212211
send.service,
213212
send.method,
214213
parameter,
@@ -222,9 +221,8 @@ export class ContextImpl implements ObjectContext, WorkflowContext {
222221
).invocation_id_completion_id;
223222

224223
// TODO eventually we return this promise back to the user
225-
const invocationIdPromise =
226-
this.createInvocationIdPromise(invocation_id_handle);
227-
this.promisesExecutor.registerInvocationIdToCancel(invocationIdPromise);
224+
// const invocationIdPromise =
225+
// this.createInvocationIdPromise(invocation_id_handle);
228226
});
229227
}
230228

packages/restate-sdk/src/endpoint/handlers/vm/sdk_shared_core_wasm_bindings.d.ts

Lines changed: 47 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -17,34 +17,45 @@ export enum LogLevel {
1717
ERROR = 4,
1818
}
1919
export interface WasmFailure {
20-
code: number;
21-
message: string;
20+
code: number;
21+
message: string;
2222
}
2323

2424
export interface WasmExponentialRetryConfig {
25-
initial_interval: number | undefined;
26-
factor: number;
27-
max_interval: number | undefined;
28-
max_attempts: number | undefined;
29-
max_duration: number | undefined;
25+
initial_interval: number | undefined;
26+
factor: number;
27+
max_interval: number | undefined;
28+
max_attempts: number | undefined;
29+
max_duration: number | undefined;
3030
}
3131

3232
export interface WasmAwakeable {
33-
id: string;
34-
handle: number;
33+
id: string;
34+
handle: number;
3535
}
3636

37-
export type WasmAsyncResultValue = "NotReady" | "Empty" | { Success: Uint8Array } | { Failure: WasmFailure } | { StateKeys: string[] } | { InvocationId: string };
37+
export type WasmAsyncResultValue =
38+
| "NotReady"
39+
| "Empty"
40+
| { Success: Uint8Array }
41+
| { Failure: WasmFailure }
42+
| { StateKeys: string[] }
43+
| { InvocationId: string };
3844

39-
export type WasmDoProgressResult = "AnyCompleted" | "ReadFromInput" | "WaitingPendingRun" | { ExecuteRun: number };
45+
export type WasmDoProgressResult =
46+
| "AnyCompleted"
47+
| "ReadFromInput"
48+
| "WaitingPendingRun"
49+
| { ExecuteRun: number }
50+
| "CancelSignalReceived";
4051

4152
export interface WasmCallHandle {
42-
invocation_id_completion_id: number;
43-
call_completion_id: number;
53+
invocation_id_completion_id: number;
54+
call_completion_id: number;
4455
}
4556

4657
export interface WasmSendHandle {
47-
invocation_id_completion_id: number;
58+
invocation_id_completion_id: number;
4859
}
4960

5061
export class WasmHeader {
@@ -91,8 +102,21 @@ export class WasmVM {
91102
sys_clear_state(key: string): void;
92103
sys_clear_all_state(): void;
93104
sys_sleep(millis: bigint): number;
94-
sys_call(service: string, handler: string, buffer: Uint8Array, key: string | null | undefined, headers: WasmHeader[]): WasmCallHandle;
95-
sys_send(service: string, handler: string, buffer: Uint8Array, key: string | null | undefined, headers: WasmHeader[], delay?: bigint | null): WasmSendHandle;
105+
sys_call(
106+
service: string,
107+
handler: string,
108+
buffer: Uint8Array,
109+
key: string | null | undefined,
110+
headers: WasmHeader[]
111+
): WasmCallHandle;
112+
sys_send(
113+
service: string,
114+
handler: string,
115+
buffer: Uint8Array,
116+
key: string | null | undefined,
117+
headers: WasmHeader[],
118+
delay?: bigint | null
119+
): WasmSendHandle;
96120
sys_awakeable(): WasmAwakeable;
97121
sys_complete_awakeable_success(id: string, buffer: Uint8Array): void;
98122
sys_complete_awakeable_failure(id: string, value: WasmFailure): void;
@@ -103,11 +127,16 @@ export class WasmVM {
103127
sys_run(name: string): number;
104128
propose_run_completion_success(handle: number, buffer: Uint8Array): void;
105129
propose_run_completion_failure(handle: number, value: WasmFailure): void;
106-
propose_run_completion_failure_transient(handle: number, error_message: string, error_stacktrace: string | null | undefined, attempt_duration: bigint, config: WasmExponentialRetryConfig): void;
130+
propose_run_completion_failure_transient(
131+
handle: number,
132+
error_message: string,
133+
error_stacktrace: string | null | undefined,
134+
attempt_duration: bigint,
135+
config: WasmExponentialRetryConfig
136+
): void;
107137
sys_cancel_invocation(target_invocation_id: string): void;
108138
sys_write_output_success(buffer: Uint8Array): void;
109139
sys_write_output_failure(value: WasmFailure): void;
110140
sys_end(): void;
111141
is_processing(): boolean;
112142
}
113-

packages/restate-sdk/src/endpoint/handlers/vm/sdk_shared_core_wasm_bindings.js

Lines changed: 1323 additions & 1079 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

packages/restate-sdk/src/promises.ts

Lines changed: 10 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@
1212
/* eslint-disable @typescript-eslint/no-explicit-any */
1313

1414
import type { CombineablePromise } from "./context.js";
15-
import * as vm from "./endpoint/handlers/vm/sdk_shared_core_wasm_bindings.js";
1615
import { CancelledError, TimeoutError } from "./types/errors.js";
1716
import { CompletablePromise } from "./utils/completable_promise.js";
1817
import type { ContextImpl, RunClosuresTracker } from "./context_impl.js";
@@ -80,7 +79,7 @@ abstract class AbstractRestatePromise<T> implements RestatePromise<T> {
8079
this.pollingPromise =
8180
this.pollingPromise ||
8281
this[RESTATE_CTX_SYMBOL].promisesExecutor
83-
.doProgress(true, this)
82+
.doProgress(this)
8483
.catch(() => {});
8584
return this.publicPromiseOrCancelPromise().then(onfulfilled, onrejected);
8685
}
@@ -94,7 +93,7 @@ abstract class AbstractRestatePromise<T> implements RestatePromise<T> {
9493
this.pollingPromise =
9594
this.pollingPromise ||
9695
this[RESTATE_CTX_SYMBOL].promisesExecutor
97-
.doProgress(true, this)
96+
.doProgress(this)
9897
.catch(() => {});
9998
return this.publicPromiseOrCancelPromise().catch(onrejected);
10099
}
@@ -103,7 +102,7 @@ abstract class AbstractRestatePromise<T> implements RestatePromise<T> {
103102
this.pollingPromise =
104103
this.pollingPromise ||
105104
this[RESTATE_CTX_SYMBOL].promisesExecutor
106-
.doProgress(true, this)
105+
.doProgress(this)
107106
.catch(() => {});
108107
return this.publicPromiseOrCancelPromise().finally(onfinally);
109108
}
@@ -282,9 +281,6 @@ export class RestatePendingPromise<T> implements RestatePromise<T> {
282281
* Promises executor, gluing VM with I/O and Promises given to user space.
283282
*/
284283
export class PromisesExecutor {
285-
private readonly invocationIdsToCancel: Array<RestateSinglePromise<string>> =
286-
[];
287-
288284
constructor(
289285
private readonly coreVm: vm.WasmVM,
290286
private readonly inputPump: InputPump,
@@ -293,52 +289,13 @@ export class PromisesExecutor {
293289
private readonly errorCallback: (e: any) => void
294290
) {}
295291

296-
async doProgress(
297-
enableImplicitCancellation: boolean,
298-
restatePromise: RestatePromise<unknown>
299-
) {
292+
async doProgress(restatePromise: RestatePromise<unknown>) {
300293
// Only the first time try process output
301294
await this.outputPump.awaitNextProgress();
302-
await this.doProgressInner(enableImplicitCancellation, restatePromise);
303-
}
304-
305-
registerInvocationIdToCancel(p: RestateSinglePromise<string>) {
306-
this.invocationIdsToCancel.push(p);
295+
await this.doProgressInner(restatePromise);
307296
}
308297

309-
private async doProgressInner(
310-
enableImplicitCancellation: boolean,
311-
restatePromise: RestatePromise<unknown>
312-
) {
313-
if (enableImplicitCancellation) {
314-
const cancellation_notification = this.coreVm.take_notification(
315-
vm.cancel_handle()
316-
);
317-
if (cancellation_notification !== "NotReady") {
318-
// Cancel child invocations!
319-
try {
320-
const invocationIds: string[] = [];
321-
// Await all the invocation id promises
322-
for (const invocationId of this.invocationIdsToCancel) {
323-
// This will not cancel, because I flipped the cancellation by taking it with take_notification
324-
invocationIds.push(await invocationId);
325-
}
326-
// Now cancel all of them boom!
327-
for (const invocationId of invocationIds) {
328-
this.coreVm.sys_cancel_invocation(invocationId);
329-
}
330-
} catch (e) {
331-
// Not good
332-
this.errorCallback(e);
333-
return;
334-
}
335-
336-
// Now flag this promise as cancelled and boom!
337-
restatePromise.tryCancel();
338-
return;
339-
}
340-
}
341-
298+
private async doProgressInner(restatePromise: RestatePromise<unknown>) {
342299
// Try complete the promise
343300
restatePromise.tryComplete();
344301

@@ -356,10 +313,6 @@ export class PromisesExecutor {
356313
// Completed, we're good!
357314
return;
358315
}
359-
if (enableImplicitCancellation) {
360-
// We want to be woken up on the cancellation marker too!
361-
handles.push(vm.cancel_handle());
362-
}
363316
const doProgressResult = this.coreVm.do_progress(
364317
new Uint32Array(handles)
365318
);
@@ -372,6 +325,9 @@ export class PromisesExecutor {
372325
} else if (doProgressResult === "WaitingPendingRun") {
373326
// Wait for any of the pending run to complete
374327
await this.runClosuresTracker.awaitNextCompletedRun();
328+
} else if (doProgressResult === "CancelSignalReceived") {
329+
restatePromise.tryCancel();
330+
return;
375331
} else {
376332
// We need to execute a run closure
377333
this.runClosuresTracker.executeRun(doProgressResult.ExecuteRun);
@@ -380,7 +336,7 @@ export class PromisesExecutor {
380336
}
381337

382338
// Recursion
383-
await this.doProgress(enableImplicitCancellation, restatePromise);
339+
await this.doProgress(restatePromise);
384340
} catch (e) {
385341
// Not good, this is a retryable error.
386342
this.errorCallback(e);

sdk-shared-core-wasm-bindings/Cargo.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

sdk-shared-core-wasm-bindings/src/lib.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -320,6 +320,8 @@ pub enum WasmDoProgressResult {
320320
WaitingPendingRun,
321321
/// The SDK should execute a pending run
322322
ExecuteRun(#[tsify(type = "number")] WasmNotificationHandle),
323+
/// Got cancel signal
324+
CancelSignalReceived,
323325
}
324326

325327
impl From<DoProgressResponse> for WasmDoProgressResult {
@@ -329,6 +331,7 @@ impl From<DoProgressResponse> for WasmDoProgressResult {
329331
DoProgressResponse::ReadFromInput => WasmDoProgressResult::ReadFromInput,
330332
DoProgressResponse::WaitingPendingRun => WasmDoProgressResult::WaitingPendingRun,
331333
DoProgressResponse::ExecuteRun(n) => WasmDoProgressResult::ExecuteRun(n.into()),
334+
DoProgressResponse::CancelSignalReceived => WasmDoProgressResult::CancelSignalReceived
332335
}
333336
}
334337
}
@@ -392,7 +395,7 @@ impl WasmVM {
392395
let log_dispatcher = Dispatch::new(log_subscriber(log_level, Some(logger_id)));
393396

394397
let vm = tracing::dispatcher::with_default(&log_dispatcher, || {
395-
CoreVM::new(WasmHeaderList::from(headers), VMOptions {})
398+
CoreVM::new(WasmHeaderList::from(headers), VMOptions::default())
396399
})?;
397400

398401
Ok(Self { vm, log_dispatcher })

0 commit comments

Comments
 (0)