Commit 60e2c56

feat (ai): restructure chat transports (vercel#6746)
## Background

Chat resumption on reload needs a separate endpoint to connect to.

## Summary

* restructure chat transports
* move default route for stream reconnection to `/chat/[id]/stream`
* rename `prepareRequest` to `prepareSubmitMessagesRequest`

## Verification

* test the `next` example
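Roughly, the client side after this change looks like the minimal sketch below. It is based on the `examples/next` diff in this commit; the `useResumableChat` wrapper, the `@ai-sdk/react` import, and the API paths are illustrative assumptions, not part of the change itself.

```ts
// Minimal sketch, not part of this commit: wiring up the renamed
// prepareSubmitMessagesRequest option and the resume flag. The hook name,
// the '@ai-sdk/react' import, and the API paths are assumptions.
import { useChat } from '@ai-sdk/react';
import { DefaultChatTransport } from 'ai';

export function useResumableChat(chatId: string, hasActiveStream: boolean) {
  return useChat({
    id: chatId,
    // when true, the chat reconnects to the stream endpoint on mount
    // (by default GET /chat/[id]/stream after this change)
    resume: hasActiveStream,
    transport: new DefaultChatTransport({
      api: '/api/chat',
      // renamed from prepareRequest: send only the last message to the server
      prepareSubmitMessagesRequest: ({ id, messages }) => ({
        body: { id, message: messages[messages.length - 1] },
      }),
    }),
  });
}
```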
1 parent 395c85e commit 60e2c56

File tree

21 files changed: +415 -446 lines changed

.changeset/tricky-ravens-kick.md

Lines changed: 5 additions & 0 deletions
@@ -0,0 +1,5 @@
+---
+'ai': major
+---
+
+feat (ai): restructure chat transports
Lines changed: 44 additions & 0 deletions
@@ -0,0 +1,44 @@
+import { loadStreams } from '@/util/chat-store';
+import { createUIMessageStream, JsonToSseTransformStream } from 'ai';
+import { after } from 'next/server';
+import { createResumableStreamContext } from 'resumable-stream';
+
+// Allow streaming responses up to 30 seconds
+export const maxDuration = 30;
+
+export async function GET(
+  request: Request,
+  { params }: { params: Promise<{ id: string }> },
+) {
+  const { id } = await params;
+
+  if (!id) {
+    return new Response('id is required', { status: 400 });
+  }
+
+  const streamIds = await loadStreams(id);
+
+  if (!streamIds.length) {
+    return new Response('No streams found', { status: 204 });
+  }
+
+  const recentStreamId = streamIds.at(-1);
+
+  if (!recentStreamId) {
+    return new Response('No recent stream found', { status: 204 });
+  }
+
+  const emptyDataStream = createUIMessageStream({
+    execute: () => {},
+  });
+
+  const streamContext = createResumableStreamContext({
+    waitUntil: after,
+  });
+
+  return new Response(
+    await streamContext.resumableStream(recentStreamId, () =>
+      emptyDataStream.pipeThrough(new JsonToSseTransformStream()),
+    ),
+  );
+}

examples/next-openai/app/api/use-chat-resume/route.ts

Lines changed: 0 additions & 36 deletions
@@ -1,7 +1,6 @@
 import {
   appendMessageToChat,
   appendStreamId,
-  loadStreams,
   saveChat,
 } from '@/util/chat-store';
 import { openai } from '@ai-sdk/openai';
@@ -64,38 +63,3 @@ export async function POST(req: Request) {
     ),
   );
 }
-
-export async function GET(request: Request) {
-  const streamContext = createResumableStreamContext({
-    waitUntil: after,
-  });
-
-  const { searchParams } = new URL(request.url);
-  const chatId = searchParams.get('chatId');
-
-  if (!chatId) {
-    return new Response('id is required', { status: 400 });
-  }
-
-  const streamIds = await loadStreams(chatId);
-
-  if (!streamIds.length) {
-    return new Response('No streams found', { status: 404 });
-  }
-
-  const recentStreamId = streamIds.at(-1);
-
-  if (!recentStreamId) {
-    return new Response('No recent stream found', { status: 404 });
-  }
-
-  const emptyDataStream = createUIMessageStream({
-    execute: () => {},
-  });
-
-  return new Response(
-    await streamContext.resumableStream(recentStreamId, () =>
-      emptyDataStream.pipeThrough(new JsonToSseTransformStream()),
-    ),
-  );
-}

examples/next-openai/app/use-chat-persistence-single-message/[id]/chat.tsx

Lines changed: 1 addition & 1 deletion
@@ -14,7 +14,7 @@ export default function Chat({
     transport: new DefaultChatTransport({
       api: '/api/use-chat-persistence-single-message',
       // only send the last message to the server:
-      prepareRequest({ messages, id }) {
+      prepareSubmitMessagesRequest({ messages, id }) {
         return { body: { message: messages[messages.length - 1], id } };
       },
     }),
Lines changed: 27 additions & 0 deletions
@@ -0,0 +1,27 @@
+import { readChat } from '@util/chat-store';
+import { UI_MESSAGE_STREAM_HEADERS } from 'ai';
+import { after } from 'next/server';
+import { createResumableStreamContext } from 'resumable-stream';
+
+export async function GET(
+  request: Request,
+  { params }: { params: Promise<{ id: string }> },
+) {
+  const { id } = await params;
+
+  const chat = await readChat(id);
+
+  if (chat.activeStreamId == null) {
+    // no content response when there is no active stream
+    return new Response(null, { status: 204 });
+  }
+
+  const streamContext = createResumableStreamContext({
+    waitUntil: after,
+  });
+
+  return new Response(
+    await streamContext.resumeExistingStream(chat.activeStreamId),
+    { headers: UI_MESSAGE_STREAM_HEADERS },
+  );
+}

examples/next/app/api/chat/route.ts

Lines changed: 17 additions & 5 deletions
@@ -1,6 +1,8 @@
 import { MyUIMessage } from '@/util/chat-schema';
 import { readChat, saveChat } from '@util/chat-store';
-import { convertToModelMessages, streamText } from 'ai';
+import { convertToModelMessages, generateId, streamText } from 'ai';
+import { after } from 'next/server';
+import { createResumableStreamContext } from 'resumable-stream';
 
 export async function POST(req: Request) {
   const { message, id }: { message: MyUIMessage; id: string } =
@@ -9,13 +11,14 @@ export async function POST(req: Request) {
   const chat = await readChat(id);
   const messages = [...chat.messages, message];
 
+  // save the user message
+  saveChat({ id, messages, activeStreamId: null });
+
   const result = streamText({
     model: 'openai/gpt-4o-mini',
     messages: convertToModelMessages(messages),
   });
 
-  result.consumeStream(); // TODO always consume the stream even when the client disconnects
-
   return result.toUIMessageStreamResponse({
     originalMessages: messages,
     messageMetadata: ({ part }) => {
@@ -24,8 +27,17 @@ export async function POST(req: Request) {
       }
     },
     onFinish: ({ messages }) => {
-      // TODO fix type safety
-      saveChat({ id, messages: messages as MyUIMessage[] });
+      saveChat({ id, messages, activeStreamId: null });
+    },
+    async consumeSseStream({ stream }) {
+      const streamId = generateId();
+
+      // send the sse stream into a resumable stream sink as well:
+      const streamContext = createResumableStreamContext({ waitUntil: after });
+      await streamContext.createNewResumableStream(streamId, () => stream);
+
+      // update the chat with the streamId
+      saveChat({ id, activeStreamId: streamId });
     },
   });
 }

examples/next/app/chat/[chatId]/chat.tsx

Lines changed: 4 additions & 1 deletion
@@ -11,18 +11,21 @@ import Message from './message';
 export default function ChatComponent({
   chatData,
   isNewChat = false,
+  resume = false,
 }: {
   chatData: { id: string; messages: MyUIMessage[] };
   isNewChat?: boolean;
+  resume?: boolean;
 }) {
   const inputRef = useRef<HTMLInputElement>(null);
 
   const { status, sendMessage, messages } = useChat({
     id: chatData.id,
     messages: chatData.messages,
+    resume,
     transport: new DefaultChatTransport({
       // only send the last message to the server to limit the request size:
-      prepareRequest: ({ id, messages }) => ({
+      prepareSubmitMessagesRequest: ({ id, messages }) => ({
         body: { id, message: messages[messages.length - 1] },
       }),
     }),

examples/next/app/chat/[chatId]/page.tsx

Lines changed: 1 addition & 1 deletion
@@ -23,7 +23,7 @@ export default async function Page(props: {
           </li>
         ))}
       </ul>
-      <Chat chatData={chatData} />;
+      <Chat chatData={chatData} resume={chatData.activeStreamId !== null} />;
     </div>
   );
 }

examples/next/package.json

Lines changed: 1 addition & 2 deletions
@@ -16,8 +16,7 @@
     "react": "^18",
     "react-dom": "^18",
     "react-markdown": "9.0.1",
-    "redis": "^4.7.0",
-    "resumable-stream": "^2.0.0",
+    "resumable-stream": "^2.2.0",
     "zod": "3.25.49"
   },
   "devDependencies": {

examples/next/util/chat-schema.ts

Lines changed: 1 addition & 0 deletions
@@ -13,4 +13,5 @@ export type ChatData = {
   id: string;
   messages: MyUIMessage[];
   createdAt: number;
+  activeStreamId: string | null;
 };

examples/next/util/chat-store.ts

Lines changed: 13 additions & 27 deletions
@@ -16,13 +16,23 @@ export async function createChat(): Promise<string> {
 
 export async function saveChat({
   id,
+  activeStreamId,
   messages,
 }: {
   id: string;
-  messages: MyUIMessage[];
+  activeStreamId?: string | null;
+  messages?: MyUIMessage[];
 }): Promise<void> {
   const chat = await readChat(id);
-  chat.messages = messages;
+
+  if (messages !== undefined) {
+    chat.messages = messages;
+  }
+
+  if (activeStreamId !== undefined) {
+    chat.activeStreamId = activeStreamId;
+  }
+
   writeChat(chat);
 }
 
@@ -42,6 +52,7 @@ async function writeChat(chat: ChatData) {
   await writeFile(await getChatFile(chat.id), JSON.stringify(chat, null, 2));
 }
 
+// TODO return null if the chat does not exist
 export async function readChat(id: string): Promise<ChatData> {
   return JSON.parse(await readFile(await getChatFile(id), 'utf8'));
 }
@@ -72,28 +83,3 @@ async function getChatFile(id: string): Promise<string> {
 
   return chatFile;
 }
-
-export async function appendStreamId({
-  id,
-  streamId,
-}: {
-  id: string;
-  streamId: string;
-}) {
-  const file = getStreamsFile(id);
-  const streams = await loadStreams(id);
-  streams.push(streamId);
-  await writeFile(file, JSON.stringify(streams, null, 2));
-}
-
-export async function loadStreams(id: string): Promise<string[]> {
-  const file = getStreamsFile(id);
-  if (!existsSync(file)) return [];
-  return JSON.parse(await readFile(file, 'utf8'));
-}
-
-function getStreamsFile(id: string): string {
-  const chatDir = path.join(process.cwd(), '.streams');
-  if (!existsSync(chatDir)) mkdirSync(chatDir, { recursive: true });
-  return path.join(chatDir, `${id}.json`);
-}

packages/ai/src/ui/chat-transport.ts

Lines changed: 6 additions & 1 deletion
@@ -9,7 +9,12 @@ export interface ChatTransport<UI_MESSAGE extends UIMessage> {
       chatId: string;
       messages: UI_MESSAGE[];
       abortSignal: AbortSignal | undefined;
-      requestType: 'generate' | 'resume'; // TODO have separate functions
     } & ChatRequestOptions,
   ) => Promise<ReadableStream<UIMessageStreamPart>>;
+
+  reconnectToStream: (
+    options: {
+      chatId: string;
+    } & ChatRequestOptions,
+  ) => Promise<ReadableStream<UIMessageStreamPart> | null>;
 }
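For custom transports, `reconnectToStream` is the counterpart to `submitMessages`: it should return `null` when there is no active stream to resume. A minimal sketch of the reconnect half, assuming a fetch-based transport talking to a `/chat/[id]/stream` route like the example routes above; the `StreamPart` type and the `decodeUiMessageStream` helper are simplified stand-ins, not SDK exports.

```ts
// Minimal sketch, not part of this commit: the reconnect half of a custom
// transport against a GET <api>/<chatId>/stream route. StreamPart and
// decodeUiMessageStream are stand-ins for the SDK's own stream-part type
// and SSE decoding, which a real transport would reuse from submitMessages.
type StreamPart = unknown;

// hypothetical helper: decode an SSE byte stream into UI message stream parts
declare function decodeUiMessageStream(
  body: ReadableStream<Uint8Array>,
): ReadableStream<StreamPart>;

export async function reconnectToStream({
  chatId,
  api = '/api/chat',
  headers,
}: {
  chatId: string;
  api?: string;
  headers?: Record<string, string>;
}): Promise<ReadableStream<StreamPart> | null> {
  const response = await fetch(`${api}/${chatId}/stream`, { headers });

  // the example routes answer 204 when there is no active stream to resume
  if (response.status === 204 || response.body == null) {
    return null;
  }

  return decodeUiMessageStream(response.body);
}
```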

packages/ai/src/ui/chat.ts

Lines changed: 26 additions & 9 deletions
@@ -29,6 +29,7 @@ import {
   type UIDataTypes,
   type UIMessage,
 } from './ui-messages';
+import { UIMessageStreamPart } from '../ui-message-stream/ui-message-stream-parts';
 
 export type CreateUIMessage<UI_MESSAGE extends UIMessage> = Omit<
   UI_MESSAGE,
@@ -384,15 +385,31 @@ export abstract class AbstractChat<UI_MESSAGE extends UIMessage> {
 
     this.activeResponse = activeResponse;
 
-    const stream = await this.transport.submitMessages({
-      chatId: this.id,
-      messages: this.state.messages,
-      abortSignal: activeResponse.abortController.signal,
-      metadata,
-      headers,
-      body,
-      requestType,
-    });
+    let stream: ReadableStream<UIMessageStreamPart>;
+
+    if (requestType === 'resume') {
+      const reconnect = await this.transport.reconnectToStream({
+        chatId: this.id,
+        metadata,
+        headers,
+        body,
+      });
+
+      if (reconnect == null) {
+        return; // no active stream found, so we do not resume
+      }
+
+      stream = reconnect;
+    } else {
+      stream = await this.transport.submitMessages({
+        chatId: this.id,
+        messages: this.state.messages,
+        abortSignal: activeResponse.abortController.signal,
+        metadata,
+        headers,
+        body,
+      });
+    }
 
     const runUpdateMessageJob = (
       job: (options: {