Skip to content

Commit 022894b

Browse files
authored
Improve error handling (#552)
--------- Signed-off-by: Nik Nasr <[email protected]>
1 parent 9453c8e commit 022894b

File tree

3 files changed

+56
-2
lines changed

3 files changed

+56
-2
lines changed

packages/restate-e2e-services/src/app.ts

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,8 @@ if (process.env.E2E_REQUEST_SIGNING) {
6262
}
6363

6464
let INFLIGHT_REQUESTS = 0;
65+
let ACTIVE_SESSIONS = 0;
66+
const sessions = new Map();
6567

6668
const handler = endpoint.http2Handler();
6769
const server = http2.createServer((req, res) => {
@@ -72,11 +74,43 @@ const server = http2.createServer((req, res) => {
7274
handler(req, res);
7375
});
7476

77+
server.on("session", (session) => {
78+
const sessionId = ACTIVE_SESSIONS++;
79+
const streams = new Set();
80+
sessions.set(sessionId, streams);
81+
82+
const handleCloseSession = () => {
83+
sessions.delete(sessionId);
84+
};
85+
86+
session.on("close", handleCloseSession);
87+
session.on("error", handleCloseSession);
88+
89+
session.on("stream", (stream) => {
90+
streams.add(`${sessionId}_${stream.id}`);
91+
92+
const handleCloseStream = () => {
93+
streams.delete(`${sessionId}_${stream.id}`);
94+
};
95+
96+
stream.on("close", handleCloseStream);
97+
stream.on("error", handleCloseStream);
98+
});
99+
100+
return undefined;
101+
});
102+
75103
setInterval(() => {
76104
// eslint-disable-next-line no-console
77105
console.log(
78106
`${new Date().toISOString()}: Inflight requests: ${INFLIGHT_REQUESTS}`
79107
);
108+
// eslint-disable-next-line no-console
109+
console.table(
110+
Array.from(sessions.values()).map((set: Set<string>) => ({
111+
"#streams": set.size,
112+
}))
113+
);
80114
}, 30 * 1000);
81115

82116
server.updateSettings(settings);

packages/restate-sdk/src/endpoint/handlers/generic.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -271,6 +271,13 @@ export class GenericHandler implements RestateHandler {
271271
);
272272

273273
const inputReader = body.getReader();
274+
abortSignal.addEventListener(
275+
"abort",
276+
() => {
277+
void inputReader.cancel();
278+
},
279+
{ once: true }
280+
);
274281

275282
// Now buffer input entries
276283
while (!coreVm.is_ready_to_execute()) {

packages/restate-sdk/src/endpoint/node_endpoint.ts

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ export class NodeEndpoint implements RestateEndpoint {
7474
return (request, response) => {
7575
(async () => {
7676
const abortController = new AbortController();
77+
7778
request.once("aborted", () => {
7879
abortController.abort();
7980
});
@@ -83,21 +84,33 @@ export class NodeEndpoint implements RestateEndpoint {
8384
request.once("error", () => {
8485
abortController.abort();
8586
});
87+
88+
if (request.destroyed || request.aborted) {
89+
this.builder.rlog.error("Client disconnected");
90+
abortController.abort();
91+
}
92+
8693
try {
8794
const url = request.url;
95+
const webRequestBody = Readable.toWeb(request);
96+
8897
const resp = await handler.handle({
8998
url,
9099
headers: request.headers,
91-
body: Readable.toWeb(request),
100+
body: webRequestBody,
92101
extraArgs: [],
93102
abortSignal: abortController.signal,
94103
});
104+
105+
if (response.destroyed) {
106+
return;
107+
}
108+
95109
response.writeHead(resp.statusCode, resp.headers);
96110
const responseWeb = Writable.toWeb(
97111
response
98112
) as WritableStream<Uint8Array>;
99113
await resp.body.pipeTo(responseWeb);
100-
await new Promise<void>((resolve) => response.end(resolve));
101114
} catch (e) {
102115
const error = ensureError(e);
103116
this.builder.rlog.error(

0 commit comments

Comments
 (0)