Skip to content

Commit 2cd36bd

Browse files
committed
Return underlying AsyncIterators when execute result is returned (#2843)
# Conflicts: # src/execution/execute.ts
1 parent 6c75db3 commit 2cd36bd

File tree

2 files changed

+232
-7
lines changed

2 files changed

+232
-7
lines changed

src/execution/__tests__/stream-test.ts

Lines changed: 203 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import { describe, it } from 'mocha';
22

33
import { expectJSON } from '../../__testUtils__/expectJSON';
44

5+
import { invariant } from '../../jsutils/invariant';
56
import { isAsyncIterable } from '../../jsutils/isAsyncIterable';
67

78
import type { DocumentNode } from '../../language/ast';
@@ -104,6 +105,37 @@ const query = new GraphQLObjectType({
104105
yield await Promise.resolve({});
105106
},
106107
},
108+
asyncIterableListDelayed: {
109+
type: new GraphQLList(friendType),
110+
async *resolve() {
111+
for (const friend of friends) {
112+
// pause an additional ms before yielding to allow time
113+
// for tests to return or throw before next value is processed.
114+
// eslint-disable-next-line no-await-in-loop
115+
await new Promise((r) => setTimeout(r, 1));
116+
yield friend; /* c8 ignore start */
117+
// Not reachable, early return
118+
}
119+
} /* c8 ignore stop */,
120+
},
121+
asyncIterableListNoReturn: {
122+
type: new GraphQLList(friendType),
123+
resolve() {
124+
let i = 0;
125+
return {
126+
[Symbol.asyncIterator]: () => ({
127+
async next() {
128+
const friend = friends[i++];
129+
if (friend) {
130+
await new Promise((r) => setTimeout(r, 1));
131+
return { value: friend, done: false };
132+
}
133+
return { value: undefined, done: true };
134+
},
135+
}),
136+
};
137+
},
138+
},
107139
asyncIterableListDelayedClose: {
108140
type: new GraphQLList(friendType),
109141
async *resolve() {
@@ -973,4 +1005,175 @@ describe('Execute: stream directive', () => {
9731005
},
9741006
]);
9751007
});
1008+
it('Returns underlying async iterables when dispatcher is returned', async () => {
1009+
const document = parse(`
1010+
query {
1011+
asyncIterableListDelayed @stream(initialCount: 1) {
1012+
name
1013+
id
1014+
}
1015+
}
1016+
`);
1017+
const schema = new GraphQLSchema({ query });
1018+
1019+
const executeResult = await execute({ schema, document, rootValue: {} });
1020+
invariant(isAsyncIterable(executeResult));
1021+
const iterator = executeResult[Symbol.asyncIterator]();
1022+
1023+
const result1 = await iterator.next();
1024+
expectJSON(result1).toDeepEqual({
1025+
done: false,
1026+
value: {
1027+
data: {
1028+
asyncIterableListDelayed: [
1029+
{
1030+
id: '1',
1031+
name: 'Luke',
1032+
},
1033+
],
1034+
},
1035+
hasNext: true,
1036+
},
1037+
});
1038+
1039+
const returnPromise = iterator.return();
1040+
1041+
// this result had started processing before return was called
1042+
const result2 = await iterator.next();
1043+
expectJSON(result2).toDeepEqual({
1044+
done: false,
1045+
value: {
1046+
data: {
1047+
id: '2',
1048+
name: 'Han',
1049+
},
1050+
hasNext: true,
1051+
path: ['asyncIterableListDelayed', 1],
1052+
},
1053+
});
1054+
1055+
// third result is not returned because async iterator has returned
1056+
const result3 = await iterator.next();
1057+
expectJSON(result3).toDeepEqual({
1058+
done: true,
1059+
value: undefined,
1060+
});
1061+
await returnPromise;
1062+
});
1063+
it('Can return async iterable when underlying iterable does not have a return method', async () => {
1064+
const document = parse(`
1065+
query {
1066+
asyncIterableListNoReturn @stream(initialCount: 1) {
1067+
name
1068+
id
1069+
}
1070+
}
1071+
`);
1072+
const schema = new GraphQLSchema({ query });
1073+
1074+
const executeResult = await execute({ schema, document, rootValue: {} });
1075+
invariant(isAsyncIterable(executeResult));
1076+
const iterator = executeResult[Symbol.asyncIterator]();
1077+
1078+
const result1 = await iterator.next();
1079+
expectJSON(result1).toDeepEqual({
1080+
done: false,
1081+
value: {
1082+
data: {
1083+
asyncIterableListNoReturn: [
1084+
{
1085+
id: '1',
1086+
name: 'Luke',
1087+
},
1088+
],
1089+
},
1090+
hasNext: true,
1091+
},
1092+
});
1093+
1094+
const returnPromise = iterator.return();
1095+
1096+
// this result had started processing before return was called
1097+
const result2 = await iterator.next();
1098+
expectJSON(result2).toDeepEqual({
1099+
done: false,
1100+
value: {
1101+
data: {
1102+
id: '2',
1103+
name: 'Han',
1104+
},
1105+
hasNext: true,
1106+
path: ['asyncIterableListNoReturn', 1],
1107+
},
1108+
});
1109+
1110+
// third result is not returned because async iterator has returned
1111+
const result3 = await iterator.next();
1112+
expectJSON(result3).toDeepEqual({
1113+
done: true,
1114+
value: undefined,
1115+
});
1116+
await returnPromise;
1117+
});
1118+
it('Returns underlying async iterables when dispatcher is thrown', async () => {
1119+
const document = parse(`
1120+
query {
1121+
asyncIterableListDelayed @stream(initialCount: 1) {
1122+
name
1123+
id
1124+
}
1125+
}
1126+
`);
1127+
const schema = new GraphQLSchema({ query });
1128+
1129+
const executeResult = await execute({ schema, document, rootValue: {} });
1130+
invariant(isAsyncIterable(executeResult));
1131+
const iterator = executeResult[Symbol.asyncIterator]();
1132+
1133+
const result1 = await iterator.next();
1134+
expectJSON(result1).toDeepEqual({
1135+
done: false,
1136+
value: {
1137+
data: {
1138+
asyncIterableListDelayed: [
1139+
{
1140+
id: '1',
1141+
name: 'Luke',
1142+
},
1143+
],
1144+
},
1145+
hasNext: true,
1146+
},
1147+
});
1148+
1149+
const throwPromise = iterator.throw(new Error('bad'));
1150+
1151+
// this result had started processing before return was called
1152+
const result2 = await iterator.next();
1153+
expectJSON(result2).toDeepEqual({
1154+
done: false,
1155+
value: {
1156+
data: {
1157+
id: '2',
1158+
name: 'Han',
1159+
},
1160+
hasNext: true,
1161+
path: ['asyncIterableListDelayed', 1],
1162+
},
1163+
});
1164+
1165+
// third result is not returned because async iterator has returned
1166+
const result3 = await iterator.next();
1167+
expectJSON(result3).toDeepEqual({
1168+
done: true,
1169+
value: undefined,
1170+
});
1171+
try {
1172+
await throwPromise; /* c8 ignore start */
1173+
// Not reachable, always throws
1174+
/* c8 ignore stop */
1175+
} catch (e) {
1176+
// ignore error
1177+
}
1178+
});
9761179
});

