Skip to content

Commit 2de157e

Browse files
committed
Implement support for @stream directive
# Conflicts: # src/execution/execute.ts # src/validation/index.d.ts # src/validation/index.ts
1 parent c57a360 commit 2de157e

File tree

8 files changed

+1219
-17
lines changed

8 files changed

+1219
-17
lines changed

src/execution/__tests__/stream-test.ts

Lines changed: 700 additions & 0 deletions
Large diffs are not rendered by default.

src/execution/execute.ts

Lines changed: 238 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ import {
5151
GraphQLIncludeDirective,
5252
GraphQLSkipDirective,
5353
GraphQLDeferDirective,
54+
GraphQLStreamDirective,
5455
} from '../type/directives';
5556
import {
5657
isObjectType,
@@ -147,7 +148,7 @@ export interface FormattedExecutionResult<
147148
* - `extensions` is reserved for adding non-standard properties.
148149
*/
149150
export interface ExecutionPatchResult<
150-
TData = ObjMap<unknown>,
151+
TData = ObjMap<unknown> | unknown,
151152
TExtensions = ObjMap<unknown>,
152153
> {
153154
errors?: ReadonlyArray<GraphQLError>;
@@ -159,7 +160,7 @@ export interface ExecutionPatchResult<
159160
}
160161

161162
export interface FormattedExecutionPatchResult<
162-
TData = ObjMap<unknown>,
163+
TData = ObjMap<unknown> | unknown,
163164
TExtensions = ObjMap<unknown>,
164165
> {
165166
errors?: ReadonlyArray<GraphQLFormattedError>;
@@ -716,6 +717,44 @@ function getDeferValues(
716717
};
717718
}
718719

720+
/**
721+
* Returns an object containing the @stream arguments if a field should be
722+
* streamed based on the experimental flag, stream directive present and
723+
* not disabled by the "if" argument.
724+
*/
725+
function getStreamValues(
726+
exeContext: ExecutionContext,
727+
fieldNodes: ReadonlyArray<FieldNode>,
728+
):
729+
| undefined
730+
| {
731+
initialCount?: number;
732+
label?: string;
733+
} {
734+
// validation only allows equivalent streams on multiple fields, so it is
735+
// safe to only check the first fieldNode for the stream directive
736+
const stream = getDirectiveValues(
737+
GraphQLStreamDirective,
738+
fieldNodes[0],
739+
exeContext.variableValues,
740+
);
741+
742+
if (!stream) {
743+
return;
744+
}
745+
746+
if (stream.if === false) {
747+
return;
748+
}
749+
750+
return {
751+
initialCount:
752+
// istanbul ignore next (initialCount is required number argument)
753+
typeof stream.initialCount === 'number' ? stream.initialCount : undefined,
754+
label: typeof stream.label === 'string' ? stream.label : undefined,
755+
};
756+
}
757+
719758
/**
720759
* Determines if a fragment is applicable to the given type.
721760
*/
@@ -1004,8 +1043,28 @@ function completeAsyncIteratorValue(
10041043
errors: Array<GraphQLError>,
10051044
): Promise<ReadonlyArray<unknown>> {
10061045
let containsPromise = false;
1046+
const stream = getStreamValues(exeContext, fieldNodes);
10071047
return new Promise<ReadonlyArray<unknown>>((resolve) => {
10081048
function next(index: number, completedResults: Array<unknown>) {
1049+
if (
1050+
stream &&
1051+
typeof stream.initialCount === 'number' &&
1052+
index >= stream.initialCount
1053+
) {
1054+
exeContext.dispatcher.addAsyncIteratorValue(
1055+
index,
1056+
iterator,
1057+
exeContext,
1058+
fieldNodes,
1059+
info,
1060+
itemType,
1061+
path,
1062+
stream.label,
1063+
);
1064+
resolve(completedResults);
1065+
return;
1066+
}
1067+
10091068
const fieldPath = addPath(path, index, undefined);
10101069
iterator.next().then(
10111070
({ value, done }) => {
@@ -1094,15 +1153,37 @@ function completeListValue(
10941153
);
10951154
}
10961155

1156+
const stream = getStreamValues(exeContext, fieldNodes);
1157+
10971158
// This is specified as a simple map, however we're optimizing the path
10981159
// where the list contains no Promises by avoiding creating another Promise.
10991160
let containsPromise = false;
1100-
const completedResults = Array.from(result, (item, index) => {
1161+
const completedResults = [];
1162+
let index = 0;
1163+
for (const item of result) {
11011164
// No need to modify the info object containing the path,
11021165
// since from here on it is not ever accessed by resolver functions.
11031166
const itemPath = addPath(path, index, undefined);
11041167
try {
11051168
let completedItem;
1169+
1170+
if (
1171+
stream &&
1172+
typeof stream.initialCount === 'number' &&
1173+
index >= stream.initialCount
1174+
) {
1175+
exeContext.dispatcher.addValue(
1176+
itemPath,
1177+
item,
1178+
exeContext,
1179+
fieldNodes,
1180+
info,
1181+
itemType,
1182+
stream.label,
1183+
);
1184+
index++;
1185+
continue;
1186+
}
11061187
if (isPromise(item)) {
11071188
completedItem = item.then((resolved) =>
11081189
completeValue(
@@ -1131,21 +1212,25 @@ function completeListValue(
11311212
containsPromise = true;
11321213
// Note: we don't rely on a `catch` method, but we do expect "thenable"
11331214
// to take a second callback for the error case.
1134-
return completedItem.then(undefined, (rawError) => {
1135-
const error = locatedError(
1136-
rawError,
1137-
fieldNodes,
1138-
pathToArray(itemPath),
1139-
);
1140-
return handleFieldError(error, itemType, errors);
1141-
});
1215+
completedResults.push(
1216+
completedItem.then(undefined, (rawError) => {
1217+
const error = locatedError(
1218+
rawError,
1219+
fieldNodes,
1220+
pathToArray(itemPath),
1221+
);
1222+
return handleFieldError(error, itemType, errors);
1223+
}),
1224+
);
1225+
} else {
1226+
completedResults.push(completedItem);
11421227
}
1143-
return completedItem;
11441228
} catch (rawError) {
11451229
const error = locatedError(rawError, fieldNodes, pathToArray(itemPath));
1146-
return handleFieldError(error, itemType, errors);
1230+
completedResults.push(handleFieldError(error, itemType, errors));
11471231
}
1148-
});
1232+
index++;
1233+
}
11491234

11501235
return containsPromise ? Promise.all(completedResults) : completedResults;
11511236
}
@@ -1521,7 +1606,7 @@ export function getFieldDef(
15211606
*/
15221607
interface DispatcherResult {
15231608
errors?: ReadonlyArray<GraphQLError>;
1524-
data?: ObjMap<unknown> | null;
1609+
data?: ObjMap<unknown> | unknown | null;
15251610
path: ReadonlyArray<string | number>;
15261611
label?: string;
15271612
extensions?: ObjMap<unknown>;
@@ -1560,6 +1645,129 @@ export class Dispatcher {
15601645
);
15611646
}
15621647

1648+
addValue(
1649+
path: Path,
1650+
promiseOrData: PromiseOrValue<unknown>,
1651+
exeContext: ExecutionContext,
1652+
fieldNodes: ReadonlyArray<FieldNode>,
1653+
info: GraphQLResolveInfo,
1654+
itemType: GraphQLOutputType,
1655+
label?: string,
1656+
): void {
1657+
const errors: Array<GraphQLError> = [];
1658+
this._subsequentPayloads.push(
1659+
Promise.resolve(promiseOrData)
1660+
.then((resolved) =>
1661+
completeValue(
1662+
exeContext,
1663+
itemType,
1664+
fieldNodes,
1665+
info,
1666+
path,
1667+
resolved,
1668+
errors,
1669+
),
1670+
)
1671+
// Note: we don't rely on a `catch` method, but we do expect "thenable"
1672+
// to take a second callback for the error case.
1673+
.then(undefined, (rawError) => {
1674+
const error = locatedError(rawError, fieldNodes, pathToArray(path));
1675+
return handleFieldError(error, itemType, errors);
1676+
})
1677+
.then((data) => ({
1678+
value: createPatchResult(data, label, path, errors),
1679+
done: false,
1680+
})),
1681+
);
1682+
}
1683+
1684+
addAsyncIteratorValue(
1685+
initialIndex: number,
1686+
iterator: AsyncIterator<unknown>,
1687+
exeContext: ExecutionContext,
1688+
fieldNodes: ReadonlyArray<FieldNode>,
1689+
info: GraphQLResolveInfo,
1690+
itemType: GraphQLOutputType,
1691+
path?: Path,
1692+
label?: string,
1693+
): void {
1694+
const subsequentPayloads = this._subsequentPayloads;
1695+
function next(index: number) {
1696+
const fieldPath = addPath(path, index, undefined);
1697+
const patchErrors: Array<GraphQLError> = [];
1698+
subsequentPayloads.push(
1699+
iterator.next().then(
1700+
({ value: data, done }) => {
1701+
if (done) {
1702+
return { value: undefined, done: true };
1703+
}
1704+
1705+
// eslint-disable-next-line node/callback-return
1706+
next(index + 1);
1707+
1708+
try {
1709+
const completedItem = completeValue(
1710+
exeContext,
1711+
itemType,
1712+
fieldNodes,
1713+
info,
1714+
fieldPath,
1715+
data,
1716+
patchErrors,
1717+
);
1718+
1719+
if (isPromise(completedItem)) {
1720+
return completedItem.then((resolveItem) => ({
1721+
value: createPatchResult(
1722+
resolveItem,
1723+
label,
1724+
fieldPath,
1725+
patchErrors,
1726+
),
1727+
done: false,
1728+
}));
1729+
}
1730+
1731+
return {
1732+
value: createPatchResult(
1733+
completedItem,
1734+
label,
1735+
fieldPath,
1736+
patchErrors,
1737+
),
1738+
done: false,
1739+
};
1740+
} catch (rawError) {
1741+
const error = locatedError(
1742+
rawError,
1743+
fieldNodes,
1744+
pathToArray(fieldPath),
1745+
);
1746+
handleFieldError(error, itemType, patchErrors);
1747+
return {
1748+
value: createPatchResult(null, label, fieldPath, patchErrors),
1749+
done: false,
1750+
};
1751+
}
1752+
},
1753+
(rawError) => {
1754+
const error = locatedError(
1755+
rawError,
1756+
fieldNodes,
1757+
pathToArray(fieldPath),
1758+
);
1759+
handleFieldError(error, itemType, patchErrors);
1760+
return {
1761+
value: createPatchResult(null, label, fieldPath, patchErrors),
1762+
done: false,
1763+
};
1764+
},
1765+
),
1766+
);
1767+
}
1768+
next(initialIndex);
1769+
}
1770+
15631771
_race(): Promise<IteratorResult<ExecutionPatchResult, void>> {
15641772
return new Promise<{
15651773
promise: Promise<IteratorResult<DispatcherResult, void>>;
@@ -1579,7 +1787,20 @@ export class Dispatcher {
15791787
);
15801788
return promise;
15811789
})
1582-
.then(({ value }) => {
1790+
.then(({ value, done }) => {
1791+
if (done && this._subsequentPayloads.length === 0) {
1792+
// async iterable resolver just finished and no more pending payloads
1793+
return {
1794+
value: {
1795+
hasNext: false,
1796+
},
1797+
done: false,
1798+
};
1799+
} else if (done) {
1800+
// async iterable resolver just finished but there are pending payloads
1801+
// return the next one
1802+
return this._race();
1803+
}
15831804
const returnValue: ExecutionPatchResult = {
15841805
...value,
15851806
hasNext: this._subsequentPayloads.length > 0,
@@ -1621,7 +1842,7 @@ export class Dispatcher {
16211842
}
16221843

16231844
function createPatchResult(
1624-
data: ObjMap<unknown> | null,
1845+
data: ObjMap<unknown> | unknown | null,
16251846
label?: string,
16261847
path?: Path,
16271848
errors?: ReadonlyArray<GraphQLError>,

0 commit comments

Comments
 (0)