Stream Persistence
Durably store AI streaming responses for replay, reconnection, and multi-client watching
Stream persistence saves AI streaming chunks to a durable store as they arrive. If a client disconnects mid-stream, another client can pick up where it left off or replay from the beginning. Multiple clients can watch the same stream simultaneously.
How It Works
Client A starts stream:
AI Model → chunks → StreamManager.persist() → StreamStore (SQLite)
↘ Client A (live)
Client A disconnects. Client B connects:
StreamManager.watch(streamId) → polls StreamStore → Client B (replay + live tail)
Stream status lifecycle:
queued → running → completed
                 → failed
                 → cancelled

Three layers handle this:
- StreamStore — abstract storage interface for streams and chunks
- StreamManager — high-level lifecycle: register, persist, watch, cancel, cleanup
- persistedWriter — low-level writer wrapper that intercepts chunks and buffers them to the store
StreamStore
The abstract base class defining storage operations. SqliteStreamStore is the built-in implementation.
import { SqliteStreamStore } from '@deepagents/context';
const store = new SqliteStreamStore('./streams.db');
// Close when your process shuts down
store.close();

You can also pass an existing DatabaseSync instance:
import { DatabaseSync } from 'node:sqlite';
const db = new DatabaseSync('./app.db');
const store = new SqliteStreamStore(db);

If you pass your own DatabaseSync, call store.close() only when the stream store owns the connection's lifecycle; otherwise close the database yourself.
StreamData
| Field | Type | Description |
|---|---|---|
| id | string | Unique stream identifier |
| status | StreamStatus | Current status: queued, running, completed, failed, cancelled |
| createdAt | number | Timestamp when the stream was created |
| startedAt | number \| null | Timestamp when the stream started running |
| finishedAt | number \| null | Timestamp when the stream reached a terminal status |
| cancelRequestedAt | number \| null | Timestamp when cancellation was requested |
| error | string \| null | Error message if the stream failed |
StreamChunkData
| Field | Type | Description |
|---|---|---|
| streamId | string | Parent stream ID |
| seq | number | Monotonically increasing sequence number |
| data | unknown | The stream part (serialized as JSON in SQLite) |
| createdAt | number | Timestamp when the chunk was persisted |
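The two record shapes above can be sketched as plain TypeScript types. These are local re-declarations for illustration; the real types are exported by @deepagents/context:

```typescript
// Local sketches of the record shapes described above
// (assumptions for the example, not the library's exported types).
type StreamStatus = 'queued' | 'running' | 'completed' | 'failed' | 'cancelled';

interface StreamData {
  id: string;
  status: StreamStatus;
  createdAt: number;
  startedAt: number | null;
  finishedAt: number | null;
  cancelRequestedAt: number | null;
  error: string | null;
}

interface StreamChunkData {
  streamId: string;
  seq: number;
  data: unknown; // the stream part; serialized as JSON in SQLite
  createdAt: number;
}

// A freshly registered stream and its first chunk might look like:
const stream: StreamData = {
  id: 'stream-abc',
  status: 'queued',
  createdAt: Date.now(),
  startedAt: null,
  finishedAt: null,
  cancelRequestedAt: null,
  error: null,
};

const chunk: StreamChunkData = {
  streamId: stream.id,
  seq: 0,
  data: { type: 'text-delta', textDelta: 'Hello' },
  createdAt: Date.now(),
};
```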
Methods
| Method | Description |
|---|---|
| createStream(stream) | Insert a new stream record |
| upsertStream(stream) | Insert or return existing stream. Returns { stream, created } |
| getStream(streamId) | Fetch stream metadata, or undefined if not found |
| getStreamStatus(streamId) | Fetch only the stream status, or undefined if not found |
| updateStreamStatus(streamId, status, opts?) | Transition stream status. Automatically sets startedAt, finishedAt, cancelRequestedAt, and error based on the target status |
| appendChunks(chunks) | Batch-insert chunks (transactional in SQLite) |
| getChunks(streamId, fromSeq?, limit?) | Fetch chunks ordered by seq, optionally starting from fromSeq |
| deleteStream(streamId) | Delete stream and all its chunks (cascading) |
| reopenStream(streamId) | Atomically delete a terminal stream and re-create it as queued. Throws if the stream is not found or not in a terminal status |
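Because getChunks returns chunks ordered by seq, a consumer can reconstruct output or resume from where it left off. A self-contained sketch of that consumption pattern (the chunk type is declared locally here; the real one comes from the library):

```typescript
// Local stand-in for StreamChunkData, for illustration only.
interface ChunkRow {
  streamId: string;
  seq: number;
  data: unknown;
  createdAt: number;
}

// Reassemble text from persisted chunks, mirroring what
// getChunks(streamId, fromSeq) returns: ordered by seq,
// starting at fromSeq (inclusive).
function replayText(chunks: ChunkRow[], fromSeq = 0): string {
  return chunks
    .filter((c) => c.seq >= fromSeq)
    .sort((a, b) => a.seq - b.seq)
    .map((c) => {
      const part = c.data as { type?: string; textDelta?: string };
      return part.type === 'text-delta' ? (part.textDelta ?? '') : '';
    })
    .join('');
}

const rows: ChunkRow[] = [
  { streamId: 's', seq: 1, data: { type: 'text-delta', textDelta: ' world' }, createdAt: 2 },
  { streamId: 's', seq: 0, data: { type: 'text-delta', textDelta: 'Hello' }, createdAt: 1 },
];

replayText(rows);    // 'Hello world' — full replay
replayText(rows, 1); // ' world' — resume from seq 1
```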
SqliteStreamStore.close()
SqliteStreamStore exposes an explicit close() method to release the underlying SQLite connection and clear prepared statements. It is idempotent.
const store = new SqliteStreamStore('./streams.db');
const manager = new StreamManager({ store });
// ... stream operations ...
// app shutdown
store.close();

StreamManager
Orchestrates the full stream lifecycle.
import { StreamManager, SqliteStreamStore } from '@deepagents/context';
const manager = new StreamManager({
store: new SqliteStreamStore('./streams.db'),
watchPolling: {
minMs: 25,
maxMs: 500,
multiplier: 2,
jitterRatio: 0.15,
statusCheckEvery: 3,
chunkPageSize: 128,
},
cancelPolling: {
minMs: 50,
maxMs: 500,
multiplier: 2,
jitterRatio: 0.15,
},
});

register(streamId)
Creates or retrieves a stream in queued status. Call this before starting the AI generation.
const { stream, created } = await manager.register('stream-abc');
if (!created) {
console.log(`Stream already exists with status: ${stream.status}`);
}

persist(stream, streamId, options?)
Consumes a ReadableStream, persists every chunk to the store, and transitions status from running to completed (or failed on error). Polls for cancellation in the background.
import { createUIMessageStream } from 'ai';
const uiStream = createUIMessageStream({
execute: async ({ writer }) => {
writer.write({ type: 'text-delta', textDelta: 'Hello' });
writer.write({ type: 'text-delta', textDelta: ' world' });
},
});
await manager.persist(uiStream, 'stream-abc', {
strategy: 'buffered',
flushSize: 20,
cancelPolling: {
minMs: 50,
maxMs: 500,
multiplier: 2,
jitterRatio: 0.15,
},
});

| Option | Type | Default | Description |
|---|---|---|---|
| strategy | 'buffered' \| 'immediate' | 'buffered' | buffered batches chunks for throughput; immediate persists each chunk before forwarding |
| flushSize | number | 20 | For the buffered strategy: flush after this many chunks |
| cancelPolling | { minMs, maxMs, multiplier, jitterRatio } | { 50, 500, 2, 0.15 } | Adaptive cancellation polling policy |
If the stream is already in a terminal status (completed, failed, cancelled), persist returns immediately without re-running.
watch(streamId, options?)
Returns a ReadableStream that replays all persisted chunks, then live-tails new ones until the stream reaches a terminal status.
const replayStream = manager.watch('stream-abc', {
minMs: 25,
maxMs: 500,
multiplier: 2,
jitterRatio: 0.15,
statusCheckEvery: 3,
chunkPageSize: 128,
});
for await (const part of replayStream) {
console.log(part);
}

| Option | Type | Default | Description |
|---|---|---|---|
| minMs | number | 25 | Minimum poll interval |
| maxMs | number | 500 | Maximum poll interval |
| multiplier | number | 2 | Backoff multiplier when idle |
| jitterRatio | number | 0.15 | Random jitter ratio applied per wait |
| statusCheckEvery | number | 3 | Check stream status every N chunk polls while data is flowing |
| chunkPageSize | number | 128 | Maximum chunks fetched per poll iteration |
Throws if the stream ID does not exist.
If a stream is deleted while being watched, the watcher closes gracefully.
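The minMs / maxMs / multiplier / jitterRatio options describe exponential backoff with jitter. A self-contained sketch of how such a delay schedule behaves (an illustration of the policy, not the library's exact implementation):

```typescript
interface PollingPolicy {
  minMs: number;
  maxMs: number;
  multiplier: number;
  jitterRatio: number;
}

// Next delay after `idlePolls` consecutive empty polls: exponential
// backoff clamped to maxMs, plus/minus jitterRatio of random jitter.
function nextDelay(policy: PollingPolicy, idlePolls: number): number {
  const base = Math.min(policy.maxMs, policy.minMs * policy.multiplier ** idlePolls);
  const jitter = base * policy.jitterRatio * (Math.random() * 2 - 1);
  return Math.max(0, base + jitter);
}

const policy: PollingPolicy = { minMs: 25, maxMs: 500, multiplier: 2, jitterRatio: 0.15 };
// Idle delays grow 25 → 50 → 100 → 200 → 400 → 500 (capped) ms,
// each within +/- 15% jitter.
```

Lower minMs means new chunks are picked up faster; a higher maxMs keeps idle streams cheap to watch.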
Polling telemetry
Use onPollingEvent to observe polling behavior for tuning and diagnostics:
const manager = new StreamManager({
store: new SqliteStreamStore('./streams.db'),
onPollingEvent: (event) => {
if (event.type === 'watch:empty') {
console.debug('idle delay', event.delayMs);
}
},
});

Event types:
- watch:poll
- watch:empty
- watch:chunks
- watch:closed
- persist:cancel-poll
- persist:cancel-detected
cancel(streamId)
Marks the stream as cancelled. The persist() loop detects this on its next poll and stops consuming.
await manager.cancel('stream-abc');

cleanup(streamId)
Deletes the stream record and all associated chunks.
await manager.cleanup('stream-abc');

reopen(streamId)
Atomically clears a terminal stream (deletes old chunks and record) and re-registers it as queued. Use this to reuse a streamId for a new generation cycle — for example, when continuing a multi-turn conversation on the same stream.
const { stream, created } = await manager.reopen('stream-abc');
// stream.status === 'queued', created === true
// Old chunks are deleted; call persist() with a new ReadableStream

Throws if the stream does not exist or is not in a terminal status (completed, failed, cancelled). This prevents accidentally destroying an in-progress stream.
persistedWriter
A lower-level API that wraps a UIMessageStreamWriter to intercept and persist chunks as they flow through. Use this when you need direct control over the writer inside createUIMessageStream.
import { createUIMessageStream } from 'ai';
import { persistedWriter, SqliteStreamStore } from '@deepagents/context';
const store = new SqliteStreamStore('./streams.db');
const uiStream = createUIMessageStream({
execute: async ({ writer }) => {
const pw = await persistedWriter({
writer,
store,
streamId: 'stream-abc',
strategy: 'buffered',
flushSize: 20,
});
pw.writer.write({ type: 'text-delta', textDelta: 'Hello' });
pw.writer.write({ type: 'text-delta', textDelta: ' world' });
await pw.complete();
},
});

Options

| Option | Type | Default | Description |
|---|---|---|---|
| writer | UIMessageStreamWriter | required | The original writer to wrap |
| store | StreamStore | required | Where to persist chunks |
| streamId | string | required | Stream identifier |
| strategy | 'buffered' \| 'immediate' | 'buffered' | Persistence strategy |
| flushSize | number | 20 | Chunks to buffer before flushing (buffered strategy only) |
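The buffered strategy amounts to a small write-behind buffer that hands chunks to a batch sink every flushSize items, with a final drain on completion. A self-contained sketch of that behavior (an illustration, not the library's code):

```typescript
// Minimal write-behind buffer: collects chunks and hands them to a
// batch sink (e.g. something like store.appendChunks) every
// `flushSize` items, with a final flush on complete().
class ChunkBuffer<T> {
  private buffer: T[] = [];

  constructor(
    private flushSize: number,
    private sink: (batch: T[]) => void,
  ) {}

  write(chunk: T): void {
    this.buffer.push(chunk);
    if (this.buffer.length >= this.flushSize) this.flush();
  }

  flush(): void {
    if (this.buffer.length === 0) return;
    this.sink(this.buffer);
    this.buffer = [];
  }

  complete(): void {
    this.flush(); // drain whatever is left
  }
}

const batches: number[][] = [];
const buf = new ChunkBuffer<number>(20, (b) => batches.push([...b]));
for (let i = 0; i < 45; i++) buf.write(i);
buf.complete();
// batches now holds three flushes of 20, 20, and 5 chunks
```

The trade-off is the window between flushes: with immediate, that window is a single chunk.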
PersistedWriter Methods
| Method | Description |
|---|---|
| flush() | Force-flush the buffer to the store |
| complete() | Flush remaining chunks and mark stream as completed |
| fail(error?) | Flush remaining chunks and mark stream as failed with optional error message |
| cleanup() | Delete the stream and all chunks from the store |
The returned writer property is a drop-in replacement for UIMessageStreamWriter. Both write() and merge() are intercepted: chunks are persisted before being forwarded to the original writer.
Reconnectable Stream Pattern
Register the stream, start persisting on the server, and serve watch() to any client that connects (or reconnects).
import { StreamManager, SqliteStreamStore } from '@deepagents/context';
import { streamText } from 'ai';
import { groq } from '@ai-sdk/groq';
const manager = new StreamManager({
store: new SqliteStreamStore('./streams.db'),
});
async function startGeneration(streamId: string, prompt: string) {
await manager.register(streamId);
const { textStream } = streamText({
model: groq('gpt-oss-20b'),
prompt,
});
// Fire-and-forget: persist() consumes the stream in the background
void manager.persist(textStream, streamId);
}
function connectClient(streamId: string): ReadableStream {
return manager.watch(streamId);
}

A client calling connectClient at any point receives all chunks produced so far, then continues to receive new ones in real time until the stream finishes.
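Conceptually, a watcher of this kind reduces to a cursor over seq plus status polling. An illustrative, self-contained loop with locally assumed shapes (not the library's implementation):

```typescript
type StreamStatus = 'queued' | 'running' | 'completed' | 'failed' | 'cancelled';

interface Chunk { seq: number; data: unknown; }

// Hypothetical source abstracting the store reads watch() depends on.
interface ChunkSource {
  getChunks(fromSeq: number): Promise<Chunk[]>;
  getStatus(): Promise<StreamStatus>;
}

const TERMINAL = new Set<StreamStatus>(['completed', 'failed', 'cancelled']);

// Replay everything persisted so far, then tail until terminal.
async function* watchLoop(source: ChunkSource, pollMs = 25): AsyncGenerator<unknown> {
  let cursor = 0;
  for (;;) {
    const batch = await source.getChunks(cursor);
    for (const c of batch) {
      cursor = c.seq + 1; // advance past everything we have yielded
      yield c.data;
    }
    // Only when a poll comes back empty do we check for a terminal status,
    // then back off briefly before polling again.
    if (batch.length === 0) {
      if (TERMINAL.has(await source.getStatus())) return;
      await new Promise((r) => setTimeout(r, pollMs));
    }
  }
}
```

A disconnected client reconnects simply by starting a new loop: the cursor begins at 0 and replays from the beginning.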
Multi-Client Watching
Multiple clients can watch the same stream. Each watch() call creates an independent ReadableStream that polls the store separately.
const client1 = manager.watch('stream-abc');
const client2 = manager.watch('stream-abc');
const client3 = manager.watch('stream-abc');

Each client replays from the beginning and tails until the stream reaches a terminal status.
Stream Status Lifecycle
register()
    │
    v
 queued ◄──────────────── reopen()
    │                     (deletes old chunks,
    │ persist()            re-creates as queued)
    v
 running ──ok──────────► completed ──┐
    │                                │
    ├──cancel()────────► cancelled ──┼──► reopen()
    │                                │
    └──error───────────► failed ─────┘

Terminal statuses (completed, failed, cancelled) cause watch() to close after draining remaining chunks. Call reopen() on any terminal stream to reset it for a new generation cycle.
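The lifecycle above can be captured as a small transition table. A sketch with locally declared types (the library's actual validation may differ, e.g. around cancelling a queued stream):

```typescript
type StreamStatus = 'queued' | 'running' | 'completed' | 'failed' | 'cancelled';

const TERMINAL: ReadonlySet<StreamStatus> = new Set(['completed', 'failed', 'cancelled']);

// Allowed transitions as described by the lifecycle diagram.
const TRANSITIONS: Record<StreamStatus, StreamStatus[]> = {
  queued: ['running'],
  running: ['completed', 'failed', 'cancelled'],
  completed: ['queued'], // via reopen()
  failed: ['queued'],    // via reopen()
  cancelled: ['queued'], // via reopen()
};

function canTransition(from: StreamStatus, to: StreamStatus): boolean {
  return TRANSITIONS[from].includes(to);
}

const isTerminal = (s: StreamStatus): boolean => TERMINAL.has(s);
```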
Custom StreamStore
Implement the abstract StreamStore class to back stream persistence with any database.
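For tests or prototyping, the same contract can be satisfied by an in-memory map. The sketch below re-declares the record types locally and implements only a subset of the methods, so it is a hypothetical illustration rather than a real subclass of the library's abstract StreamStore:

```typescript
type StreamStatus = 'queued' | 'running' | 'completed' | 'failed' | 'cancelled';

interface StreamData {
  id: string;
  status: StreamStatus;
  createdAt: number;
  startedAt: number | null;
  finishedAt: number | null;
  cancelRequestedAt: number | null;
  error: string | null;
}

interface StreamChunkData {
  streamId: string;
  seq: number;
  data: unknown;
  createdAt: number;
}

// Partial in-memory store: enough to register a stream, append
// chunks, and read them back ordered by seq.
class InMemoryStreamStore {
  private streams = new Map<string, StreamData>();
  private chunks = new Map<string, StreamChunkData[]>();

  async createStream(stream: StreamData): Promise<void> {
    if (this.streams.has(stream.id)) throw new Error(`stream ${stream.id} exists`);
    this.streams.set(stream.id, { ...stream });
    this.chunks.set(stream.id, []);
  }

  async getStream(streamId: string): Promise<StreamData | undefined> {
    return this.streams.get(streamId);
  }

  async appendChunks(batch: StreamChunkData[]): Promise<void> {
    // Chunks for unknown streams are silently dropped in this sketch.
    for (const c of batch) this.chunks.get(c.streamId)?.push({ ...c });
  }

  async getChunks(streamId: string, fromSeq = 0, limit = Infinity): Promise<StreamChunkData[]> {
    return (this.chunks.get(streamId) ?? [])
      .filter((c) => c.seq >= fromSeq)
      .sort((a, b) => a.seq - b.seq)
      .slice(0, limit);
  }

  async deleteStream(streamId: string): Promise<void> {
    this.streams.delete(streamId);
    this.chunks.delete(streamId);
  }
}
```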
import { StreamStore } from '@deepagents/context';
import type { StreamData, StreamChunkData, StreamStatus } from '@deepagents/context';
class PostgresStreamStore extends StreamStore {
async createStream(stream: StreamData): Promise<void> { /* ... */ }
async upsertStream(stream: StreamData): Promise<{ stream: StreamData; created: boolean }> { /* ... */ }
async getStream(streamId: string): Promise<StreamData | undefined> { /* ... */ }
async getStreamStatus(streamId: string): Promise<StreamStatus | undefined> { /* ... */ }
async updateStreamStatus(streamId: string, status: StreamStatus, options?: { error?: string }): Promise<void> { /* ... */ }
async appendChunks(chunks: StreamChunkData[]): Promise<void> { /* ... */ }
async getChunks(streamId: string, fromSeq?: number, limit?: number): Promise<StreamChunkData[]> { /* ... */ }
async deleteStream(streamId: string): Promise<void> { /* ... */ }
async reopenStream(streamId: string): Promise<StreamData> { /* ... */ }
}

Multi-Turn Conversation Flow
In a chat application, the agent may finish a turn by requesting user input — for example, asking the user to approve a SQL query before executing it. The stream completes, but the conversation continues. reopen() lets you reuse the same streamId for each turn without leaking old chunks.
Conversation history is stored separately in the context store (the message DAG). Stream chunks are ephemeral transport — they exist for SSE delivery and client reconnection during an active stream, not for permanent storage.
Turn 1: User asks a question
register(streamId) → queued
persist(chatStream, streamId) → running → completed
Client watches → sees tool-call → shows approval UI
Turn 2: User approves (conversation continues)
reopen(streamId) → deletes old chunks, re-queued
persist(newChatStream, streamId) → running → completed
Client re-watches → sees Turn 2 response
Turn N: Same pattern...

Server Example
import { StreamManager, SqliteStreamStore } from '@deepagents/context';
const manager = new StreamManager({
store: new SqliteStreamStore('./streams.db'),
});
async function handleChatTurn(
streamId: string,
chatStream: ReadableStream,
isFirstTurn: boolean,
) {
if (isFirstTurn) {
await manager.register(streamId);
} else {
await manager.reopen(streamId);
}
manager.persist(chatStream, streamId, { strategy: 'immediate' });
return manager.watch(streamId);
}

With Idempotent Registration
When you don't know whether it's the first turn, use register() first. If the stream already exists and is terminal, call reopen().
async function handleChatRequest(
streamId: string,
chatStream: ReadableStream,
) {
const { stream, created } = await manager.register(streamId);
if (!created) {
if (stream.status === 'queued' || stream.status === 'running') {
return manager.watch(streamId);
}
await manager.reopen(streamId);
}
manager.persist(chatStream, streamId, { strategy: 'immediate' });
return manager.watch(streamId);
}

Best Practices
- Use the buffered strategy (the default) for throughput-sensitive workloads. Switch to immediate when you cannot tolerate any chunk loss on crash.
- Always call cleanup() after you no longer need a stream. Chunks accumulate and are not automatically garbage-collected.
- Tune watchPolling / cancelPolling for your latency and DB-load budget. Lower intervals reduce latency but increase polling cost.
- Use a shared SqliteStreamStore instance across your application to benefit from prepared statement caching.
- Call SqliteStreamStore.close() during shutdown to release SQLite resources cleanly.
Next Steps
- Storage - Store implementations for context persistence
- Checkpoints - Named restore points for conversation branching