Skip to content

Commit 1a48acf

Browse files
committed
Add allHandlersFinished() and warn on unfinished handlers
1 parent a6849af commit 1a48acf

File tree

7 files changed

+336
-10
lines changed

7 files changed

+336
-10
lines changed

packages/common/src/interfaces.ts

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,16 @@ export type WorkflowUpdateType = (...args: any[]) => Promise<any> | any;
88
export type WorkflowUpdateValidatorType = (...args: any[]) => void;
99
export type WorkflowUpdateAnnotatedType = {
1010
handler: WorkflowUpdateType;
11+
unfinishedPolicy: HandlerUnfinishedPolicy;
1112
validator?: WorkflowUpdateValidatorType;
1213
description?: string;
1314
};
1415
export type WorkflowSignalType = (...args: any[]) => Promise<void> | void;
15-
export type WorkflowSignalAnnotatedType = { handler: WorkflowSignalType; description?: string };
16+
export type WorkflowSignalAnnotatedType = {
17+
handler: WorkflowSignalType;
18+
unfinishedPolicy: HandlerUnfinishedPolicy;
19+
description?: string;
20+
};
1621
export type WorkflowQueryType = (...args: any[]) => any;
1722
export type WorkflowQueryAnnotatedType = { handler: WorkflowQueryType; description?: string };
1823

@@ -118,3 +123,22 @@ export interface HistoryAndWorkflowId {
118123
workflowId: string;
119124
history: temporal.api.history.v1.History | unknown | undefined;
120125
}
126+
127+
/**
128+
* Policy defining actions taken when a workflow exits while update or signal handlers are running.
129+
* The workflow exit may be due to successful return, failure, cancellation, or continue-as-new.
130+
*/
131+
export enum HandlerUnfinishedPolicy {
132+
/**
133+
* Issue a warning in addition to abandoning the handler execution.
134+
*/
135+
WARN_AND_ABANDON = 1,
136+
137+
/**
138+
* Abandon the handler execution.
139+
*
140+
* In the case of an update handler this means that the client will receive an error rather than
141+
* the update result.
142+
*/
143+
ABANDON = 2,
144+
}

