Skip to content

Commit 016dc59

Browse files
authored
Use the host AbortSignal if provided (#509)
For runtimes that provide a fetch compatible server api, we should thread trough the abort signal that is provided by them. For node and lambda we will provide our own.
1 parent bde4ffc commit 016dc59

File tree

4 files changed

+38
-13
lines changed

4 files changed

+38
-13
lines changed

packages/restate-sdk/src/endpoint/handlers/fetch.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ export function fetcher(handler: GenericHandler) {
2525
headers,
2626
body: event.body,
2727
extraArgs,
28+
abortSignal: event.signal,
2829
};
2930

3031
const resp = await handler.handle(request);

packages/restate-sdk/src/endpoint/handlers/generic.ts

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ export interface RestateRequest {
5555
readonly headers: Headers;
5656
readonly body: ReadableStream<Uint8Array> | null;
5757
readonly extraArgs: unknown[];
58+
readonly abortSignal: AbortSignal;
5859
}
5960

6061
export interface RestateResponse {
@@ -178,11 +179,13 @@ export class GenericHandler implements RestateHandler {
178179
this.endpoint.rlog.error(msg);
179180
return this.toErrorResponse(400, msg);
180181
}
182+
181183
return this.handleInvoke(
182184
handler,
183185
request.body,
184186
request.headers,
185187
request.extraArgs,
188+
request.abortSignal,
186189
context ?? {}
187190
);
188191
}
@@ -220,6 +223,7 @@ export class GenericHandler implements RestateHandler {
220223
body: ReadableStream<Uint8Array>,
221224
headers: Headers,
222225
extraArgs: unknown[],
226+
abortSignal: AbortSignal,
223227
additionalContext: AdditionalContext
224228
): Promise<RestateResponse> {
225229
const loggerId = Math.floor(Math.random() * 4_294_967_295 /* u32::MAX */);
@@ -277,8 +281,6 @@ export class GenericHandler implements RestateHandler {
277281
// Get input
278282
const input = coreVm.sys_input();
279283

280-
const abortController = new AbortController();
281-
282284
const invocationRequest: Request = {
283285
id: input.invocation_id,
284286
headers: input.headers.reduce((headers, { key, value }) => {
@@ -296,7 +298,7 @@ export class GenericHandler implements RestateHandler {
296298
),
297299
body: input.input,
298300
extraArgs,
299-
attemptCompletedSignal: abortController.signal,
301+
attemptCompletedSignal: abortSignal,
300302
};
301303

302304
// Prepare logger
@@ -398,11 +400,6 @@ export class GenericHandler implements RestateHandler {
398400
inputReader.cancel().catch(() => {});
399401
})
400402
.finally(() => {
401-
try {
402-
abortController.abort();
403-
} catch (e) {
404-
// suppressed
405-
}
406403
invocationLoggers.delete(loggerId);
407404
})
408405
.catch(() => {});

packages/restate-sdk/src/endpoint/handlers/lambda.ts

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,11 @@ import type {
1717
Context,
1818
} from "aws-lambda";
1919
import { Buffer } from "node:buffer";
20-
import type { GenericHandler, RestateRequest } from "./generic.js";
20+
import type {
21+
GenericHandler,
22+
RestateRequest,
23+
RestateResponse,
24+
} from "./generic.js";
2125
import { WritableStream, type ReadableStream } from "node:stream/web";
2226
import { OnceStream } from "../../utils/streams.js";
2327
import { X_RESTATE_SERVER } from "../../user_agent.js";
@@ -48,16 +52,26 @@ export class LambdaHandler {
4852
body = OnceStream(new TextEncoder().encode(event.body));
4953
}
5054

55+
const abortController = new AbortController();
56+
5157
const request: RestateRequest = {
5258
body,
5359
headers: event.headers,
5460
url: path,
5561
extraArgs: [context],
62+
abortSignal: abortController.signal,
5663
};
5764

58-
const resp = await this.handler.handle(request, {
59-
AWSRequestId: context.awsRequestId,
60-
});
65+
let resp: RestateResponse;
66+
67+
try {
68+
resp = await this.handler.handle(request, {
69+
AWSRequestId: context.awsRequestId,
70+
});
71+
} catch (e) {
72+
abortController.abort();
73+
throw e;
74+
}
6175

6276
const chunks: Uint8Array[] = [];
6377

@@ -85,6 +99,8 @@ export class LambdaHandler {
8599
isBase64Encoded: false,
86100
body: JSON.stringify({ message: error.message }),
87101
};
102+
} finally {
103+
abortController.abort();
88104
}
89105

90106
return {

packages/restate-sdk/src/endpoint/node_endpoint.ts

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,15 +74,25 @@ export class NodeEndpoint implements RestateEndpoint {
7474

7575
return (request, response) => {
7676
(async () => {
77+
const abortController = new AbortController();
78+
request.once("aborted", () => {
79+
abortController.abort();
80+
});
81+
request.once("close", () => {
82+
abortController.abort();
83+
});
84+
request.once("error", () => {
85+
abortController.abort();
86+
});
7787
try {
7888
const url = request.url;
7989
const resp = await handler.handle({
8090
url,
8191
headers: request.headers,
8292
body: Readable.toWeb(request),
8393
extraArgs: [],
94+
abortSignal: abortController.signal,
8495
});
85-
8696
response.writeHead(resp.statusCode, resp.headers);
8797
const responseWeb = Writable.toWeb(
8898
response
@@ -95,6 +105,7 @@ export class NodeEndpoint implements RestateEndpoint {
95105
"Error while handling connection: " + (error.stack ?? error.message)
96106
);
97107
response.destroy(error);
108+
abortController.abort();
98109
}
99110
})().catch(() => {});
100111
};

0 commit comments

Comments
 (0)