Skip to content

Commit 163f62a

Browse files
committed
Return underlying AsyncIterators when execute result is returned (#2843)
1 parent f78d206 commit 163f62a

File tree

2 files changed

+167
-0
lines changed

2 files changed

+167
-0
lines changed

src/execution/__tests__/stream-test.js

Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import { expect } from 'chai';
22
import { describe, it } from 'mocha';
33

4+
import invariant from '../../jsutils/invariant';
45
import isAsyncIterable from '../../jsutils/isAsyncIterable';
56
import { parse } from '../../language/parser';
67

@@ -72,6 +73,36 @@ const query = new GraphQLObjectType({
7273
yield {};
7374
},
7475
},
76+
asyncIterableListDelayed: {
77+
type: new GraphQLList(friendType),
78+
async *resolve() {
79+
for (const friend of friends) {
80+
// pause an additional ms before yielding to allow time
81+
// for tests to return or throw before next value is processed.
82+
// eslint-disable-next-line no-await-in-loop
83+
await new Promise((r) => setTimeout(r, 1));
84+
yield friend;
85+
}
86+
},
87+
},
88+
asyncIterableListNoReturn: {
89+
type: new GraphQLList(friendType),
90+
resolve() {
91+
let i = 0;
92+
return {
93+
[Symbol.asyncIterator]: () => ({
94+
async next() {
95+
const friend = friends[i++];
96+
if (friend) {
97+
await new Promise((r) => setTimeout(r, 1));
98+
return { value: friend, done: false };
99+
}
100+
return { value: undefined, done: true };
101+
},
102+
}),
103+
};
104+
},
105+
},
75106
asyncIterableListDelayedClose: {
76107
type: new GraphQLList(friendType),
77108
async *resolve() {
@@ -626,4 +657,114 @@ describe('Execute: stream directive', () => {
626657
},
627658
]);
628659
});
660+
it('Returns underlying async iterables when dispatcher is returned', async () => {
661+
const document = parse(`
662+
query {
663+
asyncIterableListDelayed @stream(initialCount: 1) {
664+
name
665+
id
666+
}
667+
}
668+
`);
669+
const schema = new GraphQLSchema({ query });
670+
671+
const executeResult = await execute(schema, document, {});
672+
invariant(isAsyncIterable(executeResult));
673+
674+
const result1 = await executeResult.next();
675+
expect(result1).to.deep.equal({
676+
done: false,
677+
value: {
678+
data: {
679+
asyncIterableListDelayed: [
680+
{
681+
id: '1',
682+
name: 'Luke',
683+
},
684+
],
685+
},
686+
hasNext: true,
687+
},
688+
});
689+
690+
executeResult.return();
691+
692+
// this result had started processing before return was called
693+
const result2 = await executeResult.next();
694+
expect(result2).to.deep.equal({
695+
done: false,
696+
value: {
697+
data: {
698+
id: '2',
699+
name: 'Han',
700+
},
701+
hasNext: true,
702+
path: ['asyncIterableListDelayed', 1],
703+
},
704+
});
705+
706+
// third result is not returned because async iterator has returned
707+
const result3 = await executeResult.next();
708+
expect(result3).to.deep.equal({
709+
done: false,
710+
value: {
711+
hasNext: false,
712+
},
713+
});
714+
});
715+
it('Can return async iterable when underlying iterable does not have a return method', async () => {
716+
const document = parse(`
717+
query {
718+
asyncIterableListNoReturn @stream(initialCount: 1) {
719+
name
720+
id
721+
}
722+
}
723+
`);
724+
const schema = new GraphQLSchema({ query });
725+
726+
const executeResult = await execute(schema, document, {});
727+
invariant(isAsyncIterable(executeResult));
728+
729+
const result1 = await executeResult.next();
730+
expect(result1).to.deep.equal({
731+
done: false,
732+
value: {
733+
data: {
734+
asyncIterableListNoReturn: [
735+
{
736+
id: '1',
737+
name: 'Luke',
738+
},
739+
],
740+
},
741+
hasNext: true,
742+
},
743+
});
744+
745+
executeResult.return();
746+
747+
// this result had started processing before return was called
748+
const result2 = await executeResult.next();
749+
expect(result2).to.deep.equal({
750+
done: false,
751+
value: {
752+
data: {
753+
id: '2',
754+
name: 'Han',
755+
},
756+
hasNext: true,
757+
path: ['asyncIterableListNoReturn', 1],
758+
},
759+
});
760+
761+
// third result is not returned because async iterator has returned
762+
const result3 = await executeResult.next();
763+
expect(result3).to.deep.equal({
764+
done: false,
765+
value: {
766+
hasNext: false,
767+
},
768+
});
769+
});
629770
});

src/execution/execute.js

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1645,11 +1645,15 @@ type DispatcherResult = {|
16451645
*/
16461646
export class Dispatcher {
16471647
_subsequentPayloads: Array<Promise<IteratorResult<DispatcherResult, void>>>;
1648+
_iterators: Array<AsyncIterator<mixed>>;
1649+
_isDone: boolean;
16481650
_initialResult: ?ExecutionResult;
16491651
_hasReturnedInitialResult: boolean;
16501652

16511653
constructor() {
16521654
this._subsequentPayloads = [];
1655+
this._iterators = [];
1656+
this._isDone = false;
16531657
this._hasReturnedInitialResult = false;
16541658
}
16551659

@@ -1718,13 +1722,16 @@ export class Dispatcher {
17181722
itemType: GraphQLOutputType,
17191723
): void {
17201724
const subsequentPayloads = this._subsequentPayloads;
1725+
const iterators = this._iterators;
1726+
iterators.push(iterator);
17211727
function next(index) {
17221728
const fieldPath = addPath(path, index);
17231729
const patchErrors = [];
17241730
subsequentPayloads.push(
17251731
iterator.next().then(
17261732
({ value: data, done }) => {
17271733
if (done) {
1734+
iterators.splice(iterators.indexOf(iterator), 1);
17281735
return { value: undefined, done: true };
17291736
}
17301737

@@ -1795,6 +1802,14 @@ export class Dispatcher {
17951802
}
17961803

17971804
_race(): Promise<IteratorResult<ExecutionPatchResult, void>> {
1805+
if (this._isDone) {
1806+
return Promise.resolve({
1807+
value: {
1808+
hasNext: false,
1809+
},
1810+
done: false,
1811+
});
1812+
}
17981813
return new Promise((resolve) => {
17991814
this._subsequentPayloads.forEach((promise) => {
18001815
promise.then(() => {
@@ -1851,13 +1866,24 @@ export class Dispatcher {
18511866
return this._race();
18521867
}
18531868

1869+
_return(): Promise<IteratorResult<AsyncExecutionResult, void>> {
1870+
return Promise.all(
1871+
// $FlowFixMe[prop-missing]
1872+
this._iterators.map((iterator) => iterator.return?.()),
1873+
).then(() => {
1874+
this._isDone = true;
1875+
return { value: undefined, done: true };
1876+
});
1877+
}
1878+
18541879
get(initialResult: ExecutionResult): AsyncIterable<AsyncExecutionResult> {
18551880
this._initialResult = initialResult;
18561881
return ({
18571882
[SYMBOL_ASYNC_ITERATOR]() {
18581883
return this;
18591884
},
18601885
next: () => this._next(),
1886+
return: () => this._return(),
18611887
}: any);
18621888
}
18631889
}

0 commit comments

Comments
 (0)