Commit c34ccd7
feat (ui/react): support resuming an ongoing stream (vercel#6053)
## Background

This pull request adds the ability for clients to resume an ongoing chat generation stream after a network disconnect.

## Summary

This pull request adds support for the `useChat` hook to resume an ongoing chat generation stream by exposing `experimental_resume()`, which can be called by any client, typically during the initial mount of the hook. The `experimental_resume` function makes a `GET` request to the API endpoint you've initialized the hook with (or `/api/chat` by default) and streams the contents of the stream if it is active, or fails silently if it has ended. For `experimental_resume` to work as intended, it requires the [`resumable-stream`](https://www.npmjs.com/package/resumable-stream) package for stream creation and a Redis instance for the package to manage the pub/sub mechanism.

## Verification

An example has been added at `examples/next-openai/app/use-chat-resume` to test the feature. To test it end-to-end:

1. Run the development server
2. Navigate to `http://localhost:3000/use-chat-resume`
3. Send a message that will have a longer generation duration, for example "Write an essay about Michael Jordan"
4. Once the generation starts, click the chat ID above to open the conversation in a new tab
5. Verify the stream gets resumed

## Tasks

- [x] Tests have been added / updated (for bug fixes / features)
- [x] Documentation has been added / updated (for bug fixes / features)
- [x] A _patch_ changeset for relevant packages has been added
- [x] Formatting issues have been fixed (run `pnpm prettier-fix` in the project root)
1 parent b8239db commit c34ccd7

File tree

15 files changed: +746 −40 lines

.changeset/nine-pillows-hug.md

Lines changed: 5 additions & 0 deletions
@@ -0,0 +1,5 @@
---
'@ai-sdk/react': patch
---

feat (ui/react): support resuming an ongoing stream

content/docs/04-ai-sdk-ui/03-chatbot-message-persistence.mdx

Lines changed: 162 additions & 0 deletions
@@ -325,3 +325,165 @@ When the client reloads the page after a disconnect, the chat will be restored f
the case where the client reloads the page after a disconnection, but the
streaming is not yet complete.
</Note>
## Resuming ongoing streams

<Note>This feature is experimental and may change in future versions.</Note>

The `useChat` hook has experimental support for resuming an ongoing chat generation stream by any client, either after a network disconnect or by reloading the chat page. This can be useful for building applications that involve long-running conversations or for ensuring that messages are not lost in case of network failures.

The following are the prerequisites for your chat application to support resumable streams:

- Installing the [`resumable-stream`](https://www.npmjs.com/package/resumable-stream) package, which creates and manages the publisher/subscriber mechanism of the streams.
- Creating a [Redis](https://vercel.com/marketplace/redis) instance to store the stream state.
- Creating a table that tracks the stream IDs associated with a chat.
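The stream-tracking helpers used by the handlers below (`appendStreamId`, `loadStreams`) are left for you to implement. As a rough sketch, here is a hypothetical in-memory version with the same signatures; a real application would back this with a database table or, as in the example app, files:

```typescript
// Hypothetical in-memory stand-in for the chat-store helpers. The storage
// backend is an assumption; only the signatures mirror the handlers below.
const streamsByChat = new Map<string, string[]>();

// Record a new stream ID for a chat ("a table that tracks stream IDs").
export async function appendStreamId({
  chatId,
  streamId,
}: {
  chatId: string;
  streamId: string;
}): Promise<void> {
  const ids = streamsByChat.get(chatId) ?? [];
  ids.push(streamId);
  streamsByChat.set(chatId, ids);
}

// Load all stream IDs for a chat, oldest first; the most recent is last.
export async function loadStreams(chatId: string): Promise<string[]> {
  return streamsByChat.get(chatId) ?? [];
}
```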
To resume a chat stream, you will use the `experimental_resume` function returned by the `useChat` hook. Call this function during the initial mount of the hook inside the main chat component.

```tsx filename="app/components/chat.tsx"
'use client';

import { useEffect } from 'react';
import { useChat } from '@ai-sdk/react';
import { Input } from '@/components/input';
import { Messages } from '@/components/messages';

export function Chat({ id }: { id: string }) {
  const { experimental_resume } = useChat({ id });

  useEffect(() => {
    experimental_resume();

    // we use an empty dependency array to
    // ensure this effect runs only once
  }, []);

  return (
    <div>
      <Messages />
      <Input />
    </div>
  );
}
```
The `experimental_resume` function makes a `GET` request to your configured chat endpoint (or `/api/chat` by default) whenever your client calls it. If there is an active stream, it picks up where it left off; otherwise it simply finishes without error.

The `GET` request automatically appends the `chatId` query parameter to the URL to identify the chat the request belongs to. Using the `chatId`, you can look up the most recent stream ID from the database and resume the stream.
```bash
374+
GET /api/chat?chatId=<your-chat-id>
375+
```
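To make the request shape concrete, the resume URL can be derived from the endpoint and chat ID as sketched below. `buildResumeUrl` is a hypothetical helper for illustration, not part of the SDK or the hook's actual internals:

```typescript
// Hypothetical sketch of the URL a resume request targets:
// GET <api>?chatId=<id>. Shown only to illustrate the request shape.
function buildResumeUrl(api: string, chatId: string): string {
  // Resolve against a dummy origin so relative paths like '/api/chat' parse.
  const url = new URL(api, 'http://localhost');
  url.searchParams.set('chatId', chatId);
  return url.pathname + url.search;
}

console.log(buildResumeUrl('/api/chat', 'chat-123'));
// → /api/chat?chatId=chat-123
```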
Earlier, you must have implemented the `POST` handler for the `/api/chat` route to create new chat generations. When using `experimental_resume`, you must also implement a `GET` handler for the same route to resume streams.
### 1. Implement the GET handler

Add a `GET` method to `/api/chat` that:

1. Reads `chatId` from the query string
2. Validates that it is present
3. Loads any stored stream IDs for that chat
4. Passes the latest one to `streamContext.resumableStream()`
5. Falls back to an empty stream if the stream has already closed
```ts filename="app/api/chat/route.ts"
import { loadStreams } from '@/util/chat-store';
import { createDataStream } from 'ai';
import { after } from 'next/server';
import { createResumableStreamContext } from 'resumable-stream';

const streamContext = createResumableStreamContext({
  waitUntil: after,
});

export async function GET(request: Request) {
  const { searchParams } = new URL(request.url);
  const chatId = searchParams.get('chatId');

  if (!chatId) {
    return new Response('id is required', { status: 400 });
  }

  const streamIds = await loadStreams(chatId);

  if (!streamIds.length) {
    return new Response('No streams found', { status: 404 });
  }

  const recentStreamId = streamIds.at(-1);

  if (!recentStreamId) {
    return new Response('No recent stream found', { status: 404 });
  }

  const emptyDataStream = createDataStream({
    execute: () => {},
  });

  return new Response(
    await streamContext.resumableStream(recentStreamId, () => emptyDataStream),
  );
}
```
After you've implemented the `GET` handler, you can update the `POST` handler to create resumable streams.

### 2. Update the POST handler

When you create a brand-new chat completion, you must:

1. Generate a fresh `streamId`
2. Persist it alongside your `chatId`
3. Kick off a `createDataStream` that pipes tokens as they arrive
4. Hand that new stream to `streamContext.resumableStream()`
```ts filename="app/api/chat/route.ts"
import {
  appendResponseMessages,
  createDataStream,
  generateId,
  streamText,
} from 'ai';
import { openai } from '@ai-sdk/openai';
import { appendStreamId, saveChat } from '@/util/chat-store';
import { after } from 'next/server';
import { createResumableStreamContext } from 'resumable-stream';

const streamContext = createResumableStreamContext({
  waitUntil: after,
});

export async function POST(req: Request) {
  const { id, messages } = await req.json();
  const streamId = generateId();

  // Record this new stream so we can resume later
  await appendStreamId({ chatId: id, streamId });

  // Build the data stream that will emit tokens
  const stream = createDataStream({
    execute: dataStream => {
      const result = streamText({
        model: openai('gpt-4o'),
        messages,
        onFinish: async ({ response }) => {
          await saveChat({
            id,
            messages: appendResponseMessages({
              messages,
              responseMessages: response.messages,
            }),
          });
        },
      });

      // Merge the model output into the data stream
      result.mergeIntoDataStream(dataStream);
    },
  });

  // Return a resumable stream to the client
  return new Response(
    await streamContext.resumableStream(streamId, () => stream),
  );
}
```
With both handlers, your clients can now gracefully resume ongoing streams.

content/docs/07-reference/02-ai-sdk-ui/01-use-chat.mdx

Lines changed: 5 additions & 0 deletions
@@ -593,6 +593,11 @@ Allows you to easily create a conversational user interface for your chatbot app
      type: '() => void',
      description: 'Function to abort the current API request.',
    },
    {
      name: 'experimental_resume',
      type: '() => void',
      description: 'Function to resume an ongoing chat generation stream.',
    },
    {
      name: 'setMessages',
      type: '(messages: Message[] | ((messages: Message[]) => Message[]) => void',

examples/next-openai/.env.local.example

Lines changed: 2 additions & 0 deletions
@@ -13,3 +13,5 @@ BLOB_READ_WRITE_TOKEN=xxxxxxx
# Required for reasoning example
DEEPSEEK_API_KEY=xxxxxxx

# Required for resumable streams. You can create a Redis store here: https://vercel.com/marketplace/redis
REDIS_URL=xxxxxx

examples/next-openai/.gitignore

Lines changed: 2 additions & 1 deletion
@@ -34,5 +34,6 @@ yarn-error.log*
*.tsbuildinfo
next-env.d.ts

-# chat persistence
+# persistence
.chats
+.streams
Lines changed: 99 additions & 0 deletions
@@ -0,0 +1,99 @@
import {
  appendMessageToChat,
  appendStreamId,
  loadStreams,
  saveChat,
} from '@/util/chat-store';
import { openai } from '@ai-sdk/openai';
import {
  appendResponseMessages,
  createDataStream,
  generateId,
  UIMessage,
  streamText,
} from 'ai';
import { after } from 'next/server';
import { createResumableStreamContext } from 'resumable-stream';

// Allow streaming responses up to 30 seconds
export const maxDuration = 30;

export async function POST(req: Request) {
  const streamContext = createResumableStreamContext({
    waitUntil: after,
  });

  const { id, messages }: { id: string; messages: UIMessage[] } =
    await req.json();

  const streamId = generateId();

  const recentUserMessage = messages
    .filter(message => message.role === 'user')
    .at(-1);

  if (!recentUserMessage) {
    throw new Error('No recent user message found');
  }

  await appendMessageToChat({ chatId: id, message: recentUserMessage });

  await appendStreamId({ chatId: id, streamId });

  const stream = createDataStream({
    execute: dataStream => {
      const result = streamText({
        model: openai('gpt-4o'),
        messages,
        onFinish: async ({ response }) => {
          await saveChat({
            id,
            messages: appendResponseMessages({
              messages,
              responseMessages: response.messages,
            }),
          });
        },
      });

      result.mergeIntoDataStream(dataStream);
    },
  });

  return new Response(
    await streamContext.resumableStream(streamId, () => stream),
  );
}

export async function GET(request: Request) {
  const streamContext = createResumableStreamContext({
    waitUntil: after,
  });

  const { searchParams } = new URL(request.url);
  const chatId = searchParams.get('chatId');

  if (!chatId) {
    return new Response('id is required', { status: 400 });
  }

  const streamIds = await loadStreams(chatId);

  if (!streamIds.length) {
    return new Response('No streams found', { status: 404 });
  }

  const recentStreamId = streamIds.at(-1);

  if (!recentStreamId) {
    return new Response('No recent stream found', { status: 404 });
  }

  const emptyDataStream = createDataStream({
    execute: () => {},
  });

  return new Response(
    await streamContext.resumableStream(recentStreamId, () => emptyDataStream),
  );
}
Lines changed: 14 additions & 0 deletions
@@ -0,0 +1,14 @@
import { loadChat } from '@/util/chat-store';
import { Chat } from '../chat';

export default async function Page({
  params,
}: {
  params: Promise<{ id: string }>;
}) {
  const { id } = await params;

  const messages = await loadChat(id);

  return <Chat chatId={id} autoResume={true} initialMessages={messages} />;
}
