Deep Agents
Agent · Context · Orchestrator · Retrieval · Text2SQL · Toolbox

Custom Connectors

Build custom data source connectors for any content

Connectors define how content is pulled from data sources. You can build custom connectors to index any content type: databases, APIs, cloud storage, or proprietary formats.

Connector Interface

All connectors implement this interface:

type Connector = {
  /**
   * Unique identifier for the logical source (a group of documents).
   * Used for caching and deduplication, so it must be stable across runs
   * for the same underlying data source.
   */
  sourceId: string;

  /**
   * AsyncGenerator that yields documents to index. Streaming via a
   * generator lets large datasets be indexed without holding every
   * document in memory at once.
   */
  sources: () => AsyncGenerator<
    {
      id: string;                              // Document identifier, unique within this source
      content: () => Promise<string>;          // Lazy content loader, invoked only at ingestion time
      metadata?: Record<string, any>;          // Optional metadata, searchable and returned with results
    },
    void,
    unknown
  >;

  /**
   * Controls ingestion behavior:
   * - 'never': index once, never re-index
   * - 'contentChanged': (default) re-index when content changes
   * - 'expired': re-index when TTL expires
   */
  ingestWhen?: 'never' | 'contentChanged' | 'expired';

  /**
   * TTL in milliseconds. Only consulted when ingestWhen is 'expired'.
   */
  expiresAfter?: number;
};

Basic Example

A minimal connector that yields a single document:

import type { Connector } from '@deepagents/retrieval/connectors';

/**
 * Minimal connector example: exposes a single document whose text is the
 * string passed in. Useful as a starting template for custom sources.
 */
function myConnector(content: string): Connector {
  // Lazy loader — resolves to the captured string when ingestion reads it.
  const loadContent = async () => content;

  async function* generateSources() {
    yield { id: 'doc-1', content: loadContent };
  }

  return { sourceId: 'my-connector', sources: generateSources };
}

Real-World Examples

Database Connector

Index records from a database:

import type { Connector } from '@deepagents/retrieval/connectors';
import { Pool } from 'pg';

/**
 * Connector that indexes every row of a Postgres table.
 *
 * Each row becomes one document whose content is the JSON-serialized row.
 * `ingestWhen: 'contentChanged'` re-indexes a row only when its serialized
 * form differs from what was previously ingested.
 *
 * @param pool  pg connection pool used to run the query
 * @param table table to index; must be a plain SQL identifier
 * @throws Error if `table` is not a simple identifier
 */
function postgresConnector(pool: Pool, table: string): Connector {
  // The table name is interpolated into the query text (identifiers cannot
  // be bound as query parameters in pg), so reject anything that is not a
  // plain identifier to prevent SQL injection.
  if (!/^[A-Za-z_][A-Za-z0-9_]*$/.test(table)) {
    throw new Error(`postgresConnector: invalid table name ${JSON.stringify(table)}`);
  }

  const sourceId = `postgres:${table}`;

  return {
    sourceId,
    ingestWhen: 'contentChanged',
    sources: async function* () {
      const result = await pool.query(`SELECT * FROM ${table}`);

      for (const row of result.rows) {
        yield {
          id: `${table}:${row.id}`,
          // Lazy loader: serialization is deferred until ingestion reads it.
          content: async () => JSON.stringify(row),
          metadata: {
            table,
            id: row.id,
            created_at: row.created_at,
          },
        };
      }
    },
  };
}

// Usage
import { similaritySearch, fastembed, nodeSQLite } from '@deepagents/retrieval';

const pool = new Pool({ connectionString: process.env.DATABASE_URL });
const store = nodeSQLite('./db.db', 384);

const results = await similaritySearch('user preferences', {
  connector: postgresConnector(pool, 'users'),
  store,
  embedder: fastembed(),
});

REST API Connector

Index content from a REST API:

import type { Connector } from '@deepagents/retrieval/connectors';

interface Article {
  id: string;
  title: string;
  body: string;
  author: string;
  published_at: string;
}

