Skip to content

Commit dcecd1a

Browse files
thomasballingerConvex, Inc.
authored andcommitted
Don't crash when optimistic updates exist on queries for which no server result exists yet (#38020)
Fix a bug with setting optimistic updates for recently- or not-yet-subscribed-to queries. Previously the client threw an error and aborted handling a server update with "Cannot read properties of undefined (reading 'result')" in `BaseConvexClient.notifyOnQueryResultChanges` (among other more obfuscated frames) when the client set an optimistic update for a query for which it did not yet have a value from the server. GitOrigin-RevId: 1fb59b02d5f14750a4d8bdf8c1bb259c4dfdccb5
1 parent a45f15d commit dcecd1a

File tree

5 files changed

+255
-9
lines changed

5 files changed

+255
-9
lines changed

src/browser/sync/client.ts

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,7 @@ export type QueryModification =
185185
* Object describing a transition passed into the `onTransition` handler.
186186
*
187187
* These can be from receiving a transition from the server, or from applying an
188-
* optimistc update locally.
188+
* optimistic update locally.
189189
*
190190
* @public
191191
*/
@@ -510,20 +510,28 @@ export class BaseConvexClient {
510510
queryTokenToValue.set(queryToken, query);
511511
}
512512
}
513+
514+
// Query tokens that are new (because of new server results or new local optimistic updates)
515+
// or differ from old values (because of changes from local optimistic updates or new results
516+
// from the server).
513517
const changedQueryTokens =
514518
this.optimisticQueryResults.ingestQueryResultsFromServer(
515519
queryTokenToValue,
516520
new Set(completedRequests.keys()),
517521
);
518522

