Skip to content

Commit b67fcac

Browse files
StephanEwenigalshilman
authored andcommitted
Restructure external clients and add comments
1 parent 877e66d commit b67fcac

File tree

3 files changed

+174
-89
lines changed

3 files changed

+174
-89
lines changed

examples/workflow_example.ts

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
import * as restate from "../src/public_api";
2-
import * as restate_clients from "../src/clients/workflow_client";
32
import { randomUUID } from "crypto";
43

54
/* eslint-disable no-console */
@@ -67,24 +66,34 @@ const workflowApi = myworkflow.api;
6766
restate.createServer().bind(myworkflow).listen(9080);
6867

6968
//
70-
// (2) Code to nteract with the workflow using an external client
69+
// (2) Code to interact with the workflow using an external client
7170
//
7271
// This submits a workflow and sends signals / queries to the workflow.
7372
//
74-
7573
async function startWorkflowAndInteract(restateUrl: string) {
76-
const restate = restate_clients.connectRestate(restateUrl);
74+
const restateServer = restate.clients.connect(restateUrl);
7775

7876
const args = { name: "Restatearius" };
7977
const workflowId = randomUUID();
8078

8179
// Option a) we can create clients either with just the workflow service path
82-
await restate.submitWorkflow("acme.myworkflow", workflowId, args);
80+
const submit1 = await restateServer.submitWorkflow(
81+
"acme.myworkflow",
82+
workflowId,
83+
args
84+
);
85+
console.log("Submitted workflow with result: " + submit1.status);
8386

8487
// Option b) we can supply the API signature and get a typed interface for all the methods
8588
// Because the submit is idempotent, this call here will effectively attach to the
8689
// previous workflow
87-
const client = await restate.submitWorkflow(workflowApi, workflowId, args);
90+
const submit2 = await restateServer.submitWorkflow(
91+
workflowApi,
92+
workflowId,
93+
args
94+
);
95+
console.log("Submitted workflow with result: " + submit2.status);
96+
const client = submit2.client;
8897

8998
// check the status (should be RUNNING)
9099
const status = await client.status();

src/clients/workflow_client.ts

Lines changed: 158 additions & 83 deletions
Original file line numberDiff line numberDiff line change
@@ -10,60 +10,173 @@
1010
*/
1111

1212
import * as restate from "../public_api";
13-
import {
14-
LifecycleStatus,
15-
WorkflowConnectedSignature,
16-
WorkflowExternalSignature,
17-
WorkflowRequest,
18-
} from "../workflows/workflow";
13+
import { ensureError } from "../types/errors";
1914

20-
/* eslint-disable no-console */
15+
/**
16+
* A client to interact with running workflows.
17+
*/
18+
export interface WorkflowClient<R, U> {
19+
/**
20+
* Gets the ID of the workflow that this client talks to.
21+
*/
22+
workflowId(): string;
23+
24+
/**
25+
* Gets the status of the workflow, as a {@link restate.workflow.LifecycleStatus}.
26+
* This will take on the values "NOT_STARTED", "RUNNING", "FINISHED", "FAILED".
27+
*/
28+
status(): Promise<restate.workflow.LifecycleStatus>;
29+
30+
/**
31+
* Returns a promise completed with the result. This will resolve successfully on successful
32+
* termination of the workflow, and will be rejected if the workflow throws an Error.
33+
*/
34+
result(): Promise<R>;
35+
36+
/**
37+
* Gets the interface to the workflow through which all the workflow's additional methods
38+
* can be called.
39+
*
40+
* To get the proper typed client, use the {@link WorkflowConnection.submitWorkflow} or
41+
* {@link WorkflowConnection.connectToWorkflow} functions that accpet a typed ServiceApi
42+
* object, as in the example below.
43+
*
44+
* @example
45+
* In the workflow definition:
46+
* ```
47+
* const myWorkflow = restate.workflow.workflow("acme.myworkflow", { ... });
48+
* export const myWorkflowApi = myworkflow.api;
49+
* ```
50+
* In the client code:
51+
* ```
52+
* import { myWorkflowApi } from "../server/myWorkflow"
53+
* ...
54+
* const restate = connectWorkflows("https://restatehost:8080");
55+
* restate.submitWorkflow(myWorkflowApi, workflowId, args);
56+
* restate.connectToWorkflow(myWorkflowApi, workflowId);
57+
* ```
58+
*/
59+
workflowInterface(): restate.Client<
60+
restate.workflow.WorkflowConnectedSignature<U>
61+
>;
62+
}
2163

22-
export interface Restate {
64+
/**
65+
* A connection to Restate that let's you submit workflows or connect to workflows.
66+
* This is a typed client that internally makes HTTP calls to Restate to launch trigger
67+
* an execution of a workflow service, or to connect to an existing execution.
68+
*/
69+
export interface RestateClient {
2370
submitWorkflow<R, T>(
2471
path: string,
2572
workflowId: string,
2673
params: T
27-
): Promise<WorkflowClient<R, unknown>>;
74+
): Promise<{
75+
status: restate.workflow.WorkflowStartResult;
76+
client: WorkflowClient<R, unknown>;
77+
}>;
2878

2979
submitWorkflow<R, T, U>(
30-
workflowApi: restate.ServiceApi<WorkflowExternalSignature<R, T, U>>,
80+
workflowApi: restate.ServiceApi<
81+
restate.workflow.WorkflowExternalSignature<R, T, U>
82+
>,
3183
workflowId: string,
3284
params: T
33-
): Promise<WorkflowClient<R, U>>;
85+
): Promise<{
86+
status: restate.workflow.WorkflowStartResult;
87+
client: WorkflowClient<R, U>;
88+
}>;
3489

3590
connectToWorkflow<R = unknown>(
3691
path: string,
3792
workflowId: string
38-
): Promise<WorkflowClient<R, unknown>>;
93+
): Promise<{
94+
status: restate.workflow.LifecycleStatus;
95+
client: WorkflowClient<R, unknown>;
96+
}>;
3997

