Skip to content

Commit c4e0b53

Browse files
committed
Different approach + wip on #1421
1 parent 3b2d84a commit c4e0b53

File tree

8 files changed

+153
-183
lines changed

8 files changed

+153
-183
lines changed

packages/test/src/test-interceptors.ts

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -282,6 +282,13 @@ if (RUN_INTEGRATION_TESTS) {
282282
workflowId: uuid4(),
283283
})
284284
);
285-
t.deepEqual(events, ['activate: 0', 'concludeActivation: 1', 'activate: 0', 'concludeActivation: 1']);
285+
t.deepEqual(events, [
286+
'activate: 0',
287+
'activate: 1',
288+
'concludeActivation: 1',
289+
'activate: 0',
290+
'activate: 1',
291+
'concludeActivation: 1',
292+
]);
286293
});
287294
}

packages/test/src/test-workflows.ts

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,7 @@ async function createWorkflow(
122122
randomnessSeed: Long.fromInt(1337).toBytes(),
123123
now: startTime,
124124
patches: [],
125+
sdkFlags: [],
125126
showStackTraceSources: true,
126127
})) as VMWorkflow;
127128
return workflow;
@@ -1955,15 +1956,19 @@ test('scopeCancelledWhileWaitingOnExternalWorkflowCancellation', async (t) => {
19551956
test('query not found - successString', async (t) => {
19561957
const { workflowType } = t.context;
19571958
{
1958-
const completion = await activate(
1959+
const completion = await activate(t, makeActivation(undefined, makeStartWorkflowJob(workflowType)));
1960+
compareCompletion(
19591961
t,
1960-
makeActivation(undefined, makeStartWorkflowJob(workflowType), makeQueryWorkflowJob('qid', 'not-found'))
1962+
completion,
1963+
makeSuccess([makeCompleteWorkflowExecution(defaultPayloadConverter.toPayload('success'))])
19611964
);
1965+
}
1966+
{
1967+
const completion = await activate(t, makeActivation(undefined, makeQueryWorkflowJob('qid', 'not-found')));
19621968
compareCompletion(
19631969
t,
19641970
completion,
19651971
makeSuccess([
1966-
makeCompleteWorkflowExecution(defaultPayloadConverter.toPayload('success')),
19671972
makeRespondToQueryCommand({
19681973
queryId: 'qid',
19691974
failed: {
@@ -2097,11 +2102,14 @@ test("Pending promises can't unblock between signals and updates - 1.11.0+ - sig
20972102
compareCompletion(
20982103
t,
20992104
completion,
2100-
makeSuccess([
2101-
{ updateResponse: { protocolInstanceId: '2', accepted: {} } },
2102-
{ updateResponse: { protocolInstanceId: '2', completed: defaultPayloadConverter.toPayload(3) } },
2103-
{ completeWorkflowExecution: { result: defaultPayloadConverter.toPayload(3) } },
2104-
])
2105+
makeSuccess(
2106+
[
2107+
{ updateResponse: { protocolInstanceId: '2', accepted: {} } },
2108+
{ updateResponse: { protocolInstanceId: '2', completed: defaultPayloadConverter.toPayload(3) } },
2109+
{ completeWorkflowExecution: { result: defaultPayloadConverter.toPayload(3) } },
2110+
],
2111+
[SdkFlags.GroupUpdatesJobsWithSignals]
2112+
)
21052113
);
21062114
}
21072115
});

packages/worker/src/utils.ts

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,6 @@ import { IllegalStateError, ParentWorkflowInfo } from '@temporalio/workflow';
33

44
export const MiB = 1024 ** 2;
55

6-
// ts-prune-ignore-next (no idea why ts-prune is complaining all of a sudden)
7-
export function partition<T>(arr: T[], predicate: (x: T) => boolean): [T[], T[]] {
8-
const truthy = Array<T>();
9-
const falsy = Array<T>();
10-
arr.forEach((v) => (predicate(v) ? truthy : falsy).push(v));
11-
return [truthy, falsy];
12-
}
13-
146
export function toMB(bytes: number, fractionDigits = 2): string {
157
return (bytes / 1024 / 1024).toFixed(fractionDigits);
168
}

packages/worker/src/worker.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1262,6 +1262,7 @@ export class Worker {
12621262
randomnessSeed: randomnessSeed.toBytes(),
12631263
now: tsToMs(activation.timestamp),
12641264
patches,
1265+
sdkFlags: activation.availableInternalFlags ?? [],
12651266
showStackTraceSources: this.options.showStackTraceSources,
12661267
});
12671268

packages/worker/src/workflow/vm-shared.ts

Lines changed: 43 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,7 @@ import type { WorkflowInfo, StackTraceFileLocation } from '@temporalio/workflow'
77
import { type SinkCall } from '@temporalio/workflow/lib/sinks';
88
import * as internals from '@temporalio/workflow/lib/worker-interface';
99
import { Activator } from '@temporalio/workflow/lib/internals';
10-
import { assertValidFlag, SdkFlag, SdkFlags } from '@temporalio/workflow/lib/flags';
11-
import { partition } from '../utils';
10+
import { SdkFlags } from '@temporalio/workflow/lib/flags';
1211
import { Workflow } from './interface';
1312
import { WorkflowBundleWithSourceMapAndFilename } from './workflow-worker-thread/input';
1413

@@ -274,7 +273,6 @@ export type WorkflowModule = typeof internals;
274273
* A Workflow implementation using Node.js' built-in `vm` module
275274
*/
276275
export abstract class BaseVMWorkflow implements Workflow {
277-
private readonly knownFlags = new Set<number>();
278276
unhandledRejection: unknown;
279277

280278
constructor(
@@ -289,7 +287,7 @@ export abstract class BaseVMWorkflow implements Workflow {
289287
* Send request to the Workflow runtime's worker-interface
290288
*/
291289
async getAndResetSinkCalls(): Promise<SinkCall[]> {
292-
return this.workflowModule.getAndResetSinkCalls();
290+
return this.activator.getAndResetSinkCalls();
293291
}
294292

295293
/**
@@ -301,48 +299,45 @@ export abstract class BaseVMWorkflow implements Workflow {
301299
public async activate(
302300
activation: coresdk.workflow_activation.IWorkflowActivation
303301
): Promise<coresdk.workflow_completion.IWorkflowActivationCompletion> {
304-
if (this.context === undefined) {
305-
throw new IllegalStateError('Workflow isolate context uninitialized');
306-
}
307-
if (!activation.jobs) {
308-
throw new Error('Expected workflow activation jobs to be defined');
302+
if (this.context === undefined) throw new IllegalStateError('Workflow isolate context uninitialized');
303+
if (!activation.jobs) throw new Error('Expected workflow activation jobs to be defined');
304+
305+
// Queries are particular in many ways; handle them separately
306+
const [queries, nonQueries] = partition(activation.jobs, ({ queryWorkflow }) => queryWorkflow != null);
307+
if (queries.length > 0) {
308+
// Core guarantees that a single activation will not contain both queries and other jobs
309+
if (nonQueries.length > 0) throw new TypeError('Got both queries and other jobs in a single activation');
310+
return this.activateQueries(activation);
309311
}
310312

311-
this.addKnownFlags(activation.availableInternalFlags ?? []);
313+
// Let's prepare the activation
314+
const [patches, nonPatches] = partition(nonQueries, ({ notifyHasPatch }) => notifyHasPatch != null);
315+
this.workflowModule.startActivation(
316+
coresdk.workflow_activation.WorkflowActivation.fromObject({ ...activation, jobs: patches })
317+
);
312318

319+
// Extract signals and updates
313320
const hasUpdates = activation.jobs.some(({ doUpdate }) => doUpdate != null);
314-
const groupUpdatesWithSignals =
315-
hasUpdates && this.hasFlag(SdkFlags.GroupUpdatesJobsWithSignals, activation.isReplaying ?? false);
316-
317-
// Job processing order
318-
// 1. patch notifications
319-
// 2. signals and updates (updates )
320-
// 3. anything left except for queries
321-
// 4. queries
322-
const [patches, nonPatches] = partition(activation.jobs, ({ notifyHasPatch }) => notifyHasPatch != null);
323-
// const [signals, nonSignals] = partition(nonPatches, ({ signalWorkflow }) => signalWorkflow != null);
324-
const [signals, nonSignals] = partition(
321+
const groupUpdatesWithSignals = hasUpdates && this.activator.hasFlag(SdkFlags.GroupUpdatesJobsWithSignals);
322+
323+
const [signals, rest] = partition(
325324
nonPatches,
326325
({ signalWorkflow, doUpdate }) => signalWorkflow != null || (doUpdate != null && groupUpdatesWithSignals)
327326
);
328-
const [queries, rest] = partition(nonSignals, ({ queryWorkflow }) => queryWorkflow != null);
329-
let batchIndex = 0;
330-
331-
// Loop and invoke each batch and wait for microtasks to complete.
332-
// This is done outside of the isolate because when we used isolated-vm we couldn't wait for microtasks from inside the isolate, not relevant anymore.
333-
for (const jobs of [patches, signals, rest, queries]) {
334-
if (jobs.length === 0) {
335-
continue;
336-
}
327+
328+
// Loop and invoke each batch, waiting for microtasks to complete after each batch.
329+
let batchIndex = 1;
330+
for (const jobs of [signals, rest]) {
331+
if (jobs.length === 0) continue;
337332
this.workflowModule.activate(
338333
coresdk.workflow_activation.WorkflowActivation.fromObject({ ...activation, jobs }),
339334
batchIndex++
340335
);
341-
if (internals.shouldUnblockConditions(jobs[0])) {
342-
this.tryUnblockConditions();
343-
}
336+
this.tryUnblockMicrotasks();
344337
}
338+
345339
const completion = this.workflowModule.concludeActivation();
340+
346341
// Give unhandledRejection handler a chance to be triggered.
347342
await new Promise(setImmediate);
348343
if (this.unhandledRejection) {
@@ -354,6 +349,13 @@ export abstract class BaseVMWorkflow implements Workflow {
354349
return completion;
355350
}
356351

352+
private activateQueries(
353+
activation: coresdk.workflow_activation.IWorkflowActivation
354+
): coresdk.workflow_completion.IWorkflowActivationCompletion {
355+
this.workflowModule.activateQueries(activation);
356+
return this.workflowModule.concludeActivation();
357+
}
358+
357359
/**
358360
* If called (by an external unhandledRejection handler), activations will fail with provided error.
359361
*/
@@ -362,12 +364,12 @@ export abstract class BaseVMWorkflow implements Workflow {
362364
}
363365

364366
/**
365-
* Call into the Workflow context to attempt to unblock any blocked conditions.
367+
* Call into the Workflow context to attempt to unblock any blocked conditions and microtasks.
366368
*
367369
* This is performed in a loop allowing microtasks to be processed between
368370
* each iteration until there are no more conditions to unblock.
369371
*/
370-
protected tryUnblockConditions(): void {
372+
protected tryUnblockMicrotasks(): void {
371373
for (;;) {
372374
const numUnblocked = this.workflowModule.tryUnblockConditions();
373375
if (numUnblocked === 0) break;
@@ -378,34 +380,11 @@ export abstract class BaseVMWorkflow implements Workflow {
378380
* Do not use this Workflow instance after this method has been called.
379381
*/
380382
public abstract dispose(): Promise<void>;
383+
}
381384

382-
/**
383-
* Called early while handling an activation to register known flags
384-
*
385-
* This duplicates the functionality of `Activator.addKnownFlags`, for use outside of the sandbox.
386-
*/
387-
private addKnownFlags(flags: number[]): void {
388-
for (const flag of flags) {
389-
assertValidFlag(flag);
390-
this.knownFlags.add(flag);
391-
}
392-
}
393-
394-
/**
395-
* Check if a flag is known to the Workflow Execution; if not, enable the flag if workflow
396-
* is not replaying and the flag is configured to be enabled by default.
397-
*
398-
* This duplicates the functionality of `Activator.addKnownFlags`, for use outside of the sandbox.
399-
* Note that we can't rely on WorkflowInfo.isReplaying here.
400-
*/
401-
protected hasFlag(flag: SdkFlag, isReplaying: boolean): boolean {
402-
if (this.knownFlags.has(flag.id)) {
403-
return true;
404-
}
405-
if (!isReplaying && flag.default) {
406-
this.knownFlags.add(flag.id);
407-
return true;
408-
}
409-
return false;
410-
}
385+
function partition<T>(arr: T[], predicate: (x: T) => boolean): [T[], T[]] {
386+
const truthy = Array<T>();
387+
const falsy = Array<T>();
388+
arr.forEach((v) => (predicate(v) ? truthy : falsy).push(v));
389+
return [truthy, falsy];
411390
}

packages/workflow/src/interfaces.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -453,6 +453,7 @@ export interface WorkflowCreateOptions {
453453
randomnessSeed: number[];
454454
now: number;
455455
patches: string[];
456+
sdkFlags: number[];
456457
showStackTraceSources: boolean;
457458
}
458459

packages/workflow/src/internals.ts

Lines changed: 17 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -129,15 +129,6 @@ export class Activator implements ActivationHandler {
129129
*/
130130
readonly bufferedSignals = Array<coresdk.workflow_activation.ISignalWorkflow>();
131131

132-
/**
133-
* Holds buffered query calls until a handler is registered.
134-
*
135-
* **IMPORTANT** queries are only buffered until workflow is started.
136-
* This is required because async interceptors might block workflow function invocation
137-
* which delays query handler registration.
138-
*/
139-
protected readonly bufferedQueries = Array<coresdk.workflow_activation.IQueryWorkflow>();
140-
141132
/**
142133
* Mapping of update name to handler and validator
143134
*/
@@ -292,13 +283,6 @@ export class Activator implements ActivationHandler {
292283
*/
293284
protected cancelled = false;
294285

295-
/**
296-
* This is tracked to allow buffering queries until a workflow function is called.
297-
* TODO(bergundy): I don't think this makes sense since queries run last in an activation and must be responded to in
298-
* the same activation.
299-
*/
300-
protected workflowFunctionWasCalled = false;
301-
302286
/**
303287
* The next (incremental) sequence to assign when generating completable commands
304288
*/
@@ -366,6 +350,7 @@ export class Activator implements ActivationHandler {
366350
showStackTraceSources,
367351
sourceMap,
368352
getTimeOfDay,
353+
sdkFlags,
369354
randomnessSeed,
370355
patches,
371356
registeredActivityNames,
@@ -378,11 +363,8 @@ export class Activator implements ActivationHandler {
378363
this.random = alea(randomnessSeed);
379364
this.registeredActivityNames = registeredActivityNames;
380365

381-
if (info.unsafe.isReplaying) {
382-
for (const patchId of patches) {
383-
this.notifyHasPatch({ patchId });
384-
}
385-
}
366+
this.addKnownPatches(patches.map((patch) => ({ patchId: patch })));
367+
this.addKnownFlags(sdkFlags);
386368
}
387369

388370
mutateWorkflowInfo(fn: (info: WorkflowInfo) => WorkflowInfo): void {
@@ -443,19 +425,7 @@ export class Activator implements ActivationHandler {
443425
if (workflow === undefined) {
444426
throw new IllegalStateError('Workflow uninitialized');
445427
}
446-
let promise: Promise<any>;
447-
try {
448-
promise = workflow(...args);
449-
} finally {
450-
// Queries must be handled even if there was an exception when invoking the Workflow function.
451-
this.workflowFunctionWasCalled = true;
452-
// Empty the buffer
453-
const buffer = this.bufferedQueries.splice(0);
454-
for (const activation of buffer) {
455-
this.queryWorkflow(activation);
456-
}
457-
}
458-
return await promise;
428+
return await workflow(...args);
459429
}
460430

461431
public startWorkflow(activation: coresdk.workflow_activation.IStartWorkflow): void {
@@ -586,11 +556,6 @@ export class Activator implements ActivationHandler {
586556
}
587557

588558
public queryWorkflow(activation: coresdk.workflow_activation.IQueryWorkflow): void {
589-
if (!this.workflowFunctionWasCalled) {
590-
this.bufferedQueries.push(activation);
591-
return;
592-
}
593-
594559
const { queryType, queryId, headers } = activation;
595560
if (!(queryType && queryId)) {
596561
throw new TypeError('Missing query activation attributes');
@@ -845,12 +810,22 @@ export class Activator implements ActivationHandler {
845810
}
846811

847812
public notifyHasPatch(activation: coresdk.workflow_activation.INotifyHasPatch): void {
848-
if (!activation.patchId) {
849-
throw new TypeError('Notify has patch missing patch name');
850-
}
813+
// Technically, notifyHasPatch jobs should not reach here, but just for the sake of completeness
814+
if (!this.info.unsafe.isReplaying)
815+
throw new IllegalStateError('Unexpected notifyHasPatch job on non-replay activation');
816+
if (!activation.patchId) throw new TypeError('notifyHasPatch missing patch id');
851817
this.knownPresentPatches.add(activation.patchId);
852818
}
853819

820+
public addKnownPatches(patches: coresdk.workflow_activation.INotifyHasPatch[]): void {
821+
if (patches.length > 0 && !this.info.unsafe.isReplaying)
822+
throw new IllegalStateError('Unexpected notifyHasPatch job on non-replay activation');
823+
for (const patch of patches) {
824+
if (!patch.patchId) throw new TypeError('notifyHasPatch missing patch id');
825+
this.knownPresentPatches.add(patch.patchId);
826+
}
827+
}
828+
854829
public patchInternal(patchId: string, deprecated: boolean): boolean {
855830
if (this.workflow === undefined) {
856831
throw new IllegalStateError('Patches cannot be used before Workflow starts');

0 commit comments

Comments
 (0)