Skip to content

Warn on unfinished handlers and provide workflow.allHandlersFinished() API #1459

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
5fef751
Add allHandlersFinished() and warn on unfinished handlers
dandavison Jul 17, 2024
32a6bb2
Test warning emitted when workflow exits fue to cancellation / failure
dandavison Jul 17, 2024
bc31e99
Revert to doing in-progress state transition in `finally()` callback
dandavison Jul 24, 2024
c6cfef3
Update tests
dandavison Jul 25, 2024
a9b1c3b
Add rule code and edit signal warning
dandavison Jul 29, 2024
cfe9266
just catch ApplicationFailure
dandavison Jul 29, 2024
2587edc
Revert "just catch ApplicationFailure"
dandavison Jul 30, 2024
bc80e24
Edit docstring
dandavison Jul 29, 2024
96f6c6b
Fix test assertion: we do not warn on failure
dandavison Jul 29, 2024
98cff07
Add tests for CAN
dandavison Jul 30, 2024
557fec9
Rename
dandavison Jul 30, 2024
bc368b9
minor
dandavison Jul 31, 2024
e259747
Refactor
dandavison Jul 31, 2024
02b7406
Allow CAN child to return
dandavison Jul 31, 2024
9625c29
Fix containment condition
dandavison Jul 31, 2024
3894c89
Add test of return
dandavison Jul 31, 2024
7529961
Refactor
dandavison Jul 31, 2024
65c3ad6
Revert changes to comment
dandavison Jul 31, 2024
070b576
Fix
dandavison Jul 31, 2024
228d4c3
Edit comments
dandavison Jul 31, 2024
456effc
Mark handlers as completed when failing the workflow
dandavison Aug 2, 2024
f997df1
Add tests that return and CAN behave as expected with allHandlersFini…
dandavison Aug 2, 2024
d27a903
Cleanup
dandavison Aug 2, 2024
147bc6b
Paper over possible server bug
dandavison Aug 2, 2024
a4671f2
Try increasing timeout for update to be admitted
dandavison Aug 5, 2024
cadfbe3
Extend tests to confirm that warnings are issued when handler shields…
dandavison Aug 6, 2024
5d4caca
Add docstring
dandavison Aug 6, 2024
5ce2e7c
Tighten up update handler entry access
dandavison Aug 6, 2024
53a9ba5
Tighten up further
dandavison Aug 6, 2024
580ffa0
Don't use redundant anonymous function in warning message
dandavison Aug 6, 2024
5f78414
Be explicit that there is no warning on workflow failure
dandavison Aug 6, 2024
6fdbd32
Use partial application via bind to clean up further
dandavison Aug 7, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 25 additions & 1 deletion packages/common/src/interfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,16 @@ export type WorkflowUpdateType = (...args: any[]) => Promise<any> | any;
export type WorkflowUpdateValidatorType = (...args: any[]) => void;
export type WorkflowUpdateAnnotatedType = {
handler: WorkflowUpdateType;
unfinishedPolicy: HandlerUnfinishedPolicy;
validator?: WorkflowUpdateValidatorType;
description?: string;
};
export type WorkflowSignalType = (...args: any[]) => Promise<void> | void;
export type WorkflowSignalAnnotatedType = { handler: WorkflowSignalType; description?: string };
export type WorkflowSignalAnnotatedType = {
handler: WorkflowSignalType;
unfinishedPolicy: HandlerUnfinishedPolicy;
description?: string;
};
export type WorkflowQueryType = (...args: any[]) => any;
export type WorkflowQueryAnnotatedType = { handler: WorkflowQueryType; description?: string };

Expand Down Expand Up @@ -118,3 +123,22 @@ export interface HistoryAndWorkflowId {
workflowId: string;
history: temporal.api.history.v1.History | unknown | undefined;
}