src/execution/execute.ts

Lines changed: 29 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1489,6 +1489,7 @@ function executeStreamIterator(
14891489
const asyncPayloadRecord = new AsyncPayloadRecord({
14901490
label,
14911491
path: fieldPath,
1492+
iterator,
14921493
});
14931494
const dataPromise: Promise<unknown> = iterator
14941495
.next()
@@ -1561,6 +1562,7 @@ function yieldSubsequentPayloads(
15611562
initialResult: ExecutionResult,
15621563
): AsyncGenerator<AsyncExecutionResult, void, void> {
15631564
let _hasReturnedInitialResult = false;
1565+
let isDone = false;
15641566

15651567
function race(): Promise<IteratorResult<AsyncExecutionResult>> {
15661568
if (exeContext.subsequentPayloads.length === 0) {
@@ -1629,17 +1631,31 @@ function yieldSubsequentPayloads(
16291631
},
16301632
done: false,
16311633
});
1632-
} else if (exeContext.subsequentPayloads.length === 0) {
1634+
} else if (exeContext.subsequentPayloads.length === 0 || isDone) {
16331635
return Promise.resolve({ value: undefined, done: true });
16341636
}
16351637
return race();
16361638
},
1637-
// TODO: implement return & throw
1638-
return: /* istanbul ignore next: will be covered in follow up */ () =>
1639-
Promise.resolve({ value: undefined, done: true }),
1640-
throw: /* istanbul ignore next: will be covered in follow up */ (
1639+
async return(): Promise<IteratorResult<AsyncExecutionResult, void>> {
1640+
await Promise.all(
1641+
exeContext.subsequentPayloads.map((asyncPayloadRecord) =>
1642+
asyncPayloadRecord.iterator?.return?.(),
1643+
),
1644+
);
1645+
isDone = true;
1646+
return { value: undefined, done: true };
1647+
},
1648+
async throw(
16411649
error?: unknown,
1642-
) => Promise.reject(error),
1650+
): Promise<IteratorResult<AsyncExecutionResult, void>> {
1651+
await Promise.all(
1652+
exeContext.subsequentPayloads.map((asyncPayloadRecord) =>
1653+
asyncPayloadRecord.iterator?.return?.(),
1654+
),
1655+
);
1656+
isDone = true;
1657+
return Promise.reject(error);
1658+
},
16431659
};
16441660
}
16451661

@@ -1648,10 +1664,16 @@ class AsyncPayloadRecord {
16481664
label?: string;
16491665
path?: Path;
16501666
dataPromise?: Promise<unknown | null | undefined>;
1667+
iterator?: AsyncIterator<unknown>;
16511668
isCompletedIterator?: boolean;
1652-
constructor(opts: { label?: string; path?: Path }) {
1669+
constructor(opts: {
1670+
label?: string;
1671+
path?: Path;
1672+
iterator?: AsyncIterator<unknown>;
1673+
}) {
16531674
this.label = opts.label;
16541675
this.path = opts.path;
1676+
this.iterator = opts.iterator;
16551677
this.errors = [];
16561678
}
16571679

0 commit comments

Comments
 (0)