diff --git a/.github/workflows/integration.yaml b/.github/workflows/integration.yaml index 47381500..a27e6d70 100644 --- a/.github/workflows/integration.yaml +++ b/.github/workflows/integration.yaml @@ -105,9 +105,10 @@ jobs: cache-to: type=gha,mode=max,scope=${{ github.workflow }} - name: Run test tool - uses: restatedev/sdk-test-suite@v2.4 + uses: restatedev/sdk-test-suite@v3.0 with: restateContainerImage: ${{ inputs.restateCommit != '' && 'localhost/restatedev/restate-commit-download:latest' || (inputs.restateImage != '' && inputs.restateImage || 'ghcr.io/restatedev/restate:main') }} serviceContainerImage: "restatedev/typescript-test-services" exclusionsFile: "packages/restate-e2e-services/exclusions.yaml" testArtifactOutput: "sdk-typescript-integration-test-report" + serviceContainerEnvFile: "packages/restate-e2e-services/.env" diff --git a/packages/restate-e2e-services/.env b/packages/restate-e2e-services/.env new file mode 100644 index 00000000..c0ae41a0 --- /dev/null +++ b/packages/restate-e2e-services/.env @@ -0,0 +1 @@ +RESTATE_LOGGING=TRACE \ No newline at end of file diff --git a/packages/restate-e2e-services/exclusions.yaml b/packages/restate-e2e-services/exclusions.yaml index 1e934904..fd4e2839 100644 --- a/packages/restate-e2e-services/exclusions.yaml +++ b/packages/restate-e2e-services/exclusions.yaml @@ -1 +1,26 @@ -exclusions: {} \ No newline at end of file +exclusions: + "alwaysSuspending": + - "dev.restate.sdktesting.tests.Cancellation.cancelFromContext" + - "dev.restate.sdktesting.tests.Combinators.awakeableOrTimeoutUsingAwaitAny" + - "dev.restate.sdktesting.tests.ServiceToServiceCommunication.callWithIdempotencyKey" + - "dev.restate.sdktesting.tests.ServiceToServiceCommunication.oneWayCallWithIdempotencyKey" + "default": + - "dev.restate.sdktesting.tests.Cancellation.cancelFromContext" + - "dev.restate.sdktesting.tests.Combinators.awakeableOrTimeoutUsingAwaitAny" + - "dev.restate.sdktesting.tests.ServiceToServiceCommunication.callWithIdempotencyKey" + - "dev.restate.sdktesting.tests.ServiceToServiceCommunication.oneWayCallWithIdempotencyKey" + "singleThreadSinglePartition": + - "dev.restate.sdktesting.tests.Cancellation.cancelFromContext" + - "dev.restate.sdktesting.tests.Combinators.awakeableOrTimeoutUsingAwaitAny" + - "dev.restate.sdktesting.tests.ServiceToServiceCommunication.callWithIdempotencyKey" + - "dev.restate.sdktesting.tests.ServiceToServiceCommunication.oneWayCallWithIdempotencyKey" + "threeNodes": + - "dev.restate.sdktesting.tests.Cancellation.cancelFromContext" + - "dev.restate.sdktesting.tests.Combinators.awakeableOrTimeoutUsingAwaitAny" + - "dev.restate.sdktesting.tests.ServiceToServiceCommunication.callWithIdempotencyKey" + - "dev.restate.sdktesting.tests.ServiceToServiceCommunication.oneWayCallWithIdempotencyKey" + "threeNodesAlwaysSuspending": + - "dev.restate.sdktesting.tests.Cancellation.cancelFromContext" + - "dev.restate.sdktesting.tests.Combinators.awakeableOrTimeoutUsingAwaitAny" + - "dev.restate.sdktesting.tests.ServiceToServiceCommunication.callWithIdempotencyKey" + - "dev.restate.sdktesting.tests.ServiceToServiceCommunication.oneWayCallWithIdempotencyKey" diff --git a/packages/restate-e2e-services/src/app.ts b/packages/restate-e2e-services/src/app.ts index eec0a742..2d081e1e 100644 --- a/packages/restate-e2e-services/src/app.ts +++ b/packages/restate-e2e-services/src/app.ts @@ -23,6 +23,7 @@ import "./workflow.js"; import "./proxy.js"; import "./test_utils.js"; import "./kill.js"; +import "./virtual_object_command_interpreter.js"; import { REGISTRY } from "./services.js"; diff --git a/packages/restate-e2e-services/src/cancel_test.ts b/packages/restate-e2e-services/src/cancel_test.ts index dde34e27..fbbc0e21 100644 --- a/packages/restate-e2e-services/src/cancel_test.ts +++ b/packages/restate-e2e-services/src/cancel_test.ts @@ -48,9 +48,7 @@ const blockingService = restate.object({ handlers: { async block(ctx: restate.ObjectContext, request: BlockingOperation) { const { id, promise } = ctx.awakeable(); - // DO NOT await the next CALL otherwise the test deadlocks. - // eslint-disable-next-line @typescript-eslint/no-floating-promises - ctx.objectClient(AwakeableHolder, "cancel").hold(id); + await ctx.objectClient(AwakeableHolder, ctx.key).hold(id); await promise; switch (request) { diff --git a/packages/restate-e2e-services/src/kill.ts b/packages/restate-e2e-services/src/kill.ts index 52531564..b7459417 100644 --- a/packages/restate-e2e-services/src/kill.ts +++ b/packages/restate-e2e-services/src/kill.ts @@ -11,11 +11,11 @@ import * as restate from "@restatedev/restate-sdk"; import { REGISTRY } from "./services.js"; import type { AwakeableHolder } from "./awakeable_holder.js"; -const kill = restate.service({ +const kill = restate.object({ name: "KillTestRunner", handlers: { async startCallTree(ctx: restate.ObjectContext) { - await ctx.objectClient(killSingleton, "").recursiveCall(); + await ctx.objectClient(killSingleton, ctx.key).recursiveCall(); }, }, }); @@ -26,11 +26,11 @@ const killSingleton = restate.object({ async recursiveCall(ctx: restate.ObjectContext) { const { id, promise } = ctx.awakeable(); ctx - .objectSendClient({ name: "AwakeableHolder" }, "kill") + .objectSendClient({ name: "AwakeableHolder" }, ctx.key) .hold(id); await promise; - await ctx.objectClient(killSingleton, "").recursiveCall(); + await ctx.objectClient(killSingleton, ctx.key).recursiveCall(); }, isUnlocked() { @@ -39,5 +39,5 @@ const killSingleton = restate.object({ }, }); -REGISTRY.addService(kill); +REGISTRY.addObject(kill); REGISTRY.addObject(killSingleton); diff --git a/packages/restate-e2e-services/src/proxy.ts b/packages/restate-e2e-services/src/proxy.ts index 808f883e..6e9a4456 100644 --- a/packages/restate-e2e-services/src/proxy.ts +++ b/packages/restate-e2e-services/src/proxy.ts @@ -11,6 +11,7 @@ import * as restate from "@restatedev/restate-sdk"; import { REGISTRY } from "./services.js"; +import { TerminalError } from "@restatedev/restate-sdk"; type ProxyRequest = { serviceName: string; @@ -18,6 +19,7 @@ type ProxyRequest = { handlerName: string; message: Array; delayMillis?: number; + idempotencyKey?: string; }; type ManyCallRequest = { @@ -30,6 +32,10 @@ function rawCall( ctx: restate.Context, request: ProxyRequest ): Promise { + // TODO add idempotency key support here + if (request.idempotencyKey != undefined) { + throw new TerminalError("idempotency key not supported yet"); + } return ctx.genericCall({ service: request.serviceName, method: request.handlerName, @@ -40,7 +46,11 @@ function rawCall( }); } -function rawSend(ctx: restate.Context, request: ProxyRequest) { +function rawSend(ctx: restate.Context, request: ProxyRequest): Promise { + // TODO add idempotency key support here + if (request.idempotencyKey != undefined) { + throw new TerminalError("idempotency key not supported yet"); + } ctx.genericSend({ service: request.serviceName, method: request.handlerName, @@ -49,6 +59,8 @@ function rawSend(ctx: restate.Context, request: ProxyRequest) { parameter: new Uint8Array(request.message), delay: request.delayMillis, }); + // TODO this should return the invocation id + return Promise.resolve("unknown"); } const o = restate.service({ @@ -59,7 +71,7 @@ const o = restate.service({ }, async oneWayCall(ctx: restate.Context, request: ProxyRequest) { - rawSend(ctx, request); + return rawSend(ctx, request); }, async manyCalls(ctx: restate.Context, request: ManyCallRequest[]) { @@ -67,7 +79,7 @@ const o = restate.service({ for (const r of request) { if (r.oneWayCall) { - rawSend(ctx, r.proxyRequest); + await rawSend(ctx, r.proxyRequest); continue; } const promise = rawCall(ctx, r.proxyRequest); diff --git a/packages/restate-e2e-services/src/test_utils.ts b/packages/restate-e2e-services/src/test_utils.ts index 517a2227..da2b257c 100644 --- a/packages/restate-e2e-services/src/test_utils.ts +++ b/packages/restate-e2e-services/src/test_utils.ts @@ -10,21 +10,7 @@ import * as restate from "@restatedev/restate-sdk"; import { REGISTRY } from "./services.js"; -import type { AwakeableHolder } from "./awakeable_holder.js"; -import * as process from "node:process"; -import { ListObject } from "./list.js"; - -const AwakeableHolder: AwakeableHolder = { name: "AwakeableHolder" }; -const ListObject: ListObject = { name: "ListObject" }; - -type Command = - | { type: "createAwakeableAndAwaitIt"; awakeableKey: string } - | { type: "getEnvVariable"; envName: string }; - -interface InterpretRequest { - listName: string; - commands: Command[]; -} +import { TerminalError } from "@restatedev/restate-sdk"; const o = restate.service({ name: "TestUtilsService", @@ -51,29 +37,6 @@ const o = restate.service({ } ), - async createAwakeableAndAwaitIt( - ctx: restate.Context, - req: { awakeableKey: string; awaitTimeout?: number } - ): Promise<{ type: "timeout" } | { type: "result"; value: string }> { - const { id, promise } = ctx.awakeable(); - - await ctx.objectClient(AwakeableHolder, req.awakeableKey).hold(id); - - if (!req.awaitTimeout) { - return { type: "result", value: await promise }; - } - - try { - const res = await promise.orTimeout(req.awaitTimeout); - return { type: "result", value: res }; - } catch (e) { - if (e instanceof restate.TimeoutError) { - return { type: "timeout" }; - } - throw e; - } - }, - async sleepConcurrently( ctx: restate.Context, millisDuration: number[] @@ -102,42 +65,16 @@ const o = restate.service({ return invokedSideEffects; }, - async getEnvVariable(ctx: restate.Context, env: string): Promise { - return ctx.run(() => process.env[env] ?? ""); - }, - - async interpretCommands( + async cancelInvocation( + // eslint-disable-next-line @typescript-eslint/no-unused-vars ctx: restate.Context, - req: InterpretRequest + // eslint-disable-next-line @typescript-eslint/no-unused-vars + invocationId: string ): Promise { - const listClient = ctx.objectSendClient(ListObject, req.listName); - - async function createAwakeableAndAwaitIt( - awakeableKey: string - ): Promise { - const { id, promise } = ctx.awakeable(); - await ctx.objectClient(AwakeableHolder, awakeableKey).hold(id); - return promise; - } - - async function getEnvVariable(envName: string): Promise { - return ctx.run(() => process.env[envName] ?? ""); - } - - for (const command of req.commands) { - switch (command.type) { - case "createAwakeableAndAwaitIt": - listClient.append( - await createAwakeableAndAwaitIt(command.awakeableKey) - ); - break; - case "getEnvVariable": - listClient.append(await getEnvVariable(command.envName)); - break; - } - } + // TODO add cancel invocation here! + return Promise.reject(new TerminalError("cancel is not supported yet")); }, }, }); -REGISTRY.addObject(o); +REGISTRY.addService(o); diff --git a/packages/restate-e2e-services/src/virtual_object_command_interpreter.ts b/packages/restate-e2e-services/src/virtual_object_command_interpreter.ts new file mode 100644 index 00000000..e9289421 --- /dev/null +++ b/packages/restate-e2e-services/src/virtual_object_command_interpreter.ts @@ -0,0 +1,247 @@ +// Copyright (c) 2023 - Restate Software, Inc., Restate GmbH +// +// This file is part of the Restate e2e tests, +// which are released under the MIT license. +// +// You can find a copy of the license in file LICENSE in the root +// directory of this repository or package, or at +// https://github.com/restatedev/e2e/blob/main/LICENSE + +import * as restate from "@restatedev/restate-sdk"; +import { REGISTRY } from "./services.js"; + +import * as process from "node:process"; +import type { ObjectContext } from "@restatedev/restate-sdk"; +import { CombineablePromise, TerminalError } from "@restatedev/restate-sdk"; + +type AwaitableCommand = CreateAwakeable | Sleep | RunThrowTerminalException; + +interface CreateAwakeable { + type: "createAwakeable"; + awakeableKey: string; +} + +interface Sleep { + type: "sleep"; + timeoutMillis: number; +} + +interface RunThrowTerminalException { + type: "runThrowTerminalException"; + reason: string; +} + +type Command = + | AwaitAnySuccessful + | AwaitAny + | AwaitOne + | AwaitAwakeableOrTimeout + | ResolveAwakeable + | RejectAwakeable + | GetEnvVariable; + +interface AwaitAnySuccessful { + type: "awaitAnySuccessful"; + commands: AwaitableCommand[]; +} + +interface AwaitAny { + type: "awaitAny"; + commands: AwaitableCommand[]; +} + +interface AwaitOne { + type: "awaitOne"; + command: AwaitableCommand; +} + +interface AwaitAwakeableOrTimeout { + type: "awaitAwakeableOrTimeout"; + awakeableKey: string; + timeoutMillis: number; +} + +interface ResolveAwakeable { + type: "resolveAwakeable"; + awakeableKey: string; + value: string; +} + +interface RejectAwakeable { + type: "rejectAwakeable"; + awakeableKey: string; + reason: string; +} + +interface GetEnvVariable { + type: "getEnvVariable"; + envName: string; +} + +interface InterpretRequest { + commands: Command[]; +} + +function createAwakeable(ctx: ObjectContext, awakeableKey: string) { + const { id, promise } = ctx.awakeable(); + ctx.set(`awk-${awakeableKey}`, id); + return promise; +} + +function parseAwaitableCommand( + ctx: ObjectContext, + command: AwaitableCommand +): CombineablePromise { + switch (command.type) { + case "createAwakeable": + return createAwakeable(ctx, command.awakeableKey); + case "sleep": + // TODO yes this is an incorrect cast, but for now this is fine, type coercion in TS FTW. + // We need a mapper function in our promise type to make this working. + // The kotlin code looks like this: + // ctx.timer(this.timeoutMillis.milliseconds).map { "sleep" } + return ctx.sleep( + command.timeoutMillis + ) as unknown as CombineablePromise; + case "runThrowTerminalException": + return ctx.run(() => { + throw new TerminalError(command.reason); + }); + } +} + +async function awaitAwakeableOrTimeout( + ctx: ObjectContext, + { + awakeableKey, + timeoutMillis, + }: { awakeableKey: string; timeoutMillis: number } +): Promise { + const promise = createAwakeable(ctx, awakeableKey); + try { + return await promise.orTimeout(timeoutMillis); + } catch (e) { + if (e instanceof restate.TimeoutError) { + throw new TerminalError("await-timeout"); + } + throw e; + } +} + +async function getEnvVariable( + ctx: restate.Context, + envName: string +): Promise { + return ctx.run(() => process.env[envName] ?? ""); +} + +async function resolveAwakeable( + ctx: restate.ObjectSharedContext, + { + awakeableKey, + value, + }: { + awakeableKey: string; + value: string; + } +) { + const awkId = await ctx.get(`awk-${awakeableKey}`); + if (awkId === null) { + throw new TerminalError("awakeable is not registered yet"); + } + ctx.resolveAwakeable(awkId, value); +} + +async function rejectAwakeable( + ctx: restate.ObjectSharedContext, + { + awakeableKey, + reason, + }: { + awakeableKey: string; + reason: string; + } +) { + const awkId = await ctx.get(`awk-${awakeableKey}`); + if (awkId === null) { + throw new TerminalError("awakeable is not registered yet"); + } + ctx.rejectAwakeable(awkId, reason); +} + +const virtualObjectCommandInterpreter = restate.object({ + name: "VirtualObjectCommandInterpreter", + handlers: { + interpretCommands: restate.handlers.object.exclusive( + async (ctx: restate.ObjectContext, req: InterpretRequest) => { + let lastResult = ""; + + for (const command of req.commands) { + switch (command.type) { + case "awaitAnySuccessful": + lastResult = await CombineablePromise.any( + command.commands.map((cmd) => parseAwaitableCommand(ctx, cmd)) + ); + break; + case "awaitAny": + lastResult = await CombineablePromise.race( + command.commands.map((cmd) => parseAwaitableCommand(ctx, cmd)) + ); + break; + case "awaitOne": + lastResult = await parseAwaitableCommand(ctx, command.command); + break; + case "awaitAwakeableOrTimeout": + await awaitAwakeableOrTimeout(ctx, { + awakeableKey: command.awakeableKey, + timeoutMillis: command.timeoutMillis, + }); + break; + case "resolveAwakeable": + await resolveAwakeable(ctx, { + awakeableKey: command.awakeableKey, + value: command.value, + }); + lastResult = ""; + break; + case "rejectAwakeable": + await rejectAwakeable(ctx, { + awakeableKey: command.awakeableKey, + reason: command.reason, + }); + lastResult = ""; + break; + case "getEnvVariable": + lastResult = await getEnvVariable(ctx, command.envName); + break; + } + + // Append result + const results = (await ctx.get("results")) ?? []; + results.push(lastResult); + ctx.set("results", results); + } + + return lastResult; + } + ), + + resolveAwakeable: restate.handlers.object.shared(resolveAwakeable), + + rejectAwakeable: restate.handlers.object.shared(rejectAwakeable), + + hasAwakeable: restate.handlers.object.shared( + async (ctx: restate.ObjectSharedContext, awakeableKey: string) => { + return (await ctx.get(`awk-${awakeableKey}`)) !== null; + } + ), + + getResults: restate.handlers.object.shared( + async (ctx: restate.ObjectSharedContext) => { + return (await ctx.get("results")) ?? []; + } + ), + }, +}); + +REGISTRY.addObject(virtualObjectCommandInterpreter);