/**
 * Connector that indexes articles from a paginated REST API.
 *
 * Pages through `GET {baseUrl}/articles?page=N` until the API reports
 * `has_more: false`. Results are cached for one hour (`ingestWhen:
 * 'expired'`), which keeps request volume low for rate-limited APIs.
 *
 * @param baseUrl API origin, e.g. 'https://api.company.com'
 * @throws Error when the API responds with a non-2xx status
 */
function apiConnector(baseUrl: string): Connector {
  const sourceId = `api:${baseUrl}`;

  return {
    sourceId,
    ingestWhen: 'expired',
    expiresAfter: 60 * 60 * 1000, // 1 hour
    sources: async function* () {
      let page = 1;
      let hasMore = true;

      while (hasMore) {
        const response = await fetch(`${baseUrl}/articles?page=${page}`);

        // Fail fast on HTTP errors: otherwise response.json() throws an
        // opaque parse error or silently yields bad data, and a stale
        // `has_more` flag could loop forever.
        if (!response.ok) {
          throw new Error(
            `apiConnector: ${baseUrl} returned ${response.status} for page ${page}`,
          );
        }

        const data: { articles: Article[]; has_more: boolean } = await response.json();

        for (const article of data.articles) {
          yield {
            id: article.id,
            content: async () => `
              Title: ${article.title}
              Author: ${article.author}
              Published: ${article.published_at}

              ${article.body}
            `,
            metadata: {
              title: article.title,
              author: article.author,
              published_at: article.published_at,
            },
          };
        }

        hasMore = data.has_more;
        page++;
      }
    },
  };
}

// Usage
import { similaritySearch, fastembed, nodeSQLite } from '@deepagents/retrieval';

const results = await similaritySearch('product launch', {
  connector: apiConnector('https://api.company.com'),
  store: nodeSQLite('./api.db', 384),
  embedder: fastembed(),
});

S3 Connector

Index files from Amazon S3:

import type { Connector } from '@deepagents/retrieval/connectors';
import { S3Client, ListObjectsV2Command, GetObjectCommand } from '@aws-sdk/client-s3';

function s3Connector(bucket: string, prefix: string): Connector {
  const s3 = new S3Client({});
  const sourceId = `s3:${bucket}/${prefix}`;

  return {
    sourceId,
    ingestWhen: 'contentChanged',
    sources: async function* () {
      let continuationToken: string | undefined;

      do {
        const listCommand = new ListObjectsV2Command({
          Bucket: bucket,
          Prefix: prefix,
          ContinuationToken: continuationToken,
        });

        const response = await s3.send(listCommand);

        for (const object of response.Contents || []) {
          if (!object.Key) continue;

          yield {
            id: object.Key,
            content: async () => {
              const getCommand = new GetObjectCommand({
                Bucket: bucket,
                Key: object.Key,
              });
              const data = await s3.send(getCommand);
              return await data.Body!.transformToString();
            },
            metadata: {
              bucket,
              key: object.Key,
              size: object.Size,
              lastModified: object.LastModified,
            },
          };
        }

        continuationToken = response.NextContinuationToken;
      } while (continuationToken);
    },
  };
}

// Usage
const results = await similaritySearch('deployment guide', {
  connector: s3Connector('my-bucket', 'docs/'),
  store: nodeSQLite('./s3.db', 384),
  embedder: fastembed(),
});

Notion Connector

Index pages from Notion:

import type { Connector } from '@deepagents/retrieval/connectors';
import { Client } from '@notionhq/client';

/**
 * Connector that indexes pages from a Notion database.
 *
 * Queries the database (following pagination cursors) and yields one
 * document per page; page text is assembled lazily from its paragraph
 * blocks. Results expire after 30 minutes to pick up edits made in Notion.
 */
