@@ -82,6 +82,10 @@ export class RestateGrpcContextImpl implements RestateGrpcContext {
82
82
// For example, this is illegal: 'ctx.sideEffect(() => {await ctx.get("my-state")})'
83
83
static callContext = new AsyncLocalStorage < CallContext > ( ) ;
84
84
85
+ // This is used to guard users against calling ctx.sideEffect without awaiting it.
86
+ // See https://github.com/restatedev/sdk-typescript/issues/197 for more details.
87
+ private executingSideEffect = false ;
88
+
85
89
constructor (
86
90
public readonly id : Buffer ,
87
91
public readonly serviceName : string ,
@@ -90,30 +94,35 @@ export class RestateGrpcContextImpl implements RestateGrpcContext {
90
94
public readonly rand : Rand = new RandImpl ( id )
91
95
) { }
92
96
93
- public async get < T > ( name : string ) : Promise < T | null > {
97
+ // DON'T make this function async!!! see sideEffect comment for details.
98
+ public get < T > ( name : string ) : Promise < T | null > {
94
99
// Check if this is a valid action
95
100
this . checkState ( "get state" ) ;
96
101
97
102
// Create the message and let the state machine process it
98
103
const msg = this . stateMachine . localStateStore . get ( name ) ;
99
- const result = await this . stateMachine . handleUserCodeMessage (
100
- GET_STATE_ENTRY_MESSAGE_TYPE ,
101
- msg
102
- ) ;
103
104
104
- // If the GetState message did not have a value or empty,
105
- // then we went to the runtime to get the value.
106
- // When we get the response, we set it in the localStateStore,
107
- // to answer subsequent requests
108
- if ( msg . value === undefined && msg . empty === undefined ) {
109
- this . stateMachine . localStateStore . add ( name , result as Buffer | Empty ) ;
110
- }
105
+ const getState = async ( ) : Promise < T | null > => {
106
+ const result = await this . stateMachine . handleUserCodeMessage (
107
+ GET_STATE_ENTRY_MESSAGE_TYPE ,
108
+ msg
109
+ ) ;
111
110
112
- if ( ! ( result instanceof Buffer ) ) {
113
- return null ;
114
- }
111
+ // If the GetState message did not have a value or empty,
112
+ // then we went to the runtime to get the value.
113
+ // When we get the response, we set it in the localStateStore,
114
+ // to answer subsequent requests
115
+ if ( msg . value === undefined && msg . empty === undefined ) {
116
+ this . stateMachine . localStateStore . add ( name , result as Buffer | Empty ) ;
117
+ }
115
118
116
- return jsonDeserialize ( result . toString ( ) ) ;
119
+ if ( ! ( result instanceof Buffer ) ) {
120
+ return null ;
121
+ }
122
+
123
+ return jsonDeserialize ( result . toString ( ) ) ;
124
+ } ;
125
+ return getState ( ) ;
117
126
}
118
127
119
128
public set < T > ( name : string , value : T ) : void {
@@ -144,7 +153,8 @@ export class RestateGrpcContextImpl implements RestateGrpcContext {
144
153
}
145
154
}
146
155
147
- private async invoke (
156
+ // DON'T make this function async!!! see sideEffect comment for details.
157
+ private invoke (
148
158
service : string ,
149
159
method : string ,
150
160
data : Uint8Array
@@ -156,11 +166,9 @@ export class RestateGrpcContextImpl implements RestateGrpcContext {
156
166
methodName : method ,
157
167
parameter : Buffer . from ( data ) ,
158
168
} ) ;
159
- const promise = this . stateMachine . handleUserCodeMessage (
160
- INVOKE_ENTRY_MESSAGE_TYPE ,
161
- msg
162
- ) ;
163
- return ( await promise ) as Uint8Array ;
169
+ return this . stateMachine
170
+ . handleUserCodeMessage ( INVOKE_ENTRY_MESSAGE_TYPE , msg )
171
+ . transform ( ( v ) => v as Uint8Array ) ;
164
172
}
165
173
166
174
private async invokeOneWay (
@@ -184,33 +192,39 @@ export class RestateGrpcContextImpl implements RestateGrpcContext {
184
192
return new Uint8Array ( ) ;
185
193
}
186
194
187
- public async oneWayCall (
195
+ // DON'T make this function async!!! see sideEffect comment for details.
196
+ public oneWayCall (
188
197
// eslint-disable-next-line @typescript-eslint/no-explicit-any
189
198
call : ( ) => Promise < any >
190
199
) : Promise < void > {
191
200
this . checkState ( "oneWayCall" ) ;
192
201
193
- await RestateGrpcContextImpl . callContext . run (
202
+ return RestateGrpcContextImpl . callContext . run (
194
203
{ type : CallContexType . OneWayCall } ,
195
204
call
196
205
) ;
197
206
}
198
207
199
- public async delayedCall (
208
+ // DON'T make this function async!!! see sideEffect comment for details.
209
+ public delayedCall (
200
210
// eslint-disable-next-line @typescript-eslint/no-explicit-any
201
211
call : ( ) => Promise < any > ,
202
212
delayMillis ?: number
203
213
) : Promise < void > {
204
214
this . checkState ( "delayedCall" ) ;
205
215
206
216
// Delayed call is a one way call with a delay
207
- await RestateGrpcContextImpl . callContext . run (
217
+ return RestateGrpcContextImpl . callContext . run (
208
218
{ type : CallContexType . OneWayCall , delay : delayMillis } ,
209
219
call
210
220
) ;
211
221
}
212
222
213
- public async sideEffect < T > (
223
+ // DON'T make this function async!!!
224
+ // The reason is that we want the erros thrown by the initial checks to be propagated in the caller context,
225
+ // and not in the promise context. To understand the semantic difference, make this function async and run the
226
+ // UnawaitedSideEffectShouldFailSubsequentContextCall test.
227
+ public sideEffect < T > (
214
228
fn : ( ) => Promise < T > ,
215
229
retryPolicy : RetrySettings = DEFAULT_INFINITE_EXPONENTIAL_BACKOFF
216
230
) : Promise < T > {
@@ -227,6 +241,8 @@ export class RestateGrpcContextImpl implements RestateGrpcContext {
227
241
{ errorCode : ErrorCodes . INTERNAL }
228
242
) ;
229
243
}
244
+ this . checkNotExecutingSideEffect ( ) ;
245
+ this . executingSideEffect = true ;
230
246
231
247
const executeAndLogSideEffect = async ( ) => {
232
248
// in replay mode, we directly return the value from the log
@@ -301,17 +317,25 @@ export class RestateGrpcContextImpl implements RestateGrpcContext {
301
317
return sideEffectResult ;
302
318
} ;
303
319
304
- const sleep = ( millis : number ) => this . sleep ( millis ) ;
305
- return executeWithRetries ( retryPolicy , executeAndLogSideEffect , sleep ) ;
320
+ const sleep = ( millis : number ) => this . sleepInternal ( millis ) ;
321
+ return executeWithRetries (
322
+ retryPolicy ,
323
+ executeAndLogSideEffect ,
324
+ sleep
325
+ ) . finally ( ( ) => {
326
+ this . executingSideEffect = false ;
327
+ } ) ;
306
328
}
307
329
308
330
public sleep ( millis : number ) : Promise < void > {
309
331
this . checkState ( "sleep" ) ;
332
+ return this . sleepInternal ( millis ) ;
333
+ }
310
334
311
- const msg = SleepEntryMessage . create ( { wakeUpTime : Date . now ( ) + millis } ) ;
335
+ private sleepInternal ( millis : number ) : Promise < void > {
312
336
return this . stateMachine . handleUserCodeMessage < void > (
313
337
SLEEP_ENTRY_MESSAGE_TYPE ,
314
- msg
338
+ SleepEntryMessage . create ( { wakeUpTime : Date . now ( ) + millis } )
315
339
) ;
316
340
}
317
341
@@ -385,9 +409,20 @@ export class RestateGrpcContextImpl implements RestateGrpcContext {
385
409
return context ?. delay || 0 ;
386
410
}
387
411
412
+ private checkNotExecutingSideEffect ( ) {
413
+ if ( this . executingSideEffect ) {
414
+ throw new TerminalError (
415
+ `Invoked a RestateContext method while a side effect is still executing.
416
+ Make sure you await the ctx.sideEffect call before using any other RestateContext method.` ,
417
+ { errorCode : ErrorCodes . INTERNAL }
418
+ ) ;
419
+ }
420
+ }
421
+
388
422
private checkState ( callType : string ) : void {
389
423
const context = RestateGrpcContextImpl . callContext . getStore ( ) ;
390
424
if ( ! context ) {
425
+ this . checkNotExecutingSideEffect ( ) ;
391
426
return ;
392
427
}
393
428
0 commit comments