Skip to content

Unify send and send-delayed #298

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Mar 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion examples/example.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ const greeter = restate.service({

// sending messages to ourselves, immediately and delayed
ctx.serviceSendClient(Greeter).logger(message);
ctx.serviceSendDelayedClient(Greeter, 100).logger("delayed " + message);
ctx
.serviceSendClient(Greeter, { delay: 100 })
.logger("delayed " + message);

return message;
},
Expand Down
102 changes: 43 additions & 59 deletions src/context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,43 @@ export interface KeyValueStore {
clearAll(): void;
}

export interface SendOptions {
/**
* Makes a type-safe one-way RPC to the specified target service, after a delay specified by the
* milliseconds' argument.
* This method is like stetting up a fault-tolerant cron job that enqueues the message in a
* message queue.
* The handler calling this function does not have to stay active for the delay time.
*
* Both the delay timer and the message are durably stored in Restate and guaranteed to be reliably
* delivered. The delivery happens no earlier than specified through the delay, but may happen
* later, if the target service is down, or backpressuring the system.
*
* The delay message is journaled for durable execution and will thus not be duplicated when the
* handler is re-invoked for retries or after suspending.
*
* This call will return immediately; the message sending happens asynchronously in the background.
* Despite that, the message is guaranteed to be sent, because the completion of the invocation that
* triggers the send (calls this function) happens logically after the sending. That means that any
* failure where the message does not reach Restate also cannot complete this invocation, and will
* hence recover this handler and (through the durable execution) recover the message to be sent.
*
* @example
* *Service Side:*
* ```ts
* const service = restate.service({
* ...
* });
*
* ```
* **Client side:**
* ```ts
* ctx.send(service, {delay: 60_000}).anotherAction(1337);
* ```
*/
delay?: number;
}

export interface ContextDate {
/** Returns the number of milliseconds elapsed since midnight, January 1, 1970 Universal Coordinated Time (UTC).
* This is equivalent to Date.now()
Expand Down Expand Up @@ -323,67 +360,14 @@ export interface Context {
* ```
*/
objectSendClient<P extends string, M>(
opts: VirtualObjectDefinition<P, M>,
key: string
): SendClient<M>;
serviceSendClient<P extends string, M>(
opts: ServiceDefinition<P, M>
obj: VirtualObjectDefinition<P, M>,
key: string,
opts?: SendOptions
): SendClient<M>;

/**
* Makes a type-safe one-way RPC to the specified target service, after a delay specified by the
* milliseconds' argument.
* This method is like stetting up a fault-tolerant cron job that enqueues the message in a
* message queue.
* The handler calling this function does not have to stay active for the delay time.
*
* Both the delay timer and the message are durably stored in Restate and guaranteed to be reliably
* delivered. The delivery happens no earlier than specified through the delay, but may happen
* later, if the target service is down, or backpressuring the system.
*
* The delay message is journaled for durable execution and will thus not be duplicated when the
* handler is re-invoked for retries or after suspending.
*
* This call will return immediately; the message sending happens asynchronously in the background.
* Despite that, the message is guaranteed to be sent, because the completion of the invocation that
* triggers the send (calls this function) happens logically after the sending. That means that any
* failure where the message does not reach Restate also cannot complete this invocation, and will
* hence recover this handler and (through the durable execution) recover the message to be sent.
*
* @example
* *Service Side:*
* ```ts
* const router = restate.router({
* someAction: async(ctx: restate.RpcContext, req: string) => { ... },
* anotherAction: async(ctx: restate.RpcContext, count: number) => { ... }
* });
*
* // option 1: export only the type signature of the router
* export type myApiType = typeof router;
*
* // option 2: export the API definition with type and name (name)
* export const myApi: restate.ServiceApi<typeof router> = { name : "myservice" };
*
* restate.createServer().bindRouter("myservice", router).listen(9080);
* ```
* **Client side:**
* ```ts
* // option 1: use only types and supply service name separately
* ctx.sendDelayed<myApiType>({name: "myservice"}, 60_000).someAction("hello!");
*
* // option 2: use full API spec
* ctx.sendDelayed(myApi, 60_000).anotherAction(1337);
* ```
*/
objectSendDelayedClient<P extends string, M>(
opts: VirtualObjectDefinition<P, M>,
delay: number,
key: string
): SendClient<M>;

serviceSendDelayedClient<P extends string, M>(
opts: ServiceDefinition<P, M>,
delay: number
serviceSendClient<P extends string, M>(
service: ServiceDefinition<P, M>,
opts?: SendOptions
): SendClient<M>;

/**
Expand Down
41 changes: 16 additions & 25 deletions src/context_impl.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import {
ObjectContext,
Rand,
Request,
SendOptions,
} from "./context";
import { StateMachine } from "./state_machine";
import {
Expand Down Expand Up @@ -313,14 +314,8 @@ export class ContextImpl implements ObjectContext {
}

public serviceSendClient<P extends string, M>(
options: ServiceDefinition<P, M>
): SendClient<M> {
return this.serviceSendDelayedClient(options, 0);
}

public serviceSendDelayedClient<P extends string, M>(
{ name }: ServiceDefinition<P, M>,
delayMillis: number
service: ServiceDefinition<P, M>,
opts?: SendOptions
): SendClient<M> {
const clientProxy = new Proxy(
{},
Expand All @@ -329,11 +324,14 @@ export class ContextImpl implements ObjectContext {
const route = prop as string;
return (...args: unknown[]) => {
const requestBytes = serializeJson(args.shift());
this.invokeOneWay(name, route, requestBytes, delayMillis).catch(
(e) => {
this.stateMachine.handleDanglingPromiseError(e);
}
);
this.invokeOneWay(
service.name,
route,
requestBytes,
opts?.delay
).catch((e) => {
this.stateMachine.handleDanglingPromiseError(e);
});
};
},
}
Expand All @@ -343,16 +341,9 @@ export class ContextImpl implements ObjectContext {
}

public objectSendClient<P extends string, M>(
options: ServiceDefinition<P, M>,
key: string
): SendClient<M> {
return this.objectSendDelayedClient(options, 0, key);
}

public objectSendDelayedClient<P extends string, M>(
{ name }: ServiceDefinition<P, M>,
delayMillis: number,
key: string
obj: ServiceDefinition<P, M>,
key: string,
opts?: SendOptions
): SendClient<M> {
const clientProxy = new Proxy(
{},
Expand All @@ -362,10 +353,10 @@ export class ContextImpl implements ObjectContext {
return (...args: unknown[]) => {
const requestBytes = serializeJson(args.shift());
this.invokeOneWay(
name,
obj.name,
route,
requestBytes,
delayMillis,
opts?.delay,
key
).catch((e) => {
this.stateMachine.handleDanglingPromiseError(e);
Expand Down
31 changes: 9 additions & 22 deletions src/workflows/workflow_wrapper_service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
* https://github.com/restatedev/sdk-typescript/blob/main/LICENSE
*/

import { ContextDate, Request } from "../context";
import { ContextDate, Request, SendOptions } from "../context";
import * as restate from "../public_api";
import * as wf from "./workflow";
import * as wss from "./workflow_state_service";
Expand Down Expand Up @@ -176,27 +176,16 @@ class ExclusiveContextImpl<P extends string>
}
objectSendClient<P extends string, M>(
opts: restate.ServiceDefinition<P, M>,
key: string
key: string,
sendOpts?: SendOptions
): restate.SendClient<M> {
return this.ctx.objectSendClient(opts, key);
return this.ctx.objectSendClient(opts, key, sendOpts);
}
serviceSendClient<P extends string, M>(
opts: restate.ServiceDefinition<P, M>
): restate.SendClient<M> {
return this.ctx.serviceSendClient(opts);
}
objectSendDelayedClient<P extends string, M>(
opts: restate.ServiceDefinition<P, M>,
delay: number,
key: string
): restate.SendClient<M> {
return this.ctx.objectSendDelayedClient(opts, delay, key);
}
serviceSendDelayedClient<P extends string, M>(
opts: restate.ServiceDefinition<P, M>,
delay: number
sendOpts?: SendOptions
): restate.SendClient<M> {
return this.ctx.serviceSendDelayedClient(opts, delay);
return this.ctx.serviceSendClient(opts, sendOpts);
}
}

Expand Down Expand Up @@ -251,11 +240,9 @@ export function createWrapperService<P extends string, R, T, M>(
throw err;
} finally {
ctx
.objectSendDelayedClient(
stateServiceApi,
DEFAULT_RETENTION_PERIOD,
request.workflowId
)
.objectSendClient(stateServiceApi, request.workflowId, {
delay: DEFAULT_RETENTION_PERIOD,
})
.dispose();
}
},
Expand Down