Deep Agents

Stream Persistence

Durably store AI streaming responses for replay, reconnection, and multi-client watching

Stream persistence saves AI streaming chunks to a durable store as they arrive. If a client disconnects mid-stream, another client can resume from where the stream left off or replay it from the beginning. Multiple clients can watch the same stream simultaneously.

How It Works

Client A starts stream:
  AI Model → chunks → StreamManager.persist() → StreamStore (SQLite)
                                               ↘ Client A (live)

Client A disconnects. Client B connects:
  StreamManager.watch(streamId) → polls StreamStore → Client B (replay + live tail)

Stream status lifecycle:
  queued → running → completed
                   → failed
                   → cancelled

Three layers handle this:

  • StreamStore — abstract storage interface for streams and chunks
  • StreamManager — high-level lifecycle: register, persist, watch, cancel, cleanup
  • persistedWriter — low-level writer wrapper that intercepts chunks and buffers them to the store

StreamStore

The abstract base class defining storage operations. SqliteStreamStore is the built-in implementation.

import { SqliteStreamStore } from '@deepagents/context';

const store = new SqliteStreamStore('./streams.db');

// Close when your process shuts down
store.close();

You can also pass an existing DatabaseSync instance:

import { DatabaseSync } from 'node:sqlite';

const db = new DatabaseSync('./app.db');
const store = new SqliteStreamStore(db);

If you pass your own DatabaseSync, call store.close() only when the stream store owns the connection lifecycle in your app; otherwise store.close() would also close the connection for everything else sharing it.

StreamData

  • id (string) — Unique stream identifier
  • status (StreamStatus) — Current status: queued, running, completed, failed, or cancelled
  • createdAt (number) — Timestamp when the stream was created
  • startedAt (number | null) — Timestamp when the stream started running
  • finishedAt (number | null) — Timestamp when the stream reached a terminal status
  • cancelRequestedAt (number | null) — Timestamp when cancellation was requested
  • error (string | null) — Error message if the stream failed

StreamChunkData

  • streamId (string) — Parent stream ID
  • seq (number) — Monotonically increasing sequence number
  • data (unknown) — The stream part (serialized as JSON in SQLite)
  • createdAt (number) — Timestamp when the chunk was persisted

Methods

  • createStream(stream) — Insert a new stream record
  • upsertStream(stream) — Insert the stream, or return the existing one. Returns { stream, created }
  • getStream(streamId) — Fetch stream metadata, or undefined if not found
  • getStreamStatus(streamId) — Fetch only the stream status, or undefined if not found
  • updateStreamStatus(streamId, status, opts?) — Transition the stream status. Automatically sets startedAt, finishedAt, cancelRequestedAt, and error based on the target status
  • appendChunks(chunks) — Batch-insert chunks (transactional in SQLite)
  • getChunks(streamId, fromSeq?, limit?) — Fetch chunks ordered by seq, optionally starting from fromSeq
  • deleteStream(streamId) — Delete the stream and all its chunks (cascading)
  • reopenStream(streamId) — Atomically delete a terminal stream and re-create it as queued. Throws if the stream is not found or not in a terminal status

SqliteStreamStore.close()

SqliteStreamStore exposes an explicit close() method to release the underlying SQLite connection and clear prepared statements. It is idempotent.

const store = new SqliteStreamStore('./streams.db');
const manager = new StreamManager({ store });

// ... stream operations ...

// app shutdown
store.close();

StreamManager

Orchestrates the full stream lifecycle.

import { StreamManager, SqliteStreamStore } from '@deepagents/context';

const manager = new StreamManager({
  store: new SqliteStreamStore('./streams.db'),
  watchPolling: {
    minMs: 25,
    maxMs: 500,
    multiplier: 2,
    jitterRatio: 0.15,
    statusCheckEvery: 3,
    chunkPageSize: 128,
  },
  cancelPolling: {
    minMs: 50,
    maxMs: 500,
    multiplier: 2,
    jitterRatio: 0.15,
  },
});

register(streamId)

Creates or retrieves a stream in queued status. Call this before starting the AI generation.

const { stream, created } = await manager.register('stream-abc');

if (!created) {
  console.log(`Stream already exists with status: ${stream.status}`);
}

persist(stream, streamId, options?)

Consumes a ReadableStream, persists every chunk to the store, and transitions status from running to completed (or failed on error). Polls for cancellation in the background.

import { createUIMessageStream } from 'ai';

const uiStream = createUIMessageStream({
  execute: async ({ writer }) => {
    writer.write({ type: 'text-delta', textDelta: 'Hello' });
    writer.write({ type: 'text-delta', textDelta: ' world' });
  },
});

await manager.persist(uiStream, 'stream-abc', {
  strategy: 'buffered',
  flushSize: 20,
  cancelPolling: {
    minMs: 50,
    maxMs: 500,
    multiplier: 2,
    jitterRatio: 0.15,
  },
});

  • strategy ('buffered' | 'immediate', default 'buffered') — buffered batches chunks for throughput; immediate persists each chunk before forwarding
  • flushSize (number, default 20) — For the buffered strategy: flush after this many chunks
  • cancelPolling ({ minMs, maxMs, multiplier, jitterRatio }, default { 50, 500, 2, 0.15 }) — Adaptive cancellation-polling policy

If the stream is already in a terminal status (completed, failed, cancelled), persist returns immediately without re-running.

watch(streamId, options?)

Returns a ReadableStream that replays all persisted chunks, then live-tails new ones until the stream reaches a terminal status.

const replayStream = manager.watch('stream-abc', {
  minMs: 25,
  maxMs: 500,
  multiplier: 2,
  jitterRatio: 0.15,
  statusCheckEvery: 3,
  chunkPageSize: 128,
});

for await (const part of replayStream) {
  console.log(part);
}

  • minMs (number, default 25) — Minimum poll interval
  • maxMs (number, default 500) — Maximum poll interval
  • multiplier (number, default 2) — Backoff multiplier when idle
  • jitterRatio (number, default 0.15) — Random jitter ratio applied per wait
  • statusCheckEvery (number, default 3) — Check stream status every N chunk polls while data is flowing
  • chunkPageSize (number, default 128) — Maximum chunks fetched per poll iteration

Throws if the stream ID does not exist.

If a stream is deleted while being watched, the watcher closes gracefully.
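
Serving watch() to browsers usually means bridging the ReadableStream to Server-Sent Events. Below is a minimal framing sketch; sseFrames is a hypothetical helper (not part of the library), and in a real handler you would pass manager.watch(streamId) and write each frame to an HTTP response with Content-Type: text/event-stream.

```typescript
// Hypothetical helper: convert stream parts to SSE `data:` frames.
// In a real handler, each frame is written to the HTTP response.
async function sseFrames(stream: ReadableStream<unknown>): Promise<string[]> {
  const frames: string[] = [];
  const reader = stream.getReader();
  for (;;) {
    const { done, value } = await reader.read();
    if (done) break;
    // One SSE event per stream part; blank line terminates the event
    frames.push(`data: ${JSON.stringify(value)}\n\n`);
  }
  return frames;
}

// Stand-in for manager.watch(): any ReadableStream of parts works here.
const demo = new ReadableStream<unknown>({
  start(controller) {
    controller.enqueue({ type: 'text-delta', textDelta: 'Hello' });
    controller.enqueue({ type: 'text-delta', textDelta: ' world' });
    controller.close();
  },
});
```

Because watch() closes when the stream reaches a terminal status, the SSE response ends naturally at the end of generation.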

Polling telemetry

Use onPollingEvent to observe polling behavior for tuning and diagnostics:

const manager = new StreamManager({
  store: new SqliteStreamStore('./streams.db'),
  onPollingEvent: (event) => {
    if (event.type === 'watch:empty') {
      console.debug('idle delay', event.delayMs);
    }
  },
});

Event types:

  • watch:poll
  • watch:empty
  • watch:chunks
  • watch:closed
  • persist:cancel-poll
  • persist:cancel-detected
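
When tuning, it helps to see how the minMs/maxMs/multiplier/jitterRatio options typically combine. The sketch below is illustrative, not the library's actual implementation: exponential growth from minMs toward maxMs, plus random jitter so many watchers don't poll in lockstep.

```typescript
// Illustrative backoff sketch (NOT the library's exact formula):
// delay grows geometrically while polls come back empty, capped at maxMs,
// with up to jitterRatio extra randomness per wait.
interface PollingPolicy {
  minMs: number;
  maxMs: number;
  multiplier: number;
  jitterRatio: number;
}

function nextDelay(consecutiveEmptyPolls: number, p: PollingPolicy): number {
  const base = Math.min(p.minMs * p.multiplier ** consecutiveEmptyPolls, p.maxMs);
  const jitter = base * p.jitterRatio * Math.random();
  return base + jitter;
}

const policy: PollingPolicy = { minMs: 25, maxMs: 500, multiplier: 2, jitterRatio: 0.15 };
```

With the default watch policy, the base idle delay would grow 25 → 50 → 100 → 200 → 400 → 500 ms (capped at maxMs), plus up to 15% jitter per wait.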

cancel(streamId)

Marks the stream as cancelled. The persist() loop detects this on its next poll and stops consuming.

await manager.cancel('stream-abc');
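
The detection mechanism can be pictured as a pump that consults a status source between reads. In the real persist() loop that check is a background poll of the store governed by cancelPolling; the pump below is an illustration of the cooperative pattern, not the library's code.

```typescript
// Illustration of cooperative cancellation: between reads, the pump
// checks a status callback the way persist() polls the stream store.
async function pump(
  stream: ReadableStream<unknown>,
  isCancelled: () => boolean, // stands in for status === 'cancelled' in the store
): Promise<unknown[]> {
  const persisted: unknown[] = [];
  const reader = stream.getReader();
  while (!isCancelled()) {
    const { done, value } = await reader.read();
    if (done) return persisted; // source finished before cancellation
    persisted.push(value);
  }
  await reader.cancel(); // cancellation detected: stop consuming the source
  return persisted;
}
```

Chunks persisted before the cancellation is detected remain in the store, so watchers still see everything produced up to that point.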

cleanup(streamId)

Deletes the stream record and all associated chunks.

await manager.cleanup('stream-abc');

reopen(streamId)

Atomically clears a terminal stream (deletes old chunks and record) and re-registers it as queued. Use this to reuse a streamId for a new generation cycle — for example, when continuing a multi-turn conversation on the same stream.

const { stream, created } = await manager.reopen('stream-abc');
// stream.status === 'queued', created === true
// Old chunks are deleted — call persist() with a new ReadableStream

Throws if the stream does not exist or is not in a terminal status (completed, failed, cancelled). This prevents accidentally destroying an in-progress stream.

persistedWriter

A lower-level API that wraps a UIMessageStreamWriter to intercept and persist chunks as they flow through. Use this when you need direct control over the writer inside createUIMessageStream.

import { createUIMessageStream } from 'ai';
import { persistedWriter, SqliteStreamStore } from '@deepagents/context';

const store = new SqliteStreamStore('./streams.db');

const uiStream = createUIMessageStream({
  execute: async ({ writer }) => {
    const pw = await persistedWriter({
      writer,
      store,
      streamId: 'stream-abc',
      strategy: 'buffered',
      flushSize: 20,
    });

    pw.writer.write({ type: 'text-delta', textDelta: 'Hello' });
    pw.writer.write({ type: 'text-delta', textDelta: ' world' });

    await pw.complete();
  },
});

Options

  • writer (UIMessageStreamWriter, required) — The original writer to wrap
  • store (StreamStore, required) — Where to persist chunks
  • streamId (string, required) — Stream identifier
  • strategy ('buffered' | 'immediate', default 'buffered') — Persistence strategy
  • flushSize (number, default 20) — Chunks to buffer before flushing (buffered strategy only)

PersistedWriter Methods

  • flush() — Force-flush the buffer to the store
  • complete() — Flush remaining chunks and mark the stream as completed
  • fail(error?) — Flush remaining chunks and mark the stream as failed with an optional error message
  • cleanup() — Delete the stream and all its chunks from the store

The returned writer property is a drop-in replacement for UIMessageStreamWriter. Both write() and merge() are intercepted: chunks are persisted before being forwarded to the original writer.

Reconnectable Stream Pattern

Register the stream, start persisting on the server, and serve watch() to any client that connects (or reconnects).

import { StreamManager, SqliteStreamStore } from '@deepagents/context';
import { streamText } from 'ai';
import { groq } from '@ai-sdk/groq';

const manager = new StreamManager({
  store: new SqliteStreamStore('./streams.db'),
});

async function startGeneration(streamId: string, prompt: string) {
  await manager.register(streamId);

  const { textStream } = streamText({
    model: groq('gpt-oss-20b'),
    prompt,
  });

  // Intentionally not awaited: persistence continues in the background
  manager.persist(textStream, streamId);
}

function connectClient(streamId: string): ReadableStream {
  return manager.watch(streamId);
}

A client calling connectClient at any point receives all chunks produced so far, then continues to receive new ones in real time until the stream finishes.
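
On the client side, a dropped connection can be handled by calling connectClient again: since watch() always replays from the beginning, a resume wrapper only has to skip parts it already delivered. watchWithResume below is illustrative glue, not part of the library.

```typescript
// Illustrative client-side resume loop: reconnect on failure and skip
// already-delivered parts, since each watch() replays from the start.
async function watchWithResume(
  connect: () => ReadableStream<unknown>, // e.g. () => manager.watch(streamId)
  onPart: (part: unknown) => void,
): Promise<void> {
  let delivered = 0;
  for (;;) {
    try {
      let seen = 0;
      const reader = connect().getReader();
      for (;;) {
        const { done, value } = await reader.read();
        if (done) return; // terminal status reached: watcher closed normally
        seen++;
        if (seen > delivered) {
          onPart(value); // new part: deliver it exactly once
          delivered = seen;
        }
      }
    } catch {
      // transient failure: reconnect and let the replay catch us up
    }
  }
}
```

This relies on chunks being replayed in the same seq order on every watch, which the store's ordered getChunks guarantees.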

Multi-Client Watching

Multiple clients can watch the same stream. Each watch() call creates an independent ReadableStream that polls the store separately.

const client1 = manager.watch('stream-abc');
const client2 = manager.watch('stream-abc');
const client3 = manager.watch('stream-abc');

Each client replays from the beginning and tails until the stream reaches a terminal status.

Stream Status Lifecycle

              register()
                  │
                  v
              ┌────────┐
              │ queued │ ◄──── reopen()
              └───┬────┘       (deletes old chunks,
                  │ persist()   re-creates as queued)
                  v
             ┌─────────┐
             │ running │
             └────┬────┘
                  │
      ┌───────────┼─────────────┐
   ok v     error v    cancel() v
┌───────────┐ ┌────────┐  ┌───────────┐
│ completed │ │ failed │  │ cancelled │
└───────────┘ └────────┘  └───────────┘

Terminal statuses (completed, failed, cancelled) cause watch() to close after draining remaining chunks. Call reopen() on any terminal stream to reset it for a new generation cycle.

Custom StreamStore

Implement the abstract StreamStore class to back stream persistence with any database.

import { StreamStore } from '@deepagents/context';
import type { StreamData, StreamChunkData, StreamStatus } from '@deepagents/context';

class PostgresStreamStore extends StreamStore {
  async createStream(stream: StreamData): Promise<void> { /* ... */ }
  async upsertStream(stream: StreamData): Promise<{ stream: StreamData; created: boolean }> { /* ... */ }
  async getStream(streamId: string): Promise<StreamData | undefined> { /* ... */ }
  async getStreamStatus(streamId: string): Promise<StreamStatus | undefined> { /* ... */ }
  async updateStreamStatus(streamId: string, status: StreamStatus, options?: { error?: string }): Promise<void> { /* ... */ }
  async appendChunks(chunks: StreamChunkData[]): Promise<void> { /* ... */ }
  async getChunks(streamId: string, fromSeq?: number, limit?: number): Promise<StreamChunkData[]> { /* ... */ }
  async deleteStream(streamId: string): Promise<void> { /* ... */ }
  async reopenStream(streamId: string): Promise<StreamData> { /* ... */ }
}
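
For unit tests, the same contract can be satisfied by an in-memory store. The sketch below is kept self-contained for illustration: the local type aliases mirror the StreamData and StreamChunkData tables above, and the timestamp handling in updateStreamStatus is an assumption about "based on the target status". In application code you would import the real types and extend the StreamStore base class instead.

```typescript
// Self-contained in-memory sketch of the store contract.
// Types mirror the StreamData/StreamChunkData tables in this document.
type StreamStatus = 'queued' | 'running' | 'completed' | 'failed' | 'cancelled';

interface StreamData {
  id: string;
  status: StreamStatus;
  createdAt: number;
  startedAt: number | null;
  finishedAt: number | null;
  cancelRequestedAt: number | null;
  error: string | null;
}

interface StreamChunkData {
  streamId: string;
  seq: number;
  data: unknown;
  createdAt: number;
}

const TERMINAL = new Set<StreamStatus>(['completed', 'failed', 'cancelled']);

class MemoryStreamStore {
  private streams = new Map<string, StreamData>();
  private chunks = new Map<string, StreamChunkData[]>();

  async createStream(stream: StreamData): Promise<void> {
    if (this.streams.has(stream.id)) throw new Error(`stream ${stream.id} already exists`);
    this.streams.set(stream.id, { ...stream });
    this.chunks.set(stream.id, []);
  }

  async upsertStream(stream: StreamData): Promise<{ stream: StreamData; created: boolean }> {
    const existing = this.streams.get(stream.id);
    if (existing) return { stream: existing, created: false };
    await this.createStream(stream);
    return { stream, created: true };
  }

  async getStream(streamId: string): Promise<StreamData | undefined> {
    return this.streams.get(streamId);
  }

  async getStreamStatus(streamId: string): Promise<StreamStatus | undefined> {
    return this.streams.get(streamId)?.status;
  }

  // Assumed timestamp semantics: running sets startedAt, terminal statuses
  // set finishedAt, cancelled records cancelRequestedAt, failed records error.
  async updateStreamStatus(streamId: string, status: StreamStatus, options?: { error?: string }): Promise<void> {
    const s = this.streams.get(streamId);
    if (!s) throw new Error(`stream ${streamId} not found`);
    const now = Date.now();
    s.status = status;
    if (status === 'running') s.startedAt = now;
    if (status === 'cancelled') s.cancelRequestedAt ??= now;
    if (TERMINAL.has(status)) s.finishedAt = now;
    if (status === 'failed') s.error = options?.error ?? null;
  }

  async appendChunks(batch: StreamChunkData[]): Promise<void> {
    for (const chunk of batch) this.chunks.get(chunk.streamId)?.push(chunk);
  }

  async getChunks(streamId: string, fromSeq = 0, limit = Infinity): Promise<StreamChunkData[]> {
    return (this.chunks.get(streamId) ?? []).filter((c) => c.seq >= fromSeq).slice(0, limit);
  }

  async deleteStream(streamId: string): Promise<void> {
    this.streams.delete(streamId);
    this.chunks.delete(streamId);
  }

  async reopenStream(streamId: string): Promise<StreamData> {
    const s = this.streams.get(streamId);
    if (!s) throw new Error(`stream ${streamId} not found`);
    if (!TERMINAL.has(s.status)) throw new Error(`stream ${streamId} is not terminal`);
    await this.deleteStream(streamId);
    const fresh: StreamData = {
      id: streamId, status: 'queued', createdAt: Date.now(),
      startedAt: null, finishedAt: null, cancelRequestedAt: null, error: null,
    };
    await this.createStream(fresh);
    return fresh;
  }
}
```

A StreamManager wired to a store like this runs entirely in memory, which keeps lifecycle tests fast and hermetic.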

Multi-Turn Conversation Flow

In a chat application, the agent may finish a turn by requesting user input — for example, asking the user to approve a SQL query before executing it. The stream completes, but the conversation continues. reopen() lets you reuse the same streamId for each turn without leaking old chunks.

Conversation history is stored separately in the context store (the message DAG). Stream chunks are ephemeral transport — they exist for SSE delivery and client reconnection during an active stream, not for permanent storage.

Turn 1: User asks a question
  register(streamId) → queued
  persist(chatStream, streamId) → running → completed
  Client watches → sees tool-call → shows approval UI

Turn 2: User approves (conversation continues)
  reopen(streamId) → deletes old chunks, re-queued
  persist(newChatStream, streamId) → running → completed
  Client re-watches → sees Turn 2 response

Turn N: Same pattern...

Server Example

import { StreamManager, SqliteStreamStore } from '@deepagents/context';

const manager = new StreamManager({
  store: new SqliteStreamStore('./streams.db'),
});

async function handleChatTurn(
  streamId: string,
  chatStream: ReadableStream,
  isFirstTurn: boolean,
) {
  if (isFirstTurn) {
    await manager.register(streamId);
  } else {
    await manager.reopen(streamId);
  }

  manager.persist(chatStream, streamId, { strategy: 'immediate' });

  return manager.watch(streamId);
}

With Idempotent Registration

When you don't know whether it's the first turn, use register() first. If the stream already exists and is terminal, call reopen().

async function handleChatRequest(
  streamId: string,
  chatStream: ReadableStream,
) {
  const { stream, created } = await manager.register(streamId);

  if (!created) {
    if (stream.status === 'queued' || stream.status === 'running') {
      return manager.watch(streamId);
    }
    await manager.reopen(streamId);
  }

  manager.persist(chatStream, streamId, { strategy: 'immediate' });
  return manager.watch(streamId);
}

Best Practices

  • Use buffered strategy (the default) for throughput-sensitive workloads. Switch to immediate when you cannot tolerate any chunk loss on crash.
  • Always call cleanup() after you no longer need a stream. Chunks accumulate and are not automatically garbage-collected.
  • Tune watchPolling/cancelPolling for your latency and DB-load budget. Lower intervals reduce latency but increase polling cost.
  • Use a shared SqliteStreamStore instance across your application to benefit from prepared statement caching.
  • Call SqliteStreamStore.close() during shutdown to release SQLite resources cleanly.

Next Steps

  • Storage - Store implementations for context persistence
  • Checkpoints - Named restore points for conversation branching