function notionConnector(apiKey: string, databaseId: string): Connector {
  const notion = new Client({ auth: apiKey });

  // Concatenate the plain text of every paragraph block on a page.
  const pageText = async (pageId: string): Promise<string> => {
    const blocks = await notion.blocks.children.list({ block_id: pageId });

    const paragraphs: string[] = [];
    for (const block of blocks.results as any[]) {
      const richText = block.paragraph?.rich_text;
      if (!richText) continue;
      const text = richText.map((t: any) => t.plain_text).join('');
      if (text) paragraphs.push(text);
    }
    return paragraphs.join('\n\n');
  };

  return {
    sourceId: `notion:${databaseId}`,
    ingestWhen: 'expired',
    expiresAfter: 30 * 60 * 1000, // 30 minutes
    sources: async function* () {
      let cursor: string | undefined;

      do {
        const response = await notion.databases.query({
          database_id: databaseId,
          start_cursor: cursor,
        });

        for (const page of response.results) {
          if (page.object !== 'page') continue;

          yield {
            id: page.id,
            content: () => pageText(page.id),
            metadata: {
              notionId: page.id,
              url: (page as any).url,
            },
          };
        }

        cursor = response.next_cursor ?? undefined;
      } while (cursor);
    },
  };
}

Slack Connector

Index messages from Slack channels:

import type { Connector } from '@deepagents/retrieval/connectors';
import { WebClient } from '@slack/web-api';

/**
 * Connector that indexes messages (and their thread replies) from a Slack
 * channel. Paginates conversation history via cursors; documents expire
 * after 15 minutes so new messages get picked up.
 */
function slackConnector(token: string, channelId: string): Connector {
  const slack = new WebClient(token);

  // Build the document text for one message: the message itself plus,
  // when a thread exists, its replies (parent message excluded).
  const messageContent = async (message: any): Promise<string> => {
    let text = message.text!;

    if (message.reply_count && message.reply_count > 0) {
      const thread = await slack.conversations.replies({
        channel: channelId,
        ts: message.ts!,
      });

      const replies = (thread.messages || [])
        .slice(1) // first entry is the parent message
        .map((reply: any) => reply.text)
        .filter(Boolean);

      if (replies.length > 0) {
        text += '\n\nReplies:\n' + replies.join('\n');
      }
    }

    return text;
  };

  return {
    sourceId: `slack:${channelId}`,
    ingestWhen: 'expired',
    expiresAfter: 15 * 60 * 1000, // 15 minutes
    sources: async function* () {
      let cursor: string | undefined;

      do {
        const history = await slack.conversations.history({
          channel: channelId,
          cursor,
          limit: 100,
        });

        for (const message of history.messages || []) {
          // Skip non-text messages and system/subtype messages.
          if (!message.text || message.subtype) continue;

          yield {
            id: message.ts!,
            content: () => messageContent(message),
            metadata: {
              channel: channelId,
              timestamp: message.ts,
              user: message.user,
            },
          };
        }

        cursor = history.response_metadata?.next_cursor;
      } while (cursor);
    },
  };
}

Best Practices

1. Use Lazy Content Loading

Always use content: async () => ... instead of loading content upfront:

// Good: Lazy loading
yield {
  id: 'doc-1',
  content: async () => await fetchContent(id),
};

// Bad: Eager loading (loads all content into memory)
const content = await fetchContent(id);
yield {
  id: 'doc-1',
  content: async () => content,
};

2. Paginate Large Datasets

Use cursor-based or offset pagination for large datasets:

sources: async function* () {
  let page = 0;
  let hasMore = true;

  while (hasMore) {
    const items = await fetchPage(page);
    for (const item of items) {
      yield item;
    }
    hasMore = items.length > 0;
    page++;
  }
}

3. Handle Errors Gracefully

Catch and log errors for individual documents:

yield {
  id: doc.id,
  content: async () => {
    try {
      return await fetchContent(doc.id);
    } catch (error) {
      console.error(`Failed to fetch ${doc.id}:`, error);
      return ''; // Empty content is skipped by ingestion
    }
  },
};

4. Include Useful Metadata

Metadata is searchable and returned with results:

yield {
  id: doc.id,
  content: async () => doc.body,
  metadata: {
    title: doc.title,
    author: doc.author,
    url: doc.url,
    tags: doc.tags,
    created_at: doc.created_at,
  },
};

5. Choose Appropriate Ingestion Mode

  • contentChanged: For content you control
  • expired: For external APIs with rate limits
  • never: For static archives

Next Steps