AI chat agents with automatic message persistence, resumable streaming, and tool support. Built on Cloudflare Durable Objects and the AI SDK.
npm install @cloudflare/ai-chat agents ai workers-ai-provider

import { AIChatAgent } from "@cloudflare/ai-chat";
import { createWorkersAI } from "workers-ai-provider";
import { streamText, convertToModelMessages } from "ai";
export class ChatAgent extends AIChatAgent {
async onChatMessage() {
const workersai = createWorkersAI({ binding: this.env.AI });
const result = streamText({
model: workersai("@cf/moonshotai/kimi-k2.6"),
messages: await convertToModelMessages(this.messages)
});
return result.toUIMessageStreamResponse();
}
}

That gives you: automatic message persistence in SQLite, resumable streaming on disconnect/reconnect, and real-time WebSocket delivery to all connected clients.

On the client, connect with useAgent and useAgentChat:

import { useAgent } from "agents/react";
import { useAgentChat } from "@cloudflare/ai-chat/react";
function Chat() {
const agent = useAgent({ agent: "ChatAgent" });
const { messages, sendMessage, clearHistory, status } = useAgentChat({
agent
});
return (
<div>
{messages.map((msg) => (
<div key={msg.id}>
<strong>{msg.role}:</strong>
{msg.parts.map((part, i) =>
part.type === "text" ? <span key={i}>{part.text}</span> : null
)}
</div>
))}
<form
onSubmit={(e) => {
e.preventDefault();
const input = e.currentTarget.elements.namedItem(
"input"
) as HTMLInputElement;
sendMessage({
role: "user",
parts: [{ type: "text", text: input.value }]
});
input.value = "";
}}
>
<input name="input" placeholder="Type a message..." />
</form>
</div>
);
}

Tools with an execute function run on the server automatically:

import { AIChatAgent } from "@cloudflare/ai-chat";
import { createWorkersAI } from "workers-ai-provider";
import { streamText, convertToModelMessages, stepCountIs, tool } from "ai";
import { z } from "zod";
export class ChatAgent extends AIChatAgent {
async onChatMessage() {
const workersai = createWorkersAI({ binding: this.env.AI });
const result = streamText({
model: workersai("@cf/moonshotai/kimi-k2.6"),
messages: await convertToModelMessages(this.messages),
tools: {
getWeather: tool({
description: "Get weather for a city",
inputSchema: z.object({ city: z.string() }),
execute: async ({ city }) => {
// fetchWeather is a placeholder for your own weather lookup
const data = await fetchWeather(city);
return { temperature: data.temp, condition: data.condition };
}
})
},
// Allow up to 5 steps so the model can respond after tool results
stopWhen: stepCountIs(5)
});
return result.toUIMessageStreamResponse();
}
}

Tools without execute are handled on the client via onToolCall. Use this for tools that need browser APIs (geolocation, clipboard, camera):

// Server: define tool without execute
getLocation: tool({
description: "Get the user's location from their browser",
inputSchema: z.object({})
// No execute -- client handles it
});

// Client: handle via onToolCall
const { messages, sendMessage } = useAgentChat({
agent,
onToolCall: async ({ toolCall, addToolOutput }) => {
if (toolCall.toolName === "getLocation") {
// Type the promise so pos.coords is available below
const pos = await new Promise<GeolocationPosition>((resolve, reject) =>
navigator.geolocation.getCurrentPosition(resolve, reject)
);
addToolOutput({
toolCallId: toolCall.toolCallId,
output: { lat: pos.coords.latitude, lng: pos.coords.longitude }
});
}
}
});

Use needsApproval for tools that require user confirmation before executing:

// Server
processPayment: tool({
description: "Process a payment",
inputSchema: z.object({ amount: z.number(), recipient: z.string() }),
needsApproval: async ({ amount }) => amount > 100, // Only require approval for large amounts
execute: async ({ amount, recipient }) => charge(amount, recipient)
});

// Client
const { messages, addToolApprovalResponse } = useAgentChat({ agent });
// When rendering tool parts with state === "approval-requested":
<button onClick={() => addToolApprovalResponse({ id: approvalId, approved: true })}>
Approve
</button>
<button onClick={() => addToolApprovalResponse({ id: approvalId, approved: false })}>
Reject
</button>

Streams automatically resume on disconnect/reconnect. No configuration needed.
When a client disconnects mid-stream, chunks are buffered in SQLite. On reconnect, the client receives all buffered chunks and continues receiving the live stream.
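
The buffer-and-replay behavior described above can be pictured with a small in-memory model. This is only a sketch under stated assumptions: `ResumableStream`, `push`, and `connect` are hypothetical names for illustration, and an array stands in for the SQLite buffer the agent actually uses.

```typescript
// Hypothetical in-memory model of a resumable stream (not the library's code).
// The real agent buffers chunks in SQLite; an array stands in for that here.
class ResumableStream {
  private chunks: string[] = [];
  private listeners = new Set<(chunk: string) => void>();

  // Called by the producer for every chunk, whether or not a client is connected.
  push(chunk: string): void {
    this.chunks.push(chunk);
    this.listeners.forEach((listener) => listener(chunk));
  }

  // Replays every buffered chunk starting at fromIndex, then subscribes to
  // live chunks. The returned function models a disconnect.
  connect(fromIndex: number, onChunk: (chunk: string) => void): () => void {
    this.chunks.slice(fromIndex).forEach((chunk) => onChunk(chunk));
    this.listeners.add(onChunk);
    return () => {
      this.listeners.delete(onChunk);
    };
  }
}

const stream = new ResumableStream();
const received: string[] = [];
const onChunk = (chunk: string): void => {
  received.push(chunk);
};

const disconnect = stream.connect(0, onChunk);
stream.push("Hel");
disconnect(); // client drops mid-stream
stream.push("lo, "); // buffered while nobody is listening
stream.connect(received.length, onChunk); // reconnect and replay what was missed
stream.push("world"); // live delivery continues

const resumedText = received.join("");
console.log(resumedText);
```

The client ends up with the full text even though one chunk was produced while it was disconnected.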
Disable with resume: false:
const { messages } = useAgentChat({ agent, resume: false });

When users submit a new message while another turn is still active, AIChatAgent
can queue, collapse, or drop the overlap server-side:
export class ChatAgent extends AIChatAgent {
messageConcurrency = "latest";
async onChatMessage() {
// ...
}
}

Available strategies:

- "queue" (default) -- process every submit in order
- "latest" -- keep only the newest overlapping submit and skip any older queued overlap turns
- "merge" -- queue overlapping submits, then collapse their queued user messages into one combined follow-up user turn
- "drop" -- ignore overlapping submits
- { strategy: "debounce", debounceMs: 750 } -- wait for a quiet window, then run only the latest submit

Choosing a strategy: Use "latest" for focused assistants where the user
can correct themselves mid-stream. Use "queue" or "merge" for messaging
apps where every message matters. Use "drop" to prevent double-sends. Use
"debounce" when users send bursts of short messages.
What the user sees: With "queue", every message gets its own response.
With "latest", all messages appear but only the last overlapping one gets a
response. With "merge", overlapping messages are collapsed into one. With
"drop", the overlapping message briefly appears then disappears (rollback).
This setting only affects overlapping sendMessage() submits. Regenerate,
tool continuations, approvals, and programmatic saveMessages() calls keep the
existing serialized behavior. When debounceMs is missing or invalid,
AIChatAgent falls back to the default 750 ms window.
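
To make the differences between strategies concrete, here is a rough model of which overlapping submits still run once the active turn finishes. It is a sketch, not the library's implementation: `resolveOverlap` and `resolveDebounceMs` are hypothetical helpers, joining merged messages with a newline is an assumption, and treating negative or non-finite values as "invalid" is also an assumption.

```typescript
// Hypothetical model of the overlap strategies (not the library's code).
type Strategy = "queue" | "latest" | "merge" | "drop";

// Given the submits that arrived while a turn was active, return the user
// turns that would still run afterwards.
function resolveOverlap(strategy: Strategy, overlapping: string[]): string[] {
  switch (strategy) {
    case "queue":
      return [...overlapping]; // every submit runs, in order
    case "latest":
      return overlapping.slice(-1); // only the newest submit runs
    case "merge":
      // Assumption: merged messages are joined with a newline.
      return overlapping.length > 0 ? [overlapping.join("\n")] : [];
    case "drop":
      return []; // overlapping submits are ignored
  }
}

const DEFAULT_DEBOUNCE_MS = 750;

// Falls back to the default window when the value is missing or invalid.
// Assumption: "invalid" means not a finite, non-negative number.
function resolveDebounceMs(value: unknown): number {
  return typeof value === "number" && Number.isFinite(value) && value >= 0
    ? value
    : DEFAULT_DEBOUNCE_MS;
}

const submits = ["book a flight", "actually, a train", "to Berlin"];
console.log(resolveOverlap("queue", submits)); // all three, in order
console.log(resolveOverlap("latest", submits)); // only the last submit
console.log(resolveOverlap("merge", submits)); // one combined turn
console.log(resolveOverlap("drop", submits)); // nothing runs
console.log(resolveDebounceMs(undefined)); // default window
```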
Pass a function to saveMessages() to derive from the latest transcript when
the turn actually starts — useful for schedule callbacks and webhook handlers
where messages may have changed since the call was made:
await this.saveMessages((messages) => [
...messages,
{
id: crypto.randomUUID(),
role: "user",
parts: [{ type: "text", text: "Scheduled follow-up" }]
}
]);

saveMessages() returns { requestId, status }. Check status to detect
whether the turn was skipped (e.g. because the chat was cleared while queued).
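
The reason the function form matters is that it is evaluated when the turn starts, not when it is queued. The sketch below models that with a hypothetical `Transcript` class; the `enqueue` and `flush` names are assumptions for illustration and do not correspond to the agent's internals.

```typescript
// Hypothetical model of deferred updaters (not the library's code).
type Message = { id: string; role: "user" | "assistant"; text: string };
type Updater = (messages: Message[]) => Message[];

class Transcript {
  messages: Message[] = [];
  private pending: Updater[] = [];

  // Queue a turn. The updater is stored, not evaluated yet.
  enqueue(updater: Updater): void {
    this.pending.push(updater);
  }

  // Start the queued turns: each updater sees the transcript as it is now.
  flush(): void {
    for (const updater of this.pending) {
      this.messages = updater(this.messages);
    }
    this.pending = [];
  }
}

const transcript = new Transcript();
transcript.messages = [{ id: "1", role: "user", text: "Hi" }];

// A scheduled callback queues a follow-up...
transcript.enqueue((messages) => [
  ...messages,
  { id: "3", role: "user", text: "Scheduled follow-up" }
]);

// ...but another message lands before the queued turn starts.
transcript.messages = [
  ...transcript.messages,
  { id: "2", role: "assistant", text: "Hello!" }
];

transcript.flush();
const finalIds = transcript.messages.map((m) => m.id);
console.log(finalIds); // the follow-up is appended after the late message
```

Had the callback captured a plain array at enqueue time, the assistant message added in between would have been lost.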
Cap the number of messages kept in SQLite:
export class ChatAgent extends AIChatAgent {
maxPersistedMessages = 200; // Keep last 200 messages
async onChatMessage() {
// ...
}
}

Oldest messages are deleted when the count exceeds the limit. This controls storage only -- it does not affect what is sent to the LLM.
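
As a rough illustration of the trimming rule, the sketch below keeps only the newest messages once the cap is exceeded. `trimPersisted` and `StoredMessage` are hypothetical names; the real agent applies the limit to rows in SQLite rather than to an array.

```typescript
// Hypothetical model of the storage cap (not the library's code).
type StoredMessage = { id: string; text: string };

// Keeps the newest `max` messages; undefined means no limit.
function trimPersisted(
  messages: StoredMessage[],
  max: number | undefined
): StoredMessage[] {
  if (max === undefined || messages.length <= max) return messages;
  return messages.slice(messages.length - max);
}

const stored: StoredMessage[] = ["1", "2", "3", "4", "5"].map((id) => ({
  id,
  text: "message " + id
}));

const trimmedIds = trimPersisted(stored, 3).map((m) => m.id);
console.log(trimmedIds); // the two oldest messages are gone
console.log(trimPersisted(stored, undefined).length); // unlimited keeps all
```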
Use the AI SDK's pruneMessages() to control what is sent to the model, independently of what is stored:
import { createWorkersAI } from "workers-ai-provider";
import { streamText, convertToModelMessages, pruneMessages } from "ai";
export class ChatAgent extends AIChatAgent {
maxPersistedMessages = 200;
async onChatMessage() {
const workersai = createWorkersAI({ binding: this.env.AI });
const result = streamText({
model: workersai("@cf/moonshotai/kimi-k2.6"),
messages: pruneMessages({
messages: await convertToModelMessages(this.messages),
reasoning: "before-last-message",
toolCalls: "before-last-2-messages"
})
});
return result.toUIMessageStreamResponse();
}
}

Messages approaching SQLite's 2MB row limit are automatically compacted. Large tool outputs are replaced with an LLM-friendly summary that instructs the model to suggest re-running the tool. Compacted messages include metadata.compactedToolOutputs so clients can detect and display this gracefully.
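
The compaction step can be sketched roughly as follows. Everything here is an assumption made for illustration: the message and part shapes, the `compactMessage` helper, the character threshold (the real limit is on row size), the wording of the placeholder, and the choice to store tool call IDs in `metadata.compactedToolOutputs`.

```typescript
// Hypothetical model of tool-output compaction (not the library's code).
type ToolPart = { type: "tool"; toolCallId: string; output: string };
type TextPart = { type: "text"; text: string };
type Part = ToolPart | TextPart;
type CompactableMessage = {
  id: string;
  parts: Part[];
  metadata?: { compactedToolOutputs?: string[] };
};

// Replaces oversized tool outputs with a short note and records which
// tool calls were compacted so a client can flag them.
function compactMessage(
  message: CompactableMessage,
  maxOutputChars: number
): CompactableMessage {
  const compacted: string[] = [];
  const parts = message.parts.map((part): Part => {
    if (part.type !== "tool" || part.output.length <= maxOutputChars) {
      return part;
    }
    compacted.push(part.toolCallId);
    return {
      ...part,
      output:
        "[Output of " +
        part.output.length +
        " characters omitted. Re-run the tool if it is needed.]"
    };
  });
  if (compacted.length === 0) return message; // nothing to compact
  return {
    ...message,
    parts,
    metadata: { ...message.metadata, compactedToolOutputs: compacted }
  };
}

const large: CompactableMessage = {
  id: "m1",
  parts: [
    { type: "text", text: "Here is the report." },
    { type: "tool", toolCallId: "call-1", output: "x".repeat(5000) }
  ]
};

const compactedMessage = compactMessage(large, 100);
console.log(compactedMessage.metadata?.compactedToolOutputs);
```

A client could check for that metadata field and show a note such as "output truncated" next to the affected tool call.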
Include custom data with every chat request using the body option:
const { messages, sendMessage } = useAgentChat({
agent,
body: {
timezone: Intl.DateTimeFormat().resolvedOptions().timeZone,
userId: "abc"
}
});
// Or use a function for dynamic values:
body: () => ({ token: getAuthToken(), timestamp: Date.now() });

Access these fields on the server via options.body:

async onChatMessage(onFinish, options) {
const { timezone, userId } = options?.body ?? {};
}

AIChatAgent extends Agent from the agents package.

| Property / Method | Type | Description |
|---|---|---|
| `messages` | `ChatMessage[]` | Current conversation messages (loaded from SQLite) |
| `maxPersistedMessages` | `number \| undefined` | Max messages to keep in SQLite. Default: unlimited |
| `messageConcurrency` | `MessageConcurrency` | Concurrency strategy for `sendMessage()` submits. Default: `"queue"` |
| `onChatMessage(onFinish?, options?)` | Override | Handle incoming chat messages. Return a `Response`. `onFinish` is optional. |
| `onChatResponse(result)` | Override | Called after a chat turn completes. `result` has `message`, `requestId`, `status`, `continuation` |
| `persistMessages(messages)` | `Promise<void>` | Manually persist messages (usually automatic) |
| `saveMessages(messages)` | `Promise<SaveMessagesResult>` | Persist messages and trigger `onChatMessage`. Accepts array or function. |
| `waitUntilStable()` | `Promise<boolean>` | Protected helper to wait until the conversation is fully stable |
| `resetTurnState()` | `void` | Protected helper to abort the active turn and invalidate queued continuations |
| `hasPendingInteraction()` | `boolean` | Protected helper to detect pending tool input or approval in assistant messages |

useAgentChat is a React hook for chat interactions. It wraps the AI SDK's useChat with a WebSocket transport.
Options:

| Option | Type | Description |
|---|---|---|
| `agent` | `ReturnType<typeof useAgent>` | Agent connection (required) |
| `onToolCall` | `({ toolCall, addToolOutput }) => void` | Handle client-side tool execution |
| `autoContinueAfterToolResult` | `boolean` | Auto-continue after client tool results. Default: `true` |
| `resume` | `boolean` | Enable stream resumption. Default: `true` |
| `body` | `object \| () => object` | Custom data sent with every request (described above) |
| `prepareSendMessagesRequest` | `(options) => { body?, headers? }` | Advanced per-request customization |
| `getInitialMessages` | `(options) => Promise<ChatMessage[]>` | Custom initial message loader |

Returns:

| Property | Type | Description |
|---|---|---|
| `messages` | `ChatMessage[]` | Chat messages |
| `sendMessage` | `(message) => void` | Send a message |
| `clearHistory` | `() => void` | Clear conversation |
| `addToolOutput` | `({ toolCallId, output }) => void` | Provide tool output |
| `addToolApprovalResponse` | `({ id, approved }) => void` | Approve/reject a tool |
| `setMessages` | `(messages \| updater) => void` | Set messages (syncs to server) |
| `status` | `string` | `"idle" \| "submitted" \| "streaming" \| "error"` |


| Import path | What it provides |
|---|---|
| `@cloudflare/ai-chat` | `AIChatAgent`, `ChatMessage`, `createToolsFromClientSchemas` |
| `@cloudflare/ai-chat/react` | `useAgentChat` |
| `@cloudflare/ai-chat/types` | `MessageType`, `OutgoingMessage`, `IncomingMessage` |

- Resumable streaming chat -- automatic stream resumption
- Human-in-the-loop guide -- tool approval with needsApproval + onToolCall
- Playground -- kitchen-sink demo of all SDK features

MIT