Skip to content

Commit ca12811

Browse files
Add CombineablePromise.map (#506)
1 parent ac215af commit ca12811

File tree

4 files changed

+89
-19
lines changed

4 files changed

+89
-19
lines changed
Lines changed: 1 addition & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1 @@
1-
exclusions:
2-
"alwaysSuspending":
3-
- "dev.restate.sdktesting.tests.Combinators.awakeableOrTimeoutUsingAwaitAny"
4-
"default":
5-
- "dev.restate.sdktesting.tests.Combinators.awakeableOrTimeoutUsingAwaitAny"
6-
"singleThreadSinglePartition":
7-
- "dev.restate.sdktesting.tests.Combinators.awakeableOrTimeoutUsingAwaitAny"
8-
"threeNodes":
9-
- "dev.restate.sdktesting.tests.Combinators.awakeableOrTimeoutUsingAwaitAny"
10-
"threeNodesAlwaysSuspending":
11-
- "dev.restate.sdktesting.tests.Combinators.awakeableOrTimeoutUsingAwaitAny"
1+
exclusions: {}

packages/restate-e2e-services/src/virtual_object_command_interpreter.ts

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -96,13 +96,7 @@ function parseAwaitableCommand(
9696
case "createAwakeable":
9797
return createAwakeable(ctx, command.awakeableKey);
9898
case "sleep":
99-
// TODO yes this is an incorrect cast, but for now this is fine, type coercion in TS FTW.
100-
// We need a mapper function in our promise type to make this working.
101-
// The kotlin code looks like this:
102-
// ctx.timer(this.timeoutMillis.milliseconds).map { "sleep" }
103-
return ctx.sleep(
104-
command.timeoutMillis
105-
) as unknown as CombineablePromise<string>;
99+
return ctx.sleep(command.timeoutMillis).map(() => "sleep");
106100
case "runThrowTerminalException":
107101
return ctx.run<string>(() => {
108102
throw new TerminalError(command.reason);

packages/restate-sdk/src/context.ts

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import type {
2525
Serde,
2626
} from "@restatedev/restate-sdk-core";
2727
import { ContextImpl } from "./context_impl.js";
28+
import type { TerminalError } from "./types/errors.js";
2829

2930
/**
3031
* Represents the original request as sent to this handler.
@@ -651,6 +652,24 @@ export type CombineablePromise<T> = Promise<T> & {
651652
* This is a lower-bound.
652653
*/
653654
orTimeout(millis: number): CombineablePromise<T>;
655+
656+
/**
657+
* Creates a new {@link CombineablePromise} that maps the result of this promise with
658+
* the provided `mapper`, once this promise is fulfilled.
659+
*
660+
* **NOTE**: You **MUST** use this API when you need to map the result of a
661+
* {@link CombineablePromise} without `await`ing it, rather than using {@link Promise.then}.
662+
* {@link Promise.then} is used by Restate to distinguish when awaiting an asynchronous operation,
663+
* thus calling `.then` on several Restate promises can lead to concurrency issues.
664+
*
665+
* @param mapper the function to execute when this promise is fulfilled.
666+
* If the promise completed successfully, `value` is provided as input, otherwise `failure` is provided as input.
667+
* If this mapper returns a value, this value will be used to resolve the returned {@link CombineablePromise}.
668+
* If the mapper throws a {@link TerminalError}, this error will be used to reject the returned {@link CombineablePromise}.
669+
*/
670+
map<U>(
671+
mapper: (value?: T, failure?: TerminalError) => U
672+
): CombineablePromise<U>;
654673
};
655674

656675
/**

packages/restate-sdk/src/promises.ts

Lines changed: 68 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,12 @@ import type {
1717
InvocationPromise,
1818
} from "./context.js";
1919
import type * as vm from "./endpoint/handlers/vm/sdk_shared_core_wasm_bindings.js";
20-
import { CancelledError, TimeoutError } from "./types/errors.js";
20+
import {
21+
CancelledError,
22+
RestateError,
23+
TerminalError,
24+
TimeoutError,
25+
} from "./types/errors.js";
2126
import { CompletablePromise } from "./utils/completable_promise.js";
2227
import type { ContextImpl, RunClosuresTracker } from "./context_impl.js";
2328
import { setTimeout } from "timers/promises";
@@ -136,6 +141,12 @@ abstract class AbstractRestatePromise<T> implements RestatePromise<T> {
136141
) as CombineablePromise<T>;
137142
}
138143

144+
map<U>(
145+
mapper: (value?: T, failure?: TerminalError) => U
146+
): CombineablePromise<U> {
147+
return new RestateMappedPromise(this[RESTATE_CTX_SYMBOL], this, mapper);
148+
}
149+
139150
tryCancel() {
140151
this.cancelPromise.reject(new CancelledError());
141152
}
@@ -288,6 +299,10 @@ export class RestatePendingPromise<T> implements RestatePromise<T> {
288299
return this;
289300
}
290301

302+
map<U>(): CombineablePromise<U> {
303+
return this as unknown as CombineablePromise<U>;
304+
}
305+
291306
tryCancel(): void {}
292307
tryComplete(): void {}
293308
uncompletedLeaves(): number[] {
@@ -313,6 +328,58 @@ export class InvocationPendingPromise<T>
313328
}
314329
}
315330

331+
export class RestateMappedPromise<T, U> extends AbstractRestatePromise<U> {
332+
private publicPromiseMapper: (
333+
value?: T,
334+
failure?: TerminalError
335+
) => Promise<U>;
336+
337+
constructor(
338+
ctx: ContextImpl,
339+
readonly inner: RestatePromise<T>,
340+
mapper: (value?: T, failure?: TerminalError) => U
341+
) {
342+
super(ctx);
343+
this.publicPromiseMapper = (value?: T, failure?: TerminalError) => {
344+
try {
345+
return Promise.resolve(mapper(value, failure));
346+
} catch (e) {
347+
if (e instanceof TerminalError) {
348+
return Promise.reject(e);
349+
} else {
350+
ctx.handleInvocationEndError(e);
351+
return pendingPromise();
352+
}
353+
}
354+
};
355+
}
356+
357+
tryComplete(): void {
358+
this.inner.tryComplete();
359+
}
360+
361+
uncompletedLeaves(): number[] {
362+
return this.inner.uncompletedLeaves();
363+
}
364+
365+
publicPromise(): Promise<U> {
366+
const promiseMapper = this.publicPromiseMapper;
367+
return this.inner.publicPromise().then(
368+
(t) => promiseMapper(t, undefined),
369+
(error) => {
370+
if (error instanceof RestateError) {
371+
return promiseMapper(undefined, error);
372+
} else {
373+
// Something else, just re-throw it
374+
throw error;
375+
}
376+
}
377+
);
378+
}
379+
380+
readonly [Symbol.toStringTag] = "RestateMappedPromise";
381+
}
382+
316383
/**
317384
* Promises executor, gluing VM with I/O and Promises given to user space.
318385
*/

0 commit comments

Comments
 (0)