Skip to content

Commit 72432da

Browse files
authored
Unify send and sendDelayed (#298)
This commit replaces the sendDelayed variants, with an optional argument SendOptions. See examples/example.ts
1 parent aa18e2f commit 72432da

File tree

4 files changed

+71
-107
lines changed

4 files changed

+71
-107
lines changed

examples/example.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,9 @@ const greeter = restate.service({
2222

2323
// sending messages to ourselves, immediately and delayed
2424
ctx.serviceSendClient(Greeter).logger(message);
25-
ctx.serviceSendDelayedClient(Greeter, 100).logger("delayed " + message);
25+
ctx
26+
.serviceSendClient(Greeter, { delay: 100 })
27+
.logger("delayed " + message);
2628

2729
return message;
2830
},

src/context.ts

Lines changed: 43 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,43 @@ export interface KeyValueStore {
9595
clearAll(): void;
9696
}
9797

98+
export interface SendOptions {
99+
/**
100+
* Makes a type-safe one-way RPC to the specified target service, after a delay specified by the
101+
* milliseconds' argument.
102+
* This method is like stetting up a fault-tolerant cron job that enqueues the message in a
103+
* message queue.
104+
* The handler calling this function does not have to stay active for the delay time.
105+
*
106+
* Both the delay timer and the message are durably stored in Restate and guaranteed to be reliably
107+
* delivered. The delivery happens no earlier than specified through the delay, but may happen
108+
* later, if the target service is down, or backpressuring the system.
109+
*
110+
* The delay message is journaled for durable execution and will thus not be duplicated when the
111+
* handler is re-invoked for retries or after suspending.
112+
*
113+
* This call will return immediately; the message sending happens asynchronously in the background.
114+
* Despite that, the message is guaranteed to be sent, because the completion of the invocation that
115+
* triggers the send (calls this function) happens logically after the sending. That means that any
116+
* failure where the message does not reach Restate also cannot complete this invocation, and will
117+
* hence recover this handler and (through the durable execution) recover the message to be sent.
118+
*
119+
* @example
120+
* *Service Side:*
121+
* ```ts
122+
* const service = restate.service({
123+
* ...
124+
* });
125+
*
126+
* ```
127+
* **Client side:**
128+
* ```ts
129+
* ctx.send(service, {delay: 60_000}).anotherAction(1337);
130+
* ```
131+
*/
132+
delay?: number;
133+
}
134+
98135
export interface ContextDate {
99136
/** Returns the number of milliseconds elapsed since midnight, January 1, 1970 Universal Coordinated Time (UTC).
100137
* This is equivalent to Date.now()
@@ -323,67 +360,14 @@ export interface Context {
323360
* ```
324361
*/
325362
objectSendClient<P extends string, M>(
326-
opts: VirtualObjectDefinition<P, M>,
327-
key: string
328-
): SendClient<M>;
329-
serviceSendClient<P extends string, M>(
330-
opts: ServiceDefinition<P, M>
363+
obj: VirtualObjectDefinition<P, M>,
364+
key: string,
365+
opts?: SendOptions
331366
): SendClient<M>;
332367

333-
/**
334-
* Makes a type-safe one-way RPC to the specified target service, after a delay specified by the
335-
* milliseconds' argument.
336-
* This method is like stetting up a fault-tolerant cron job that enqueues the message in a
337-
* message queue.
338-
* The handler calling this function does not have to stay active for the delay time.
339-
*
340-
* Both the delay timer and the message are durably stored in Restate and guaranteed to be reliably
341-
* delivered. The delivery happens no earlier than specified through the delay, but may happen
342-
* later, if the target service is down, or backpressuring the system.
343-
*
344-
* The delay message is journaled for durable execution and will thus not be duplicated when the
345-
* handler is re-invoked for retries or after suspending.
346-
*
347-
* This call will return immediately; the message sending happens asynchronously in the background.
348-
* Despite that, the message is guaranteed to be sent, because the completion of the invocation that
349-
* triggers the send (calls this function) happens logically after the sending. That means that any
350-
* failure where the message does not reach Restate also cannot complete this invocation, and will
351-
* hence recover this handler and (through the durable execution) recover the message to be sent.
352-
*
353-
* @example
354-
* *Service Side:*
355-
* ```ts
356-
* const router = restate.router({
357-
* someAction: async(ctx: restate.RpcContext, req: string) => { ... },
358-
* anotherAction: async(ctx: restate.RpcContext, count: number) => { ... }
359-
* });
360-
*
361-
* // option 1: export only the type signature of the router
362-
* export type myApiType = typeof router;
363-
*
364-
* // option 2: export the API definition with type and name (name)
365-
* export const myApi: restate.ServiceApi<typeof router> = { name : "myservice" };
366-
*
367-
* restate.createServer().bindRouter("myservice", router).listen(9080);
368-
* ```
369-
* **Client side:**
370-
* ```ts
371-
* // option 1: use only types and supply service name separately
372-
* ctx.sendDelayed<myApiType>({name: "myservice"}, 60_000).someAction("hello!");
373-
*
374-
* // option 2: use full API spec
375-
* ctx.sendDelayed(myApi, 60_000).anotherAction(1337);
376-
* ```
377-
*/
378-
objectSendDelayedClient<P extends string, M>(
379-
opts: VirtualObjectDefinition<P, M>,
380-
delay: number,
381-
key: string
382-
): SendClient<M>;
383-
384-
serviceSendDelayedClient<P extends string, M>(
385-
opts: ServiceDefinition<P, M>,
386-
delay: number
368+
serviceSendClient<P extends string, M>(
369+
service: ServiceDefinition<P, M>,
370+
opts?: SendOptions
387371
): SendClient<M>;
388372

389373
/**

src/context_impl.ts

Lines changed: 16 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import {
1515
ObjectContext,
1616
Rand,
1717
Request,
18+
SendOptions,
1819
} from "./context";
1920
import { StateMachine } from "./state_machine";
2021
import {
@@ -313,14 +314,8 @@ export class ContextImpl implements ObjectContext {
313314
}
314315

315316
public serviceSendClient<P extends string, M>(
316-
options: ServiceDefinition<P, M>
317-
): SendClient<M> {
318-
return this.serviceSendDelayedClient(options, 0);
319-
}
320-
321-
public serviceSendDelayedClient<P extends string, M>(
322-
{ name }: ServiceDefinition<P, M>,
323-
delayMillis: number
317+
service: ServiceDefinition<P, M>,
318+
opts?: SendOptions
324319
): SendClient<M> {
325320
const clientProxy = new Proxy(
326321
{},
@@ -329,11 +324,14 @@ export class ContextImpl implements ObjectContext {
329324
const route = prop as string;
330325
return (...args: unknown[]) => {
331326
const requestBytes = serializeJson(args.shift());
332-
this.invokeOneWay(name, route, requestBytes, delayMillis).catch(
333-
(e) => {
334-
this.stateMachine.handleDanglingPromiseError(e);
335-
}
336-
);
327+
this.invokeOneWay(
328+
service.name,
329+
route,
330+
requestBytes,
331+
opts?.delay
332+
).catch((e) => {
333+
this.stateMachine.handleDanglingPromiseError(e);
334+
});
337335
};
338336
},
339337
}
@@ -343,16 +341,9 @@ export class ContextImpl implements ObjectContext {
343341
}
344342

345343
public objectSendClient<P extends string, M>(
346-
options: ServiceDefinition<P, M>,
347-
key: string
348-
): SendClient<M> {
349-
return this.objectSendDelayedClient(options, 0, key);
350-
}
351-
352-
public objectSendDelayedClient<P extends string, M>(
353-
{ name }: ServiceDefinition<P, M>,
354-
delayMillis: number,
355-
key: string
344+
obj: ServiceDefinition<P, M>,
345+
key: string,
346+
opts?: SendOptions
356347
): SendClient<M> {
357348
const clientProxy = new Proxy(
358349
{},
@@ -362,10 +353,10 @@ export class ContextImpl implements ObjectContext {
362353
return (...args: unknown[]) => {
363354
const requestBytes = serializeJson(args.shift());
364355
this.invokeOneWay(
365-
name,
356+
obj.name,
366357
route,
367358
requestBytes,
368-
delayMillis,
359+
opts?.delay,
369360
key
370361
).catch((e) => {
371362
this.stateMachine.handleDanglingPromiseError(e);

src/workflows/workflow_wrapper_service.ts

Lines changed: 9 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
* https://github.com/restatedev/sdk-typescript/blob/main/LICENSE
1010
*/
1111

12-
import { ContextDate, Request } from "../context";
12+
import { ContextDate, Request, SendOptions } from "../context";
1313
import * as restate from "../public_api";
1414
import * as wf from "./workflow";
1515
import * as wss from "./workflow_state_service";
@@ -176,27 +176,16 @@ class ExclusiveContextImpl<P extends string>
176176
}
177177
objectSendClient<P extends string, M>(
178178
opts: restate.ServiceDefinition<P, M>,
179-
key: string
179+
key: string,
180+
sendOpts?: SendOptions
180181
): restate.SendClient<M> {
181-
return this.ctx.objectSendClient(opts, key);
182+
return this.ctx.objectSendClient(opts, key, sendOpts);
182183
}
183184
serviceSendClient<P extends string, M>(
184-
opts: restate.ServiceDefinition<P, M>
185-
): restate.SendClient<M> {
186-
return this.ctx.serviceSendClient(opts);
187-
}
188-
objectSendDelayedClient<P extends string, M>(
189-
opts: restate.ServiceDefinition<P, M>,
190-
delay: number,
191-
key: string
192-
): restate.SendClient<M> {
193-
return this.ctx.objectSendDelayedClient(opts, delay, key);
194-
}
195-
serviceSendDelayedClient<P extends string, M>(
196185
opts: restate.ServiceDefinition<P, M>,
197-
delay: number
186+
sendOpts?: SendOptions
198187
): restate.SendClient<M> {
199-
return this.ctx.serviceSendDelayedClient(opts, delay);
188+
return this.ctx.serviceSendClient(opts, sendOpts);
200189
}
201190
}
202191

@@ -251,11 +240,9 @@ export function createWrapperService<P extends string, R, T, M>(
251240
throw err;
252241
} finally {
253242
ctx
254-
.objectSendDelayedClient(
255-
stateServiceApi,
256-
DEFAULT_RETENTION_PERIOD,
257-
request.workflowId
258-
)
243+
.objectSendClient(stateServiceApi, request.workflowId, {
244+
delay: DEFAULT_RETENTION_PERIOD,
245+
})
259246
.dispose();
260247
}
261248
},

0 commit comments

Comments
 (0)