Skip to content

Commit f7cddea

Browse files
committed
Test warning emitted when workflow exits fue to cancellation / failure
1 parent 8da7e30 commit f7cddea

File tree

3 files changed

+164
-1
lines changed

3 files changed

+164
-1
lines changed

packages/test/src/helpers-integration.ts

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
import { randomUUID } from 'crypto';
2+
import { status as grpcStatus } from '@grpc/grpc-js';
23
import { ErrorConstructor, ExecutionContext, TestFn } from 'ava';
34
import {
5+
isGrpcServiceError,
46
WorkflowFailedError,
57
WorkflowHandle,
68
WorkflowStartOptions,
@@ -114,6 +116,7 @@ export interface Helpers {
114116
): Promise<WorkflowHandle<T>>;
115117
assertWorkflowUpdateFailed(p: Promise<any>, causeConstructor: ErrorConstructor, message?: string): Promise<void>;
116118
assertWorkflowFailedError(p: Promise<any>, causeConstructor: ErrorConstructor, message?: string): Promise<void>;
119+
updateHasBeenAdmitted(handle: WorkflowHandle<workflow.Workflow>, updateId: string): Promise<boolean>;
117120
}
118121

119122
export function helpers(t: ExecutionContext<Context>, testEnv: TestWorkflowEnvironment = t.context.env): Helpers {
@@ -179,5 +182,22 @@ export function helpers(t: ExecutionContext<Context>, testEnv: TestWorkflowEnvir
179182
t.is(err.cause?.message, message);
180183
}
181184
},
185+
async updateHasBeenAdmitted(handle: WorkflowHandle<workflow.Workflow>, updateId: string): Promise<boolean> {
186+
try {
187+
await testEnv.client.workflowService.pollWorkflowExecutionUpdate({
188+
namespace: testEnv.client.options.namespace,
189+
updateRef: {
190+
workflowExecution: { workflowId: handle.workflowId },
191+
updateId,
192+
},
193+
});
194+
return true;
195+
} catch (err) {
196+
if (isGrpcServiceError(err) && err.code === grpcStatus.NOT_FOUND) {
197+
return false;
198+
}
199+
throw err;
200+
}
201+
},
182202
};
183203
}

packages/test/src/helpers.ts

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,23 @@ export async function sleep(ms: number): Promise<void> {
4444
return new Promise((resolve) => setTimeout(resolve, ms));
4545
}
4646

47+
export async function waitUntil(
48+
condition: () => Promise<boolean>,
49+
timeoutMs: number,
50+
intervalMs: number = 100
51+
): Promise<void> {
52+
const endTime = Date.now() + timeoutMs;
53+
for (;;) {
54+
if (await condition()) {
55+
return;
56+
} else if (Date.now() >= endTime) {
57+
throw new Error('timed out waiting for condition');
58+
} else {
59+
await sleep(intervalMs);
60+
}
61+
}
62+
}
63+
4764
export function cleanOptionalStackTrace(stackTrace: string | undefined | null): string | undefined {
4865
return stackTrace ? cleanStackTrace(stackTrace) : undefined;
4966
}

packages/test/src/test-integration-workflows-with-recorded-logs.ts

Lines changed: 127 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
import { ExecutionContext } from 'ava';
22
import * as workflow from '@temporalio/workflow';
3-
import { HandlerUnfinishedPolicy } from '@temporalio/common';
3+
import { HandlerUnfinishedPolicy, WorkflowNotFoundError } from '@temporalio/common';
44
import { LogEntry } from '@temporalio/worker';
5+
import { WorkflowFailedError, WorkflowUpdateFailedError } from '@temporalio/client';
56
import { Context, helpers, makeTestFunction } from './helpers-integration';
7+
import { waitUntil } from './helpers';
68