/**
* Policy defining actions taken when a workflow exits while update or signal handlers are running.
* The workflow exit may be due to successful return, failure, cancellation, or continue-as-new.
*/
export enum HandlerUnfinishedPolicy {
/**
* Issue a warning in addition to abandoning the handler execution. The warning will not be issued if the workflow fails.
*/
WARN_AND_ABANDON = 1,

/**
* Abandon the handler execution.
*
* In the case of an update handler this means that the client will receive an error rather than
* the update result.
*/
ABANDON = 2,
}
32 changes: 30 additions & 2 deletions packages/test/src/helpers-integration.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import { randomUUID } from 'crypto';
import { status as grpcStatus } from '@grpc/grpc-js';
import { ErrorConstructor, ExecutionContext, TestFn } from 'ava';
import {
isGrpcServiceError,
WorkflowFailedError,
WorkflowHandle,
WorkflowStartOptions,
Expand All @@ -12,6 +14,7 @@ import {
} from '@temporalio/testing';
import {
DefaultLogger,
LogEntry,
LogLevel,
Runtime,
WorkerOptions,
Expand Down Expand Up @@ -50,6 +53,7 @@ export function makeTestFunction(opts: {
workflowsPath: string;
workflowEnvironmentOpts?: LocalTestWorkflowEnvironmentOptions;
workflowInterceptorModules?: string[];
recordedLogs?: { [workflowId: string]: LogEntry[] };
}): TestFn<Context> {
const test = anyTest as TestFn<Context>;
test.before(async (t) => {
Expand All @@ -59,9 +63,15 @@ export function makeTestFunction(opts: {
workflowsPath: opts.workflowsPath,
logger: new DefaultLogger('WARN'),
});
// Ignore invalid log levels
const logger = opts.recordedLogs
? new DefaultLogger('DEBUG', (entry) => {
const workflowId = (entry.meta as any)?.workflowInfo?.workflowId ?? (entry.meta as any)?.workflowId;
opts.recordedLogs![workflowId] ??= [];
opts.recordedLogs![workflowId].push(entry);
})
: new DefaultLogger((process.env.TEST_LOG_LEVEL || 'DEBUG').toUpperCase() as LogLevel);
Runtime.install({
logger: new DefaultLogger((process.env.TEST_LOG_LEVEL || 'DEBUG').toUpperCase() as LogLevel),
logger,
telemetryOptions: {
logging: {
filter: makeTelemetryFilterString({
Expand Down Expand Up @@ -107,6 +117,7 @@ export interface Helpers {
): Promise<WorkflowHandle<T>>;
assertWorkflowUpdateFailed(p: Promise<any>, causeConstructor: ErrorConstructor, message?: string): Promise<void>;
assertWorkflowFailedError(p: Promise<any>, causeConstructor: ErrorConstructor, message?: string): Promise<void>;
updateHasBeenAdmitted(handle: WorkflowHandle<workflow.Workflow>, updateId: string): Promise<boolean>;
}

export function helpers(t: ExecutionContext<Context>, testEnv: TestWorkflowEnvironment = t.context.env): Helpers {
Expand Down Expand Up @@ -172,5 +183,22 @@ export function helpers(t: ExecutionContext<Context>, testEnv: TestWorkflowEnvir
t.is(err.cause?.message, message);
}
},
async updateHasBeenAdmitted(handle: WorkflowHandle<workflow.Workflow>, updateId: string): Promise<boolean> {
try {
await testEnv.client.workflowService.pollWorkflowExecutionUpdate({
namespace: testEnv.client.options.namespace,
updateRef: {
workflowExecution: { workflowId: handle.workflowId },
updateId,
},
});
return true;
} catch (err) {
if (isGrpcServiceError(err) && err.code === grpcStatus.NOT_FOUND) {
return false;
}
throw err;
}
},
};
}
17 changes: 17 additions & 0 deletions packages/test/src/helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,23 @@ export async function sleep(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, ms));
}

export async function waitUntil(
condition: () => Promise<boolean>,
timeoutMs: number,
intervalMs: number = 100
): Promise<void> {
const endTime = Date.now() + timeoutMs;
for (;;) {
if (await condition()) {
return;
} else if (Date.now() >= endTime) {
throw new Error('timed out waiting for condition');
} else {
await sleep(intervalMs);
}
}
}

export function cleanOptionalStackTrace(stackTrace: string | undefined | null): string | undefined {
return stackTrace ? cleanStackTrace(stackTrace) : undefined;
}
Expand Down
Loading
Loading