Skip to content

Commit 6b43985

Browse files
Add guard for RestateContext.sideEffect await
1 parent cce4fa0 commit 6b43985

File tree

3 files changed

+160
-36
lines changed

3 files changed

+160
-36
lines changed

src/restate_context_impl.ts

Lines changed: 61 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,10 @@ export class RestateGrpcContextImpl implements RestateGrpcContext {
8282
// For example, this is illegal: 'ctx.sideEffect(() => {await ctx.get("my-state")})'
8383
static callContext = new AsyncLocalStorage<CallContext>();
8484

85+
// This is used to guard users against calling ctx.sideEffect without awaiting it.
86+
// See https://github.com/restatedev/sdk-typescript/issues/197 for more details.
87+
private executingSideEffect = false;
88+
8589
constructor(
8690
public readonly id: Buffer,
8791
public readonly serviceName: string,
@@ -90,30 +94,35 @@ export class RestateGrpcContextImpl implements RestateGrpcContext {
9094
public readonly rand: Rand = new RandImpl(id)
9195
) {}
9296

93-
public async get<T>(name: string): Promise<T | null> {
97+
// DON'T make this function async!!! see sideEffect comment for details.
98+
public get<T>(name: string): Promise<T | null> {
9499
// Check if this is a valid action
95100
this.checkState("get state");
96101

97102
// Create the message and let the state machine process it
98103
const msg = this.stateMachine.localStateStore.get(name);
99-
const result = await this.stateMachine.handleUserCodeMessage(
100-
GET_STATE_ENTRY_MESSAGE_TYPE,
101-
msg
102-
);
103104

104-
// If the GetState message did not have a value or empty,
105-
// then we went to the runtime to get the value.
106-
// When we get the response, we set it in the localStateStore,
107-
// to answer subsequent requests
108-
if (msg.value === undefined && msg.empty === undefined) {
109-
this.stateMachine.localStateStore.add(name, result as Buffer | Empty);
110-
}
105+
const getState = async (): Promise<T | null> => {
106+
const result = await this.stateMachine.handleUserCodeMessage(
107+
GET_STATE_ENTRY_MESSAGE_TYPE,
108+
msg
109+
);
111110

112-
if (!(result instanceof Buffer)) {
113-
return null;
114-
}
111+
// If the GetState message did not have a value or empty,
112+
// then we went to the runtime to get the value.
113+
// When we get the response, we set it in the localStateStore,
114+
// to answer subsequent requests
115+
if (msg.value === undefined && msg.empty === undefined) {
116+
this.stateMachine.localStateStore.add(name, result as Buffer | Empty);
117+
}
118+
119+
if (!(result instanceof Buffer)) {
120+
return null;
121+
}
115122

116-
return jsonDeserialize(result.toString());
123+
return jsonDeserialize(result.toString());
124+
};
125+
return getState();
117126
}
118127

119128
public set<T>(name: string, value: T): void {
@@ -144,7 +153,8 @@ export class RestateGrpcContextImpl implements RestateGrpcContext {
144153
}
145154
}
146155

147-
private async invoke(
156+
// DON'T make this function async!!! see sideEffect comment for details.
157+
private invoke(
148158
service: string,
149159
method: string,
150160
data: Uint8Array
@@ -156,11 +166,9 @@ export class RestateGrpcContextImpl implements RestateGrpcContext {
156166
methodName: method,
157167
parameter: Buffer.from(data),
158168
});
159-
const promise = this.stateMachine.handleUserCodeMessage(
160-
INVOKE_ENTRY_MESSAGE_TYPE,
161-
msg
162-
);
163-
return (await promise) as Uint8Array;
169+
return this.stateMachine
170+
.handleUserCodeMessage(INVOKE_ENTRY_MESSAGE_TYPE, msg)
171+
.transform((v) => v as Uint8Array);
164172
}
165173

166174
private async invokeOneWay(
@@ -184,33 +192,39 @@ export class RestateGrpcContextImpl implements RestateGrpcContext {
184192
return new Uint8Array();
185193
}
186194

187-
public async oneWayCall(
195+
// DON'T make this function async!!! see sideEffect comment for details.
196+
public oneWayCall(
188197
// eslint-disable-next-line @typescript-eslint/no-explicit-any
189198
call: () => Promise<any>
190199
): Promise<void> {
191200
this.checkState("oneWayCall");
192201

193-
await RestateGrpcContextImpl.callContext.run(
202+
return RestateGrpcContextImpl.callContext.run(
194203
{ type: CallContexType.OneWayCall },
195204
call
196205
);
197206
}
198207

199-
public async delayedCall(
208+
// DON'T make this function async!!! see sideEffect comment for details.
209+
public delayedCall(
200210
// eslint-disable-next-line @typescript-eslint/no-explicit-any
201211
call: () => Promise<any>,
202212
delayMillis?: number
203213
): Promise<void> {
204214
this.checkState("delayedCall");
205215

206216
// Delayed call is a one way call with a delay
207-
await RestateGrpcContextImpl.callContext.run(
217+
return RestateGrpcContextImpl.callContext.run(
208218
{ type: CallContexType.OneWayCall, delay: delayMillis },
209219
call
210220
);
211221
}
212222

213-
public async sideEffect<T>(
223+
// DON'T make this function async!!!
224+
// The reason is that we want the erros thrown by the initial checks to be propagated in the caller context,
225+
// and not in the promise context. To understand the semantic difference, make this function async and run the
226+
// UnawaitedSideEffectShouldFailSubsequentContextCall test.
227+
public sideEffect<T>(
214228
fn: () => Promise<T>,
215229
retryPolicy: RetrySettings = DEFAULT_INFINITE_EXPONENTIAL_BACKOFF
216230
): Promise<T> {
@@ -227,6 +241,8 @@ export class RestateGrpcContextImpl implements RestateGrpcContext {
227241
{ errorCode: ErrorCodes.INTERNAL }
228242
);
229243
}
244+
this.checkNotExecutingSideEffect();
245+
this.executingSideEffect = true;
230246

231247
const executeAndLogSideEffect = async () => {
232248
// in replay mode, we directly return the value from the log
@@ -302,7 +318,13 @@ export class RestateGrpcContextImpl implements RestateGrpcContext {
302318
};
303319

304320
const sleep = (millis: number) => this.sleep(millis);
305-
return executeWithRetries(retryPolicy, executeAndLogSideEffect, sleep);
321+
return executeWithRetries(
322+
retryPolicy,
323+
executeAndLogSideEffect,
324+
sleep
325+
).finally(() => {
326+
this.executingSideEffect = false;
327+
});
306328
}
307329

308330
public sleep(millis: number): Promise<void> {
@@ -385,9 +407,20 @@ export class RestateGrpcContextImpl implements RestateGrpcContext {
385407
return context?.delay || 0;
386408
}
387409

410+
private checkNotExecutingSideEffect() {
411+
if (this.executingSideEffect) {
412+
throw new TerminalError(
413+
`Invoked a RestateContext method while a side effect is still executing.
414+
Make sure you await the ctx.sideEffect call before using any other RestateContext method.`,
415+
{ errorCode: ErrorCodes.INTERNAL }
416+
);
417+
}
418+
}
419+
388420
private checkState(callType: string): void {
389421
const context = RestateGrpcContextImpl.callContext.getStore();
390422
if (!context) {
423+
this.checkNotExecutingSideEffect();
391424
return;
392425
}
393426

src/server/base_restate_server.ts

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -149,9 +149,7 @@ export abstract class BaseRestateServer {
149149
method
150150
);
151151
// note that this log will not print all the keys.
152-
rlog.info(
153-
`Binding: ${url} -> ${JSON.stringify(method, null, "\t")}`
154-
);
152+
rlog.info(`Binding: ${url} -> ${JSON.stringify(method, null, "\t")}`);
155153
}
156154
}
157155

@@ -264,11 +262,7 @@ export abstract class BaseRestateServer {
264262
) as HostedGrpcServiceMethod<unknown, unknown>;
265263

266264
rlog.info(
267-
`Binding: ${url} -> ${JSON.stringify(
268-
registration.method,
269-
null,
270-
"\t"
271-
)}`
265+
`Binding: ${url} -> ${JSON.stringify(registration.method, null, "\t")}`
272266
);
273267
}
274268

test/side_effect.test.ts

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -911,6 +911,103 @@ describe("AwaitSideEffectService", () => {
911911
});
912912
});
913913

914+
export class UnawaitedSideEffectShouldFailSubsequentContextCallService
915+
implements TestGreeter
916+
{
917+
constructor(
918+
// eslint-disable-next-line @typescript-eslint/no-empty-function
919+
private readonly next = (ctx: restate.RestateContext): void => {}
920+
) {}
921+
922+
// eslint-disable-next-line @typescript-eslint/no-unused-vars
923+
async greet(request: TestRequest): Promise<TestResponse> {
924+
const ctx = restate.useContext(this);
925+
926+
ctx.sideEffect<number>(async () => {
927+
// eslint-disable-next-line @typescript-eslint/no-empty-function
928+
return new Promise(() => {});
929+
});
930+
this.next(ctx);
931+
932+
throw new Error("code should not reach this point");
933+
}
934+
}
935+
936+
describe("UnawaitedSideEffectShouldFailSubsequentContextCall", () => {
937+
const defineTestCase = (
938+
contextMethodCall: string,
939+
next: (ctx: restate.RestateContext) => void
940+
): void => {
941+
it(
942+
"Not awaiting side effect should fail at next " + contextMethodCall,
943+
async () => {
944+
const result = await new TestDriver(
945+
new UnawaitedSideEffectShouldFailSubsequentContextCallService(next),
946+
[startMessage(), inputMessage(greetRequest("Till"))]
947+
).run();
948+
949+
checkTerminalError(
950+
result[0],
951+
`Invoked a RestateContext method while a side effect is still executing.
952+
Make sure you await the ctx.sideEffect call before using any other RestateContext method.`
953+
);
954+
expect(result.slice(1)).toStrictEqual([END_MESSAGE]);
955+
}
956+
);
957+
};
958+
959+
defineTestCase("side effect", (ctx) =>
960+
ctx.sideEffect<number>(async () => {
961+
return 1;
962+
})
963+
);
964+
defineTestCase("get", (ctx) => ctx.get<string>("123"));
965+
defineTestCase("set", (ctx) => ctx.set("123", "abc"));
966+
defineTestCase("call", (ctx) => {
967+
const client = new TestGreeterClientImpl(ctx);
968+
client.greet(TestRequest.create({ name: "Francesco" }));
969+
});
970+
defineTestCase("one way call", (ctx) => {
971+
const client = new TestGreeterClientImpl(ctx);
972+
ctx.oneWayCall(() =>
973+
client.greet(TestRequest.create({ name: "Francesco" }))
974+
);
975+
});
976+
});
977+
978+
export class UnawaitedSideEffectShouldFailSubsequentSetService
979+
implements TestGreeter
980+
{
981+
// eslint-disable-next-line @typescript-eslint/no-unused-vars
982+
async greet(request: TestRequest): Promise<TestResponse> {
983+
const ctx = restate.useContext(this);
984+
985+
ctx.sideEffect<number>(async () => {
986+
// eslint-disable-next-line @typescript-eslint/no-empty-function
987+
return new Promise(() => {});
988+
});
989+
ctx.set("123", "abc");
990+
991+
throw new Error("code should not reach this point");
992+
}
993+
}
994+
995+
describe("UnawaitedSideEffectShouldFailSubsequentSetService", () => {
996+
it("Not awaiting side effects should fail", async () => {
997+
const result = await new TestDriver(
998+
new UnawaitedSideEffectShouldFailSubsequentSetService(),
999+
[startMessage(), inputMessage(greetRequest("Till"))]
1000+
).run();
1001+
1002+
checkTerminalError(
1003+
result[0],
1004+
`Invoked a RestateContext method while a side effect is still executing.
1005+
Make sure you await the ctx.sideEffect call before using any other RestateContext method.`
1006+
);
1007+
expect(result.slice(1)).toStrictEqual([END_MESSAGE]);
1008+
});
1009+
});
1010+
9141011
export class TerminalErrorSideEffectService implements TestGreeter {
9151012
// eslint-disable-next-line @typescript-eslint/no-unused-vars
9161013
async greet(request: TestRequest): Promise<TestResponse> {

0 commit comments

Comments
 (0)