Skip to content

Commit 6a08bc3

Browse files
committed
Add missing sharedcore features
- add call/send with idempotency key - add cancel with invocation id - add call/send to return the invocation id - add ctx.attach
1 parent 1f40f4f commit 6a08bc3

File tree

15 files changed

+339
-104
lines changed

15 files changed

+339
-104
lines changed
Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,26 +1,11 @@
11
exclusions:
22
"alwaysSuspending":
3-
- "dev.restate.sdktesting.tests.Cancellation.cancelFromContext"
43
- "dev.restate.sdktesting.tests.Combinators.awakeableOrTimeoutUsingAwaitAny"
5-
- "dev.restate.sdktesting.tests.ServiceToServiceCommunication.callWithIdempotencyKey"
6-
- "dev.restate.sdktesting.tests.ServiceToServiceCommunication.oneWayCallWithIdempotencyKey"
74
"default":
8-
- "dev.restate.sdktesting.tests.Cancellation.cancelFromContext"
95
- "dev.restate.sdktesting.tests.Combinators.awakeableOrTimeoutUsingAwaitAny"
10-
- "dev.restate.sdktesting.tests.ServiceToServiceCommunication.callWithIdempotencyKey"
11-
- "dev.restate.sdktesting.tests.ServiceToServiceCommunication.oneWayCallWithIdempotencyKey"
126
"singleThreadSinglePartition":
13-
- "dev.restate.sdktesting.tests.Cancellation.cancelFromContext"
147
- "dev.restate.sdktesting.tests.Combinators.awakeableOrTimeoutUsingAwaitAny"
15-
- "dev.restate.sdktesting.tests.ServiceToServiceCommunication.callWithIdempotencyKey"
16-
- "dev.restate.sdktesting.tests.ServiceToServiceCommunication.oneWayCallWithIdempotencyKey"
178
"threeNodes":
18-
- "dev.restate.sdktesting.tests.Cancellation.cancelFromContext"
199
- "dev.restate.sdktesting.tests.Combinators.awakeableOrTimeoutUsingAwaitAny"
20-
- "dev.restate.sdktesting.tests.ServiceToServiceCommunication.callWithIdempotencyKey"
21-
- "dev.restate.sdktesting.tests.ServiceToServiceCommunication.oneWayCallWithIdempotencyKey"
2210
"threeNodesAlwaysSuspending":
23-
- "dev.restate.sdktesting.tests.Cancellation.cancelFromContext"
2411
- "dev.restate.sdktesting.tests.Combinators.awakeableOrTimeoutUsingAwaitAny"
25-
- "dev.restate.sdktesting.tests.ServiceToServiceCommunication.callWithIdempotencyKey"
26-
- "dev.restate.sdktesting.tests.ServiceToServiceCommunication.oneWayCallWithIdempotencyKey"

packages/restate-e2e-services/src/proxy.ts

Lines changed: 4 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -32,35 +32,28 @@ function rawCall(
3232
ctx: restate.Context,
3333
request: ProxyRequest
3434
): Promise<Uint8Array> {
35-
// TODO add idempotency key support here
36-
if (request.idempotencyKey != undefined) {
37-
throw new TerminalError("idempotency key not supported yet");
38-
}
3935
return ctx.genericCall({
4036
service: request.serviceName,
4137
method: request.handlerName,
4238
key: request.virtualObjectKey,
4339
inputSerde: restate.serde.binary,
4440
outputSerde: restate.serde.binary,
4541
parameter: new Uint8Array(request.message),
42+
idempotencyKey: request.idempotencyKey,
4643
});
4744
}
4845