4098
connectToWorkflow<R, T, U>(
41-
workflowApi: restate.ServiceApi<WorkflowExternalSignature<R, T, U>>,
99+
workflowApi: restate.ServiceApi<
100+
restate.workflow.WorkflowExternalSignature<R, T, U>
101+
>,
42102
workflowId: string
43-
): Promise<WorkflowClient<R, U>>;
103+
): Promise<{
104+
status: restate.workflow.LifecycleStatus;
105+
client: WorkflowClient<R, U>;
106+
}>;
44107
}
45108

46-
export interface WorkflowClient<R, U> {
47-
workflowId(): string;
48-
status(): Promise<LifecycleStatus>; // RUNNING / FINISHED / FAILED
49-
50-
result(): Promise<R>;
51-
52-
workflowInterface(): restate.Client<WorkflowConnectedSignature<U>>; // call methods on workflow
53-
54-
// latestMessage(): Promise<StatusMessage>;
109+
/**
110+
* Creates a typed client to start and interact with workflow executions.
111+
* The specifiec URI must point to the Restate request endpoint (ingress).
112+
*
113+
* This function doesn't immediately verify the connection, it will not fail
114+
* if Restate is unreachable. Connection failures will only manifest when
115+
* attempting to submit or connect a specific workflow.
116+
*/
117+
export function connect(restateUri: string): RestateClient {
118+
return {
119+
submitWorkflow: async <R, T, U>(
120+
pathOrApi:
121+
| string
122+
| restate.ServiceApi<
123+
restate.workflow.WorkflowExternalSignature<R, T, U>
124+
>,
125+
workflowId: string,
126+
params: T
127+
): Promise<{
128+
status: restate.workflow.WorkflowStartResult;
129+
client: WorkflowClient<R, U>;
130+
}> => {
131+
const path = typeof pathOrApi === "string" ? pathOrApi : pathOrApi.path;
132+
133+
let result: restate.workflow.WorkflowStartResult;
134+
try {
135+
result = await makeCall(restateUri, path, "start", workflowId, params);
136+
} catch (err) {
137+
const error = ensureError(err);
138+
throw new Error("Cannot start workflow: " + error.message, {
139+
cause: error,
140+
});
141+
}
55142

56-
// getMessages(
57-
// fromSeqNum: number
58-
// ): AsyncGenerator<StatusMessage, void, undefined>;
59-
}
143+
return {
144+
status: result,
145+
client: new WorkflowClientImpl(restateUri, path, workflowId),
146+
};
147+
},
60148

61-
export function connectRestate(uri: string) {
62-
return new RestateImpl(uri);
149+
async connectToWorkflow<R, T, U>(
150+
pathOrApi:
151+
| string
152+
| restate.ServiceApi<
153+
restate.workflow.WorkflowExternalSignature<R, T, U>
154+
>,
155+
workflowId: string
156+
): Promise<{
157+
status: restate.workflow.LifecycleStatus;
158+
client: WorkflowClient<R, U>;
159+
}> {
160+
const path = typeof pathOrApi === "string" ? pathOrApi : pathOrApi.path;
161+
const client: WorkflowClient<R, U> = new WorkflowClientImpl(
162+
restateUri,
163+
path,
164+
workflowId
165+
);
166+
const status = await client.status();
167+
if (status === restate.workflow.LifecycleStatus.NOT_STARTED) {
168+
throw new Error(
169+
"No workflow running/finished/failed with ID " + workflowId
170+
);
171+
}
172+
return {
173+
status,
174+
client: new WorkflowClientImpl(restateUri, path, workflowId),
175+
};
176+
},
177+
} satisfies RestateClient;
63178
}
64179

65-
// ------------------------------ implementation ------------------------------
66-
67180
class WorkflowClientImpl<R, U> implements WorkflowClient<R, U> {
68181
constructor(
69182
private readonly restateUri: string,
@@ -75,15 +188,17 @@ class WorkflowClientImpl<R, U> implements WorkflowClient<R, U> {
75188
return this.wfId;
76189
}
77190

78-
status(): Promise<LifecycleStatus> {
191+
status(): Promise<restate.workflow.LifecycleStatus> {
79192
return this.makeCall("status", {});
80193
}
81194

82195
result(): Promise<R> {
83196
return this.makeCall("waitForResult", {});
84197
}
85198

86-
workflowInterface(): restate.Client<WorkflowConnectedSignature<U>> {
199+
workflowInterface(): restate.Client<
200+
restate.workflow.WorkflowConnectedSignature<U>
201+
> {
87202
const clientProxy = new Proxy(
88203
{},
89204
{
@@ -96,25 +211,11 @@ class WorkflowClientImpl<R, U> implements WorkflowClient<R, U> {
96211
}
97212
);
98213

99-
return clientProxy as restate.Client<WorkflowConnectedSignature<U>>;
214+
return clientProxy as restate.Client<
215+
restate.workflow.WorkflowConnectedSignature<U>
216+
>;
100217
}
101218

102-
// latestMessage(): Promise<StatusMessage> {
103-
// return this.makeCall("getLatestMessage", {});
104-
// }
105-
106-
// async *getMessages(fromSeqNum: number) {
107-
// while (true) {
108-
// const msgs: StatusMessage[] = await this.makeCall("pollNextMessages", {
109-
// from: fromSeqNum,
110-
// });
111-
// for (const msg of msgs) {
112-
// yield msg;
113-
// }
114-
// fromSeqNum += msgs.length;
115-
// }
116-
// }
117-
118219
private async makeCall<RR, TT>(method: string, args: TT): Promise<RR> {
119220
return await makeCall(
120221
this.restateUri,
@@ -126,36 +227,6 @@ class WorkflowClientImpl<R, U> implements WorkflowClient<R, U> {
126227
}
127228
}
128229

129-
class RestateImpl implements Restate {
130-
constructor(private readonly restateUri: string) {}
131-
132-
async submitWorkflow<R, T, U>(
133-
pathOrApi: string | restate.ServiceApi<WorkflowExternalSignature<R, T, U>>,
134-
workflowId: string,
135-
params: T
136-
): Promise<WorkflowClient<R, U>> {
137-
const path = typeof pathOrApi === "string" ? pathOrApi : pathOrApi.path;
138-
const response = await makeCall(
139-
this.restateUri,
140-
path,
141-
"start",
142-
workflowId,
143-
params
144-
);
145-
console.log("Start() call completed: Workflow is " + response);
146-
147-
return new WorkflowClientImpl(this.restateUri, path, workflowId);
148-
}
149-
150-
async connectToWorkflow<R, T, U>(
151-
pathOrApi: string | restate.ServiceApi<WorkflowExternalSignature<R, T, U>>,
152-
workflowId: string
153-
): Promise<WorkflowClient<R, U>> {
154-
const path = typeof pathOrApi === "string" ? pathOrApi : pathOrApi.path;
155-
return new WorkflowClientImpl(this.restateUri, path, workflowId);
156-
}
157-
}
158-
159230
// ----------------------------------------------------------------------------
160231
// Utils
161232
// ----------------------------------------------------------------------------
@@ -167,7 +238,7 @@ async function makeCall<R, T>(
167238
workflowId: string,
168239
params: T
169240
): Promise<R> {
170-
if (typeof workflowId !== "string" || workflowId.length === 0) {
241+
if (!workflowId || typeof workflowId !== "string") {
171242
throw new Error("missing workflowId");
172243
}
173244
if (params === undefined) {
@@ -179,7 +250,10 @@ async function makeCall<R, T>(
179250

180251
const url = `${restateUri}/${serviceName}/${method}`;
181252
const data = {
182-
request: { workflowId, ...params } satisfies WorkflowRequest<T>,
253+
request: {
254+
workflowId,
255+
...params,
256+
} satisfies restate.workflow.WorkflowRequest<T>,
183257
};
184258

185259
let body: string;
@@ -189,7 +263,8 @@ async function makeCall<R, T>(
189263
throw new Error("Cannot encode request: " + err, { cause: err });
190264
}
191265

192-
console.log(`Making call to Restate workflow at ${url} with ${body}`);
266+
// eslint-disable-next-line no-console
267+
console.debug(`Making call to Restate at ${url}`);
193268

194269
const httpResponse = await fetch(url, {
195270
method: "POST",

src/public_api.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,3 +48,4 @@ export {
4848
RestateConnectionOptions,
4949
} from "./embedded/api";
5050
export * as workflow from "./workflows/workflow";
51+
export * as clients from "./clients/workflow_client";

0 commit comments

Comments
 (0)