Skip to content

Commit 011c802

Browse files
Refactor APIs to be a bit more explicit. (#174)
Refactor example.
1 parent a477be4 commit 011c802

File tree

4 files changed

+75
-43
lines changed

4 files changed

+75
-43
lines changed

examples/embedded_example.ts

Lines changed: 11 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -14,27 +14,22 @@
1414
import express, { Request, Response } from "express";
1515
import * as restate from "../src/public_api";
1616

17-
const rs = restate.connection({ ingress: "http://127.0.0.1:9090" });
17+
const rs = restate.connection("http://127.0.0.1:8080");
1818

1919
const app = express();
2020
app.use(express.json());
2121

2222
app.post("/workflow", async (req: Request, res: Response) => {
23-
const { id, name } = req.body;
24-
25-
const response = await rs.invoke({
26-
id,
27-
input: name,
28-
handler: async (ctx, name) => {
29-
const p1 = ctx.sideEffect(async () => `Hello ${name}!`);
30-
const p2 = ctx.sideEffect(async () => `Bonjour ${name}`);
31-
const p3 = ctx.sideEffect(async () => `Hi ${name}`);
32-
//const p4 = ctx
33-
// .rpc<{ greet: (name: string) => Promise<string> }>({ path: "greeter" })
34-
// .greet(name);
35-
36-
return (await p1) + (await p2) + (await p3);
37-
},
23+
const { id } = req.body;
24+
25+
const response = await rs.invoke(id, {}, async (ctx) => {
26+
// You can use all RestateContext features here (except sleep)
27+
28+
const response = await ctx.sideEffect(async () => {
29+
return (await fetch("https://dummyjson.com/products/1")).json();
30+
});
31+
32+
return response.title;
3833
});
3934

4035
res.send(response);

src/embedded/api.ts

Lines changed: 27 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -15,31 +15,43 @@ import { wrapHandler } from "./handler";
1515
import crypto from "crypto";
1616
import { RemoteContext } from "../generated/proto/services";
1717
import { bufConnectRemoteContext } from "./http2_remote";
18+
import { OutgoingHttpHeaders } from "http";
1819

1920
export type RestateConnectionOptions = {
20-
ingress: string;
21+
/**
22+
* Additional headers attached to the requests sent to Restate.
23+
*/
24+
headers: OutgoingHttpHeaders;
2125
};
2226

23-
export type RestateInvocationOptions<I, O> = {
24-
id: string;
25-
handler: (ctx: RpcContext, input: I) => Promise<O>;
26-
input: I;
27+
export type RestateInvocationOptions = {
28+
/**
29+
* Retention period for the response in seconds.
30+
* After the invocation completes, the response will be persisted for the given duration.
31+
* Afterward, the system will clean up the response and treats any subsequent invocation with same operation_id as new.
32+
*
33+
* If not set, 30 minutes will be used as retention period.
34+
*/
2735
retain?: number;
2836
};
2937

30-
export const connection = (opts: RestateConnectionOptions): RestateConnection =>
31-
new RestateConnection(opts);
38+
export const connection = (
39+
address: string,
40+
opt?: RestateConnectionOptions
41+
): RestateConnection =>
42+
new RestateConnection(bufConnectRemoteContext(address, opt));
3243

3344
export class RestateConnection {
34-
private remote: RemoteContext;
45+
constructor(private readonly remote: RemoteContext) {}
3546

36-
constructor(readonly opts: RestateConnectionOptions) {
37-
this.remote = bufConnectRemoteContext(opts.ingress);
38-
}
39-
40-
public invoke<I, O>(opt: RestateInvocationOptions<I, O>): Promise<O> {
41-
const method = wrapHandler(opt.handler);
47+
public invoke<I, O>(
48+
id: string,
49+
input: I,
50+
handler: (ctx: RpcContext, input: I) => Promise<O>,
51+
opt?: RestateInvocationOptions
52+
): Promise<O> {
53+
const method = wrapHandler(handler);
4254
const streamId = crypto.randomUUID();
43-
return doInvoke<I, O>(this.remote, opt.id, streamId, method, opt.input);
55+
return doInvoke<I, O>(this.remote, id, streamId, input, method, opt);
4456
}
4557
}

src/embedded/http2_remote.ts

Lines changed: 24 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@ import {
1515
RemoteContext,
1616
RemoteContextClientImpl,
1717
} from "../generated/proto/services";
18+
import { RestateConnectionOptions } from "./api";
19+
import { OutgoingHttpHeaders } from "http";
1820

1921
export class RequestError extends Error {
2022
constructor(
@@ -30,8 +32,11 @@ export class RequestError extends Error {
3032
}
3133
}
3234

33-
export const bufConnectRemoteContext = (url: string): RemoteContext => {
34-
const httpClient = new ProtobufHttp2Client(url);
35+
export const bufConnectRemoteContext = (
36+
url: string,
37+
opt?: RestateConnectionOptions
38+
): RemoteContext => {
39+
const httpClient = new ProtobufHttp2Client(url, opt);
3540

3641
return new RemoteContextClientImpl({
3742
request: (service: string, method: string, data: Uint8Array) =>
@@ -41,8 +46,14 @@ export const bufConnectRemoteContext = (url: string): RemoteContext => {
4146

4247
class ProtobufHttp2Client {
4348
private session?: http2.ClientHttp2Session;
49+
private additionalHeaders: OutgoingHttpHeaders;
4450

45-
public constructor(private readonly ingress: string) {}
51+
public constructor(
52+
private readonly ingress: string,
53+
opt?: RestateConnectionOptions
54+
) {
55+
this.additionalHeaders = opt?.headers == undefined ? {} : opt?.headers;
56+
}
4657

4758
private async client(): Promise<http2.ClientHttp2Session> {
4859
if (this.session !== undefined) {
@@ -65,11 +76,16 @@ class ProtobufHttp2Client {
6576
const client = await this.client();
6677

6778
const req = client.request({
68-
[http2.constants.HTTP2_HEADER_SCHEME]: "http",
69-
[http2.constants.HTTP2_HEADER_METHOD]: http2.constants.HTTP2_METHOD_POST,
70-
[http2.constants.HTTP2_HEADER_PATH]: path,
71-
[http2.constants.HTTP2_HEADER_CONTENT_TYPE]: "application/proto",
72-
[http2.constants.HTTP2_HEADER_CONTENT_LENGTH]: body.length,
79+
...{
80+
[http2.constants.HTTP2_HEADER_SCHEME]: "http",
81+
[http2.constants.HTTP2_HEADER_METHOD]:
82+
http2.constants.HTTP2_METHOD_POST,
83+
[http2.constants.HTTP2_HEADER_PATH]: path,
84+
[http2.constants.HTTP2_HEADER_CONTENT_TYPE]: "application/proto",
85+
[http2.constants.HTTP2_HEADER_ACCEPT]: "application/proto",
86+
[http2.constants.HTTP2_HEADER_CONTENT_LENGTH]: body.length,
87+
},
88+
...this.additionalHeaders,
7389
});
7490
req.end(body);
7591

src/embedded/invocation.ts

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,11 @@
1111

1212
import { decodeMessagesBuffer } from "../io/decoder";
1313
import { Message } from "../types/types";
14-
import { GetResultResponse, RemoteContext } from "../generated/proto/services";
14+
import {
15+
GetResultResponse,
16+
RemoteContext,
17+
StartRequest,
18+
} from "../generated/proto/services";
1519
import { InvocationBuilder } from "../invocation";
1620
import { HostedGrpcServiceMethod } from "../types/grpc";
1721
import { StateMachine } from "../state_machine";
@@ -20,24 +24,29 @@ import {
2024
EmbeddedConnection,
2125
FencedOffError,
2226
} from "../connection/embedded_connection";
27+
import { RestateInvocationOptions } from "./api";
2328

2429
export const doInvoke = async <I, O>(
2530
remote: RemoteContext,
2631
operationId: string,
2732
streamId: string,
33+
input: I,
2834
method: HostedGrpcServiceMethod<I, O>,
29-
input: I
35+
opt?: RestateInvocationOptions
3036
): Promise<O> => {
3137
//
3238
// 1. ask to Start this execution.
3339
//
3440

35-
const res = await remote.start({
41+
const startRequest = StartRequest.fromPartial({
3642
operationId,
3743
streamId,
38-
retentionPeriodSec: 60,
3944
argument: Buffer.from(JSON.stringify(input)),
4045
});
46+
if (opt != undefined && opt.retain != undefined) {
47+
startRequest.retentionPeriodSec = opt.retain;
48+
}
49+
const res = await remote.start(startRequest);
4150

4251
if (res.completed !== undefined) {
4352
return unwrap(res.completed);

0 commit comments

Comments
 (0)