4946
function rawSend(ctx: restate.Context, request: ProxyRequest): Promise<string> {
50-
// TODO add idempotency key support here
51-
if (request.idempotencyKey != undefined) {
52-
throw new TerminalError("idempotency key not supported yet");
53-
}
54-
ctx.genericSend({
47+
const handle = ctx.genericSend({
5548
service: request.serviceName,
5649
method: request.handlerName,
5750
key: request.virtualObjectKey,
5851
inputSerde: restate.serde.binary,
5952
parameter: new Uint8Array(request.message),
6053
delay: request.delayMillis,
54+
idempotencyKey: request.idempotencyKey,
6155
});
62-
// TODO this should return the invocation id
63-
return Promise.resolve("unknown");
56+
return handle.invocationId;
6457
}
6558

6659
const o = restate.service({

packages/restate-e2e-services/src/test_utils.ts

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -66,13 +66,11 @@ const o = restate.service({
6666
},
6767

6868
async cancelInvocation(
69-
// eslint-disable-next-line @typescript-eslint/no-unused-vars
7069
ctx: restate.Context,
71-
// eslint-disable-next-line @typescript-eslint/no-unused-vars
7270
invocationId: string
7371
): Promise<void> {
74-
// TODO add cancel invocation here!
75-
return Promise.reject(new TerminalError("cancel is not supported yet"));
72+
const id = restate.InvocationIdParser.fromString(invocationId);
73+
ctx.cancel(id);
7674
},
7775
},
7876
});

packages/restate-sdk-cloudflare-workers/patches/vm/sdk_shared_core_wasm_bindings.d.ts

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -91,8 +91,10 @@ export class WasmVM {
9191
sys_clear_state(key: string): void;
9292
sys_clear_all_state(): void;
9393
sys_sleep(millis: bigint): number;
94-
sys_call(service: string, handler: string, buffer: Uint8Array, key: string | null | undefined, headers: WasmHeader[]): WasmCallHandle;
95-
sys_send(service: string, handler: string, buffer: Uint8Array, key: string | null | undefined, headers: WasmHeader[], delay?: bigint | null): WasmSendHandle;
94+
sys_attach_invocation(invocation_id: string): number;
95+
sys_get_invocation_output(invocation_id: string): number;
96+
sys_call(service: string, handler: string, buffer: Uint8Array, key: string | null | undefined, headers: WasmHeader[], idempotency_key?: string | null): WasmCallHandle;
97+
sys_send(service: string, handler: string, buffer: Uint8Array, key: string | null | undefined, headers: WasmHeader[], delay?: bigint | null, idempotency_key?: string | null): WasmSendHandle;
9698
sys_awakeable(): WasmAwakeable;
9799
sys_complete_awakeable_success(id: string, buffer: Uint8Array): void;
98100
sys_complete_awakeable_failure(id: string, value: WasmFailure): void;

packages/restate-sdk-cloudflare-workers/patches/vm/sdk_shared_core_wasm_bindings_bg.js

Lines changed: 36 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -691,15 +691,42 @@ export class WasmVM {
691691
}
692692
return ret[0] >>> 0;
693693
}
694+
/**
695+
* @param {string} invocation_id
696+
* @returns {number}
697+
*/
698+
sys_attach_invocation(invocation_id) {
699+
const ptr0 = passStringToWasm0(invocation_id, wasm.__wbindgen_malloc, wasm.__wbindgen_realloc);
700+
const len0 = WASM_VECTOR_LEN;
701+
const ret = wasm.wasmvm_sys_attach_invocation(this.__wbg_ptr, ptr0, len0);
702+
if (ret[2]) {
703+
throw takeFromExternrefTable0(ret[1]);
704+
}
705+
return ret[0] >>> 0;
706+
}
707+
/**
708+
* @param {string} invocation_id
709+
* @returns {number}
710+
*/
711+
sys_get_invocation_output(invocation_id) {
712+
const ptr0 = passStringToWasm0(invocation_id, wasm.__wbindgen_malloc, wasm.__wbindgen_realloc);
713+
const len0 = WASM_VECTOR_LEN;
714+
const ret = wasm.wasmvm_sys_get_invocation_output(this.__wbg_ptr, ptr0, len0);
715+
if (ret[2]) {
716+
throw takeFromExternrefTable0(ret[1]);
717+
}
718+
return ret[0] >>> 0;
719+
}
694720
/**
695721
* @param {string} service
696722
* @param {string} handler
697723
* @param {Uint8Array} buffer
698724
* @param {string | null | undefined} key
699725
* @param {WasmHeader[]} headers
726+
* @param {string | null} [idempotency_key]
700727
* @returns {WasmCallHandle}
701728
*/
702-
sys_call(service, handler, buffer, key, headers) {
729+
sys_call(service, handler, buffer, key, headers, idempotency_key) {
703730
const ptr0 = passStringToWasm0(service, wasm.__wbindgen_malloc, wasm.__wbindgen_realloc);
704731
const len0 = WASM_VECTOR_LEN;
705732
const ptr1 = passStringToWasm0(handler, wasm.__wbindgen_malloc, wasm.__wbindgen_realloc);
@@ -710,7 +737,9 @@ export class WasmVM {
710737
var len3 = WASM_VECTOR_LEN;
711738
const ptr4 = passArrayJsValueToWasm0(headers, wasm.__wbindgen_malloc);
712739
const len4 = WASM_VECTOR_LEN;
713-
const ret = wasm.wasmvm_sys_call(this.__wbg_ptr, ptr0, len0, ptr1, len1, ptr2, len2, ptr3, len3, ptr4, len4);
740+
var ptr5 = isLikeNone(idempotency_key) ? 0 : passStringToWasm0(idempotency_key, wasm.__wbindgen_malloc, wasm.__wbindgen_realloc);
741+
var len5 = WASM_VECTOR_LEN;
742+
const ret = wasm.wasmvm_sys_call(this.__wbg_ptr, ptr0, len0, ptr1, len1, ptr2, len2, ptr3, len3, ptr4, len4, ptr5, len5);
714743
if (ret[2]) {
715744
throw takeFromExternrefTable0(ret[1]);
716745
}
@@ -723,9 +752,10 @@ export class WasmVM {
723752
* @param {string | null | undefined} key
724753
* @param {WasmHeader[]} headers
725754
* @param {bigint | null} [delay]
755+
* @param {string | null} [idempotency_key]
726756
* @returns {WasmSendHandle}
727757
*/
728-
sys_send(service, handler, buffer, key, headers, delay) {
758+
sys_send(service, handler, buffer, key, headers, delay, idempotency_key) {
729759
const ptr0 = passStringToWasm0(service, wasm.__wbindgen_malloc, wasm.__wbindgen_realloc);
730760
const len0 = WASM_VECTOR_LEN;
731761
const ptr1 = passStringToWasm0(handler, wasm.__wbindgen_malloc, wasm.__wbindgen_realloc);
@@ -736,7 +766,9 @@ export class WasmVM {
736766
var len3 = WASM_VECTOR_LEN;
737767
const ptr4 = passArrayJsValueToWasm0(headers, wasm.__wbindgen_malloc);
738768
const len4 = WASM_VECTOR_LEN;
739-
const ret = wasm.wasmvm_sys_send(this.__wbg_ptr, ptr0, len0, ptr1, len1, ptr2, len2, ptr3, len3, ptr4, len4, !isLikeNone(delay), isLikeNone(delay) ? BigInt(0) : delay);
769+
var ptr5 = isLikeNone(idempotency_key) ? 0 : passStringToWasm0(idempotency_key, wasm.__wbindgen_malloc, wasm.__wbindgen_realloc);
770+
var len5 = WASM_VECTOR_LEN;
771+
const ret = wasm.wasmvm_sys_send(this.__wbg_ptr, ptr0, len0, ptr1, len1, ptr2, len2, ptr3, len3, ptr4, len4, !isLikeNone(delay), isLikeNone(delay) ? BigInt(0) : delay, ptr5, len5);
740772
if (ret[2]) {
741773
throw takeFromExternrefTable0(ret[1]);
742774
}

packages/restate-sdk-cloudflare-workers/patches/vm/sdk_shared_core_wasm_bindings_bg.wasm.d.ts

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,10 @@ export const wasmvm_sys_set_state: (a: number, b: number, c: number, d: number,
3131
export const wasmvm_sys_clear_state: (a: number, b: number, c: number) => [number, number];
3232
export const wasmvm_sys_clear_all_state: (a: number) => [number, number];
3333
export const wasmvm_sys_sleep: (a: number, b: bigint) => [number, number, number];
34-
export const wasmvm_sys_call: (a: number, b: number, c: number, d: number, e: number, f: number, g: number, h: number, i: number, j: number, k: number) => [number, number, number];
35-
export const wasmvm_sys_send: (a: number, b: number, c: number, d: number, e: number, f: number, g: number, h: number, i: number, j: number, k: number, l: number, m: bigint) => [number, number, number];
34+
export const wasmvm_sys_attach_invocation: (a: number, b: number, c: number) => [number, number, number];
35+
export const wasmvm_sys_get_invocation_output: (a: number, b: number, c: number) => [number, number, number];
36+
export const wasmvm_sys_call: (a: number, b: number, c: number, d: number, e: number, f: number, g: number, h: number, i: number, j: number, k: number, l: number, m: number) => [number, number, number];
37+
export const wasmvm_sys_send: (a: number, b: number, c: number, d: number, e: number, f: number, g: number, h: number, i: number, j: number, k: number, l: number, m: bigint, n: number, o: number) => [number, number, number];
3638
export const wasmvm_sys_awakeable: (a: number) => [number, number, number];
3739
export const wasmvm_sys_complete_awakeable_success: (a: number, b: number, c: number, d: number, e: number) => [number, number];
3840
export const wasmvm_sys_complete_awakeable_failure: (a: number, b: number, c: number, d: any) => [number, number];

packages/restate-sdk/src/common_api.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,11 @@ export type {
1818
Rand,
1919
GenericCall,
2020
GenericSend,
21+
InvocationId,
22+
InvocationHandle,
23+
InvocationPromise,
2124
} from "./context.js";
22-
export { CombineablePromise } from "./context.js";
25+
export { InvocationIdParser, CombineablePromise } from "./context.js";
2326

2427
export type { Serde } from "@restatedev/restate-sdk-core";
2528
export { serde } from "@restatedev/restate-sdk-core";

packages/restate-sdk/src/context.ts

Lines changed: 50 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -231,6 +231,7 @@ export type GenericCall<REQ, RES> = {
231231
headers?: Record<string, string>;
232232
inputSerde?: Serde<REQ>;
233233
outputSerde?: Serde<RES>;
234+
idempotencyKey?: string;
234235
};
235236

236237
/**
@@ -246,6 +247,7 @@ export type GenericSend<REQ> = {
246247
headers?: Record<string, string>;
247248
inputSerde?: Serde<REQ>;
248249
delay?: number;
250+
idempotencyKey?: string;
249251
};
250252

251253
/**
@@ -537,15 +539,33 @@ export interface Context extends RestateContext {
537539

538540
genericCall<REQ = Uint8Array, RES = Uint8Array>(
539541
call: GenericCall<REQ, RES>
540-
): Promise<RES>;
542+
): InvocationPromise<RES>;
541543

542-
genericSend<REQ = Uint8Array>(call: GenericSend<REQ>): void;
544+
genericSend<REQ = Uint8Array>(call: GenericSend<REQ>): InvocationHandle;
543545

544546
/**
545547
* Returns the raw request that triggered that handler.
546548
* Use that object to inspect the original request headers
547549
*/
548550
request(): Request;
551+
552+
/**
553+
* Cancel an invocation
554+
*
555+
* @param invocationId the invocation id to cancel
556+
*/
557+
cancel(invocationId: InvocationId): void;
558+
559+
/**
560+
* Attach to an invocation
561+
*
562+
* @param invocationId the invocation id to attach to
563+
* @param serde the serde to use for the result, default to JSON serde.
564+
*/
565+
attach<T>(
566+
invocationId: InvocationId,
567+
serde?: Serde<T>
568+
): CombineablePromise<T>;
549569
}
550570

551571
/**
@@ -633,6 +653,34 @@ export type CombineablePromise<T> = Promise<T> & {
633653
orTimeout(millis: number): CombineablePromise<T>;
634654
};
635655

656+
/**
657+
* Represents an invocation id.
658+
* @see {@link InvocationIdParser}
659+
*/
660+
export type InvocationId = string & { __brand: "InvocationId" };
661+
662+
export const InvocationIdParser = {
663+
/**
664+
* Creates an invocation id from a string.
665+
* @param id the string to use as invocation id.
666+
*/
667+
fromString(id: string): InvocationId {
668+
if (!id.startsWith("inv")) {
669+
throw new Error(
670+
`Expected invocation id to start with 'inv' but got ${id}`
671+
);
672+
}
673+
return id as InvocationId;
674+
},
675+
};
676+
677+
export type InvocationHandle = {
678+
// The invocation id of the call
679+
readonly invocationId: Promise<InvocationId>;
680+
};
681+
682+
export type InvocationPromise<T> = CombineablePromise<T> & InvocationHandle;
683+
636684
export const CombineablePromise = {
637685
/**
638686
* Creates a Promise that is resolved with an array of results when all of the provided Promises

0 commit comments

Comments
 (0)