79
const recordedLogs: { [workflowId: string]: LogEntry[] } = {};
810
const test = makeTestFunction({
@@ -170,3 +172,127 @@ class UnfinishedHandlersTest {
170172
);
171173
}
172174
}
175+
176+
export const unfinishedHandlersWithCancellationOrFailureUpdate = workflow.defineUpdate<void>(
177+
'unfinishedHandlersWithCancellationOrFailureUpdate'
178+
);
179+
export const unfinishedHandlersWithCancellationOrFailureSignal = workflow.defineSignal(
180+
'unfinishedHandlersWithCancellationOrFailureSignal'
181+
);
182+
183+
export async function runUnfinishedHandlersWithCancellationOrFailureWorkflow(
184+
workflowTerminationType: 'cancellation' | 'failure'
185+
): Promise<never> {
186+
workflow.setHandler(unfinishedHandlersWithCancellationOrFailureUpdate, async () => {
187+
await workflow.condition(() => false);
188+
throw new Error('unreachable');
189+
});
190+
191+
workflow.setHandler(unfinishedHandlersWithCancellationOrFailureSignal, async () => {
192+
await workflow.condition(() => false);
193+
throw new Error('unreachable');
194+
});
195+
196+
if (workflowTerminationType === 'failure') {
197+
throw new workflow.ApplicationFailure('Deliberately failing workflow with an unfinished handler');
198+
}
199+
await workflow.condition(() => false);
200+
throw new Error('unreachable');
201+
}
202+
203+
test('unfinished update handler with workflow cancellation', async (t) => {
204+
await new UnfinishedHandlersWithCancellationOrFailureTest(t, 'update', 'cancellation').testWarningIsIssued();
205+
});
206+
207+
test('unfinished signal handler with workflow cancellation', async (t) => {
208+
await new UnfinishedHandlersWithCancellationOrFailureTest(t, 'signal', 'cancellation').testWarningIsIssued();
209+
});
210+
211+
test('unfinished update handler with workflow failure', async (t) => {
212+
await new UnfinishedHandlersWithCancellationOrFailureTest(t, 'update', 'failure').testWarningIsIssued();
213+
});
214+
215+
test('unfinished signal handler with workflow failure', async (t) => {
216+
await new UnfinishedHandlersWithCancellationOrFailureTest(t, 'signal', 'failure').testWarningIsIssued();
217+
});
218+
219+
class UnfinishedHandlersWithCancellationOrFailureTest {
220+
constructor(
221+
private readonly t: ExecutionContext<Context>,
222+
private readonly handlerType: 'update' | 'signal',
223+
private readonly workflowTerminationType: 'cancellation' | 'failure'
224+
) {}
225+
226+
async testWarningIsIssued() {
227+
this.t.true(await this.runWorkflowAndGetWarning());
228+
}
229+
230+
async runWorkflowAndGetWarning(): Promise<boolean> {
231+
const { createWorker, startWorkflow, updateHasBeenAdmitted: workflowUpdateExists } = helpers(this.t);
232+
233+
// We require a startWorkflow, an update, and maybe a cancellation request,
234+
// to be delivered in the same WFT. To do this we start the worker after
235+
// they've all been accepted by the server.
236+
const updateId = 'update-id';
237+
238+
const handle = await startWorkflow(runUnfinishedHandlersWithCancellationOrFailureWorkflow, {
239+
args: [this.workflowTerminationType],
240+
});
241+
if (this.workflowTerminationType === 'cancellation') {
242+
await handle.cancel();
243+
}
244+
let executeUpdate: Promise<void>;
245+
246+
switch (this.handlerType) {
247+
case 'update':
248+
executeUpdate = handle.executeUpdate(unfinishedHandlersWithCancellationOrFailureUpdate, { updateId });
249+
await waitUntil(() => workflowUpdateExists(handle, updateId), 500);
250+
break;
251+
case 'signal':
252+
await handle.signal(unfinishedHandlersWithCancellationOrFailureSignal);
253+
break;
254+
}
255+
256+
const worker = await createWorker();
257+
return await worker.runUntil(async () => {
258+
if (this.handlerType === 'update') {
259+
switch (this.workflowTerminationType) {
260+
case 'cancellation': {
261+
const err: WorkflowUpdateFailedError = (await this.t.throwsAsync(executeUpdate, {
262+
instanceOf: WorkflowUpdateFailedError,
263+
})) as WorkflowUpdateFailedError;
264+
this.t.is(err.message, 'Workflow Update failed');
265+
break;
266+
}
267+
case 'failure': {
268+
const err: WorkflowNotFoundError = (await this.t.throwsAsync(executeUpdate, {
269+
instanceOf: WorkflowNotFoundError,
270+
})) as WorkflowNotFoundError;
271+
this.t.is(err.message, 'workflow execution already completed');
272+
break;
273+
}
274+
}
275+
}
276+
277+
const err = (await this.t.throwsAsync(handle.result(), {
278+
instanceOf: WorkflowFailedError,
279+
})) as WorkflowFailedError;
280+
this.t.is(
281+
err.message,
282+
'Workflow execution ' + (this.workflowTerminationType === 'cancellation' ? 'cancelled' : 'failed')
283+
);
284+
285+
const unfinishedHandlerWarningEmitted =
286+
recordedLogs[handle.workflowId] &&
287+
recordedLogs[handle.workflowId].findIndex((e) => this.isUnfinishedHandlerWarning(e)) >= 0;
288+
return unfinishedHandlerWarningEmitted;
289+
});
290+
}
291+
292+
isUnfinishedHandlerWarning(logEntry: LogEntry): boolean {
293+
return (
294+
logEntry.level === 'WARN' &&
295+
new RegExp(`^Workflow finished while an? ${this.handlerType} handler was still running\\.`).test(logEntry.message)
296+
);
297+
}
298+
}

0 commit comments

Comments
 (0)