packages/test/src/helpers-integration.ts

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import {
1212
} from '@temporalio/testing';
1313
import {
1414
DefaultLogger,
15+
LogEntry,
1516
LogLevel,
1617
Runtime,
1718
WorkerOptions,
@@ -50,6 +51,7 @@ export function makeTestFunction(opts: {
5051
workflowsPath: string;
5152
workflowEnvironmentOpts?: LocalTestWorkflowEnvironmentOptions;
5253
workflowInterceptorModules?: string[];
54+
recordedLogs?: { [workflowId: string]: LogEntry[] };
5355
}): TestFn<Context> {
5456
const test = anyTest as TestFn<Context>;
5557
test.before(async (t) => {
@@ -58,9 +60,15 @@ export function makeTestFunction(opts: {
5860
workflowInterceptorModules: [...defaultWorkflowInterceptorModules, ...(opts.workflowInterceptorModules ?? [])],
5961
workflowsPath: opts.workflowsPath,
6062
});
61-
// Ignore invalid log levels
63+
const logger = opts.recordedLogs
64+
? new DefaultLogger('DEBUG', (entry) => {
65+
const workflowId = (entry.meta as any)?.workflowInfo?.workflowId ?? (entry.meta as any)?.workflowId;
66+
opts.recordedLogs![workflowId] ??= [];
67+
opts.recordedLogs![workflowId].push(entry);
68+
})
69+
: new DefaultLogger((process.env.TEST_LOG_LEVEL || 'DEBUG').toUpperCase() as LogLevel);
6270
Runtime.install({
63-
logger: new DefaultLogger((process.env.TEST_LOG_LEVEL || 'DEBUG').toUpperCase() as LogLevel),
71+
logger,
6472
telemetryOptions: {
6573
logging: {
6674
filter: makeTelemetryFilterString({
Lines changed: 172 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,172 @@
1+
import { ExecutionContext } from 'ava';
2+
import * as workflow from '@temporalio/workflow';
3+
import { HandlerUnfinishedPolicy } from '@temporalio/common';
4+
import { LogEntry } from '@temporalio/worker';
5+
import { Context, helpers, makeTestFunction } from './helpers-integration';
6+
7+
const recordedLogs: { [workflowId: string]: LogEntry[] } = {};
8+
const test = makeTestFunction({
9+
workflowsPath: __filename,
10+
recordedLogs,
11+
});
12+
13+
export const unfinishedHandlersUpdate = workflow.defineUpdate<void>('unfinished-handlers-update');
14+
export const unfinishedHandlersUpdate_ABANDON = workflow.defineUpdate<void>('unfinished-handlers-update-ABANDON');
15+
export const unfinishedHandlersUpdate_WARN_AND_ABANDON = workflow.defineUpdate<void>(
16+
'unfinished-handlers-update-WARN_AND_ABANDON'
17+
);
18+
export const unfinishedHandlersSignal = workflow.defineSignal('unfinished-handlers-signal');
19+
export const unfinishedHandlersSignal_ABANDON = workflow.defineSignal('unfinished-handlers-signal-ABANDON');
20+
export const unfinishedHandlersSignal_WARN_AND_ABANDON = workflow.defineSignal(
21+
'unfinished-handlers-signal-WARN_AND_ABANDON'
22+
);
23+
24+
/**
25+
* A workflow for testing `workflow.allHandlersFinished()` and control of
26+
* warnings by HandlerUnfinishedPolicy.
27+
*/
28+
export async function unfinishedHandlersWorkflow(waitAllHandlersFinished: boolean): Promise<boolean> {
29+
let startedHandler = false;
30+
let handlerMayReturn = false;
31+
let handlerFinished = false;
32+
33+
const doUpdateOrSignal = async (): Promise<void> => {
34+
startedHandler = true;
35+
await workflow.condition(() => handlerMayReturn);
36+
handlerFinished = true;
37+
};
38+
39+
workflow.setHandler(unfinishedHandlersUpdate, doUpdateOrSignal);
40+
workflow.setHandler(unfinishedHandlersUpdate_ABANDON, doUpdateOrSignal, {
41+
unfinishedPolicy: HandlerUnfinishedPolicy.ABANDON,
42+
});
43+
workflow.setHandler(unfinishedHandlersUpdate_WARN_AND_ABANDON, doUpdateOrSignal, {
44+
unfinishedPolicy: HandlerUnfinishedPolicy.WARN_AND_ABANDON,
45+
});
46+
workflow.setHandler(unfinishedHandlersSignal, doUpdateOrSignal);
47+
workflow.setHandler(unfinishedHandlersSignal_ABANDON, doUpdateOrSignal, {
48+
unfinishedPolicy: HandlerUnfinishedPolicy.ABANDON,
49+
});
50+
workflow.setHandler(unfinishedHandlersSignal_WARN_AND_ABANDON, doUpdateOrSignal, {
51+
unfinishedPolicy: HandlerUnfinishedPolicy.WARN_AND_ABANDON,
52+
});
53+
workflow.setDefaultSignalHandler(doUpdateOrSignal);
54+
55+
await workflow.condition(() => startedHandler);
56+
if (waitAllHandlersFinished) {
57+
handlerMayReturn = true;
58+
await workflow.condition(() => workflow.allHandlersFinished());
59+
}
60+
return handlerFinished;
61+
}
62+
63+
test('unfinished update handler', async (t) => {
64+
await new UnfinishedHandlersTest(t, 'update').testWaitAllHandlersFinishedAndUnfinishedHandlersWarning();
65+
});
66+
67+
test('unfinished signal handler', async (t) => {
68+
await new UnfinishedHandlersTest(t, 'signal').testWaitAllHandlersFinishedAndUnfinishedHandlersWarning();
69+
});
70+
71+
class UnfinishedHandlersTest {
72+
constructor(
73+
private readonly t: ExecutionContext<Context>,
74+
private readonly handlerType: 'update' | 'signal'
75+
) {}
76+
77+
async testWaitAllHandlersFinishedAndUnfinishedHandlersWarning() {
78+
// The unfinished handler warning is issued by default,
79+
let [handlerFinished, warning] = await this.getWorkflowResultAndWarning(false);
80+
this.t.false(handlerFinished);
81+
this.t.true(warning);
82+
83+
// and when the workflow sets the unfinished_policy to WARN_AND_ABANDON,
84+
[handlerFinished, warning] = await this.getWorkflowResultAndWarning(
85+
false,
86+
HandlerUnfinishedPolicy.WARN_AND_ABANDON
87+
);
88+
this.t.false(handlerFinished);
89+
this.t.true(warning);
90+
91+
// and when a default (aka dynamic) handler is used
92+
if (this.handlerType == 'signal') {
93+
[handlerFinished, warning] = await this.getWorkflowResultAndWarning(false, undefined, true);
94+
this.t.false(handlerFinished);
95+
this.t.true(warning);
96+
} else {
97+
// default handlers not supported yet for update
98+
// https://github.com/temporalio/sdk-typescript/issues/1460
99+
}
100+
101+
// but not when the workflow waits for handlers to complete,
102+
[handlerFinished, warning] = await this.getWorkflowResultAndWarning(true);
103+
this.t.true(handlerFinished);
104+
this.t.false(warning);
105+
if (false) {
106+
// TODO: make default handlers honor HandlerUnfinishedPolicy
107+
[handlerFinished, warning] = await this.getWorkflowResultAndWarning(true, undefined, true);
108+
this.t.true(handlerFinished);
109+
this.t.false(warning);
110+
}
111+
112+
// nor when the silence-warnings policy is set on the handler.
113+
[handlerFinished, warning] = await this.getWorkflowResultAndWarning(false, HandlerUnfinishedPolicy.ABANDON);
114+
this.t.false(handlerFinished);
115+
this.t.false(warning);
116+
}
117+
118+
/**
119+
* Run workflow and send signal/update. Return two booleans:
120+
* - did the handler complete? (i.e. the workflow return value)
121+
* - was an unfinished handler warning emitted?
122+
*/
123+
async getWorkflowResultAndWarning(
124+
waitAllHandlersFinished: boolean,
125+
unfinishedPolicy?: HandlerUnfinishedPolicy,
126+
useDefaultHandler?: boolean
127+
): Promise<[boolean, boolean]> {
128+
const { createWorker, startWorkflow } = helpers(this.t);
129+
const worker = await createWorker();
130+
return await worker.runUntil(async () => {
131+
const handle = await startWorkflow(unfinishedHandlersWorkflow, { args: [waitAllHandlersFinished] });
132+
let messageType: string;
133+
if (useDefaultHandler) {
134+
messageType = '__no_registered_handler__';
135+
this.t.falsy(unfinishedPolicy); // default handlers do not support setting the unfinished policy
136+
} else {
137+
messageType = `unfinished-handlers-${this.handlerType}`;
138+
if (unfinishedPolicy) {
139+
messageType += '-' + HandlerUnfinishedPolicy[unfinishedPolicy];
140+
}
141+
}
142+
switch (this.handlerType) {
143+
case 'signal':
144+
await handle.signal(messageType);
145+
break;
146+
case 'update':
147+
const executeUpdate = handle.executeUpdate(messageType, { updateId: 'my-update-id' });
148+
if (!waitAllHandlersFinished) {
149+
const err: workflow.WorkflowNotFoundError = (await this.t.throwsAsync(executeUpdate, {
150+
instanceOf: workflow.WorkflowNotFoundError,
151+
})) as workflow.WorkflowNotFoundError;
152+
this.t.is(err.message, 'workflow execution already completed');
153+
} else {
154+
await executeUpdate;
155+
}
156+
break;
157+
}
158+
const handlerFinished = await handle.result();
159+
const unfinishedHandlerWarningEmitted =
160+
recordedLogs[handle.workflowId] &&
161+
recordedLogs[handle.workflowId].findIndex((e) => this.isUnfinishedHandlerWarning(e)) >= 0;
162+
return [handlerFinished, unfinishedHandlerWarningEmitted];
163+
});
164+
}
165+
166+
isUnfinishedHandlerWarning(logEntry: LogEntry): boolean {
167+
return (
168+
logEntry.level === 'WARN' &&
169+
new RegExp(`^Workflow finished while an? ${this.handlerType} handler was still running\.`).test(logEntry.message)
170+
);
171+
}
172+
}

packages/workflow/src/interfaces.ts

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import {
33
RetryPolicy,
44
TemporalFailure,
55
CommonWorkflowOptions,
6+
HandlerUnfinishedPolicy,
67
SearchAttributes,
78
SignalDefinition,
89
UpdateDefinition,
@@ -494,9 +495,13 @@ export type QueryHandlerOptions = { description?: string };
494495
/**
495496
* A description of a signal handler.
496497
*/
497-
export type SignalHandlerOptions = { description?: string };
498+
export type SignalHandlerOptions = { description?: string; unfinishedPolicy?: HandlerUnfinishedPolicy };
498499

499500
/**
500501
* A validator and description of an update handler.
501502
*/
502-
export type UpdateHandlerOptions<Args extends any[]> = { validator?: UpdateValidator<Args>; description?: string };
503+
export type UpdateHandlerOptions<Args extends any[]> = {
504+
validator?: UpdateValidator<Args>;
505+
description?: string;
506+
unfinishedPolicy?: HandlerUnfinishedPolicy;
507+
};

0 commit comments

Comments
 (0)