519523
this.handleTransition({
520-
queries: changedQueryTokens.map((token) => ({
521-
token,
522-
modification: {
523-
kind: "Updated",
524-
result: queryTokenToValue.get(token)!.result,
525-
},
526-
})),
524+
queries: changedQueryTokens.map((token) => {
525+
const optimisticResult =
526+
this.optimisticQueryResults.rawQueryResult(token);
527+
return {
528+
token,
529+
modification: {
530+
kind: "Updated",
531+
result: optimisticResult!.result,
532+
},
533+
};
534+
}),
527535
reflectedMutations: Array.from(completedRequests).map(
528536
([requestId, result]) => ({
529537
requestId,

src/browser/sync/client_node.test.ts

Lines changed: 174 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,10 @@ import {
1414
import {
1515
encodeServerMessage,
1616
nodeWebSocket,
17+
UpdateQueue,
1718
withInMemoryWebSocket,
1819
} from "./client_node_test_helpers.js";
20+
import { FunctionArgs, makeFunctionReference } from "../../server/index.js";
1921

2022
test("BaseConvexClient protocol in node", async () => {
2123
await withInMemoryWebSocket(async ({ address, receive }) => {
@@ -190,3 +192,175 @@ test("maxObservedTimestamp is updated on mutation and transition", async () => {
190192
await client.close();
191193
});
192194
});
195+
196+
const apiQueriesA = makeFunctionReference<"query", {}, string>("queries:a");
197+
const apiQueriesB = makeFunctionReference<"query", {}, string>("queries:b");
198+
199+
const _apiMutationsZ = makeFunctionReference<"mutation", {}>("mutations:z");
200+
201+
/**
202+
* Regression test for
203+
* - subscribing to query a
204+
* - running a mutation that sets an optimistic update for queries a and b
205+
* - receiving an update for a
206+
*/
207+
test("Setting optimistic updates for queries that have not yet been subscribed to", async () => {
208+
await withInMemoryWebSocket(async ({ address, receive, send }) => {
209+
const q = new UpdateQueue(10);
210+
211+
const client = new BaseConvexClient(
212+
address,
213+
(queryTokens) => {
214+
q.onTransition(client)(queryTokens);
215+
},
216+
{
217+
webSocketConstructor: nodeWebSocket,
218+
unsavedChangesWarning: false,
219+
verbose: true,
220+
},
221+
);
222+
223+
client.subscribe("queries:a", {});
224+
225+
expect((await receive()).type).toEqual("Connect");
226+
const modify = await receive();
227+
228+
expect(modify.type).toEqual("ModifyQuerySet");
229+
if (modify.type !== "ModifyQuerySet") {
230+
return;
231+
}
232+
expect(modify.modifications.length).toBe(1);
233+
expect(modify.modifications).toEqual([
234+
{
235+
type: "Add",
236+
queryId: 0,
237+
udfPath: "queries:a",
238+
args: [{}],
239+
},
240+
]);
241+
242+
// Now that we're subscribed to queries:a,
243+
// run a mutation that, optimistically and on the server,
244+
// - modifies q1
245+
// - modifies q2
246+
247+
const mutP = client.mutation(
248+
"mutations:z",
249+
{},
250+
{
251+
optimisticUpdate: (
252+
localStore,
253+
_args: FunctionArgs<typeof _apiMutationsZ>,
254+
) => {
255+
const curA = localStore.getQuery(apiQueriesA, {});
256+
localStore.setQuery(
257+
apiQueriesA,
258+
{},
259+
curA === undefined ? "a local" : `${curA} with a local applied`,
260+
);
261+
const curB = localStore.getQuery(apiQueriesB, {});
262+
localStore.setQuery(
263+
apiQueriesB,
264+
{},
265+
curB === undefined ? "b local" : `${curB} with b local applied`,
266+
);
267+
},
268+
},
269+
);
270+
271+
// Synchronously, the local store should update and the changes should be broadcast.
272+
expect(client.localQueryResult("queries:a", {})).toEqual("a local");
273+
// We haven't actually subscribed to this query but it had a value set in an optimistic update.
274+
expect(client.localQueryResult("queries:b", {})).toEqual("b local");
275+
const update1 = await q.updatePromises[0];
276+
expect(q.updates).toHaveLength(1);
277+
expect(update1).toEqual({
278+
'{"udfPath":"queries:a","args":{}}': "a local",
279+
'{"udfPath":"queries:b","args":{}}': "b local",
280+
});
281+
282+
// Now a transition arrives containing only an update to query a.
283+
// This previously crashed this execution context.
284+
send({
285+
type: "Transition",
286+
startVersion: {
287+
querySet: 0,
288+
identity: 0,
289+
ts: Long.fromNumber(0),
290+
},
291+
endVersion: {
292+
querySet: 1,
293+
identity: 0,
294+
ts: Long.fromNumber(100),
295+
},
296+
modifications: [
297+
{
298+
type: "QueryUpdated",
299+
queryId: 0,
300+
value: "a server",
301+
logLines: [],
302+
journal: null,
303+
},
304+
],
305+
});
306+
307+
const update2 = await q.updatePromises[1];
308+
expect(update2).toEqual({
309+
'{"udfPath":"queries:a","args":{}}': "a server with a local applied",
310+
'{"udfPath":"queries:b","args":{}}': "b local",
311+
});
312+
expect(q.allResults).toEqual({
313+
'{"udfPath":"queries:a","args":{}}': "a server with a local applied",
314+
'{"udfPath":"queries:b","args":{}}': "b local",
315+
});
316+
expect(q.updates).toHaveLength(2);
317+
318+
const mutationRequest = await receive();
319+
expect(mutationRequest.type).toEqual("Mutation");
320+
expect(mutationRequest).toEqual({
321+
type: "Mutation",
322+
requestId: 0,
323+
udfPath: "mutations:z",
324+
args: [{}],
325+
});
326+
327+
// Now the server sends:
328+
329+
// 1. MutationResponse saying the mutation has run
330+
send({
331+
type: "MutationResponse",
332+
requestId: 0,
333+
success: true,
334+
result: null,
335+
ts: Long.fromNumber(200), // "ZDhuVB3CRxg=", in example
336+
logLines: [],
337+
});
338+
339+
// 2. Transition bringing us up to date with the mutation
340+
send({
341+
type: "Transition",
342+
startVersion: { querySet: 1, identity: 0, ts: Long.fromNumber(100) },
343+
endVersion: { querySet: 1, identity: 0, ts: Long.fromNumber(200) },
344+
modifications: [
345+
{
346+
type: "QueryUpdated",
347+
queryId: 0,
348+
value: "a server",
349+
logLines: [],
350+
journal: null,
351+
},
352+
],
353+
});
354+
355+
expect(await q.updatePromises[2]).toEqual({
356+
'{"udfPath":"queries:a","args":{}}': "a server",
357+
// Now there's no more optimistic value for b!
358+
'{"udfPath":"queries:b","args":{}}': undefined,
359+
});
360+
361+
// After all that the mutation should resolve.
362+
await mutP;
363+
364+
await client.close();
365+
}, true);
366+
});

src/browser/sync/client_node_test_helpers.ts

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ import WebSocket, { WebSocketServer } from "ws";
88
export const nodeWebSocket = WebSocket as unknown as typeof window.WebSocket;
99

1010
import { ClientMessage, ServerMessage } from "./protocol.js";
11+
import { QueryToken } from "./udf_path_utils.js";
12+
import { BaseConvexClient } from "./client.js";
1113

1214
export type InMemoryWebSocketTest = (args: {
1315
address: string;
@@ -104,3 +106,50 @@ function encodeLong(n: Long) {
104106
const integerBytes = Uint8Array.from(n.toBytesLE());
105107
return Base64.fromByteArray(integerBytes);
106108
}
109+
110+
/**
111+
* const q = new UpdateQueue();
112+
* const client = new Client("http://...", q.onTransition);
113+
*
114+
* await q.updatePromises[3];
115+
*
116+
*/
117+
export class UpdateQueue {
118+
updateResolves: ((v: Record<QueryToken, any>) => void)[];
119+
updatePromises: Promise<Record<QueryToken, any>>[];
120+
updates: Record<QueryToken, any>[];
121+
allResults: Record<QueryToken, any>;
122+
nextIndex: number;
123+
124+
constructor(maxLength = 10) {
125+
this.updateResolves = [];
126+
this.updatePromises = [];
127+
this.allResults = {};
128+
this.updates = [];
129+
this.nextIndex = 0;
130+
131+
let nextResolve: (v: Record<QueryToken, any>) => void;
132+
let nextPromise: Promise<Record<QueryToken, any>>;
133+
134+
for (let i = 0; i < maxLength; i++) {
135+
nextPromise = new Promise((r) => {
136+
nextResolve = r;
137+
});
138+
this.updateResolves.push(nextResolve!);
139+
this.updatePromises.push(nextPromise);
140+
}
141+
}
142+
143+
onTransition =
144+
(client: BaseConvexClient) => (updatedQueryTokens: QueryToken[]) => {
145+
const update: Record<QueryToken, any> = {};
146+
for (const queryToken of updatedQueryTokens) {
147+
const value = client.localQueryResultByToken(queryToken);
148+
update[queryToken] = value;
149+
this.allResults[queryToken] = value;
150+
}
151+
this.updateResolves[this.nextIndex](update);
152+
this.updates.push(update);
153+
this.nextIndex++;
154+
};
155+
}

src/browser/sync/optimistic_updates_impl.ts

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,8 @@ type OptimisticUpdateAndId = {
131131
type Query = {
132132
// undefined means the query was set to be loading (undefined) in an optimistic update.
133133
// Note that we can also have queries not present in the QueryResultMap
134-
// at all because they are still loading from the server.
134+
// at all because they are still loading from the server and have no optimistic update
135+
// setting an optimistic value in advance.
135136
result: FunctionResult | undefined;
136137
udfPath: string;
137138
args: Record<string, Value>;
@@ -152,6 +153,9 @@ export class OptimisticQueryResults {
152153
this.optimisticUpdates = [];
153154
}
154155

156+
/**
157+
* Apply all optimistic updates on top of server query results
158+
*/
155159
ingestQueryResultsFromServer(
156160
serverQueryResults: QueryResultsMap,
157161
optimisticUpdatesToDrop: Set<RequestId>,
@@ -197,6 +201,13 @@ export class OptimisticQueryResults {
197201
return localStore.modifiedQueries;
198202
}
199203

204+
/**
205+
* @internal
206+
*/
207+
rawQueryResult(queryToken: QueryToken): Query | undefined {
208+
return this.queryResults.get(queryToken);
209+
}
210+
200211
queryResult(queryToken: QueryToken): Value | undefined {
201212
const query = this.queryResults.get(queryToken);
202213
if (query === undefined) {

src/browser/sync/remote_query_set.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,10 @@ import { FunctionResult } from "./function_result.js";
77
/**
88
* A represention of the query results we've received on the current WebSocket
99
* connection.
10+
*
11+
* Queries you won't find here include:
12+
* - queries which have been requested, but no query transition has been received yet for
13+
* - queries which are populated only though active optimistic updates, but are not subscribed to
1014
*/
1115
export class RemoteQuerySet {
1216
private version: StateVersion;

0 commit comments

Comments
 (0)