Custom Connectors
Build custom data source connectors for any content
Connectors define how content is pulled from data sources. You can build custom connectors to index any content type: databases, APIs, cloud storage, or proprietary formats.
Connector Interface
All connectors implement this interface:
/**
 * A Connector describes how documents are pulled from a data source for
 * indexing. Documents are yielded through an async generator and load their
 * content lazily, so large sources can be streamed rather than held in memory.
 */
type Connector = {
  /**
   * Unique identifier for the logical source (group of documents).
   * Used for caching and deduplication.
   */
  sourceId: string;
  /**
   * AsyncGenerator that yields documents to index.
   * Each document exposes its content via a lazy async loader, so content is
   * fetched only when the document is actually ingested.
   */
  sources: () => AsyncGenerator<
    {
      id: string; // Unique document identifier (should be stable across runs)
      content: () => Promise<string>; // Lazy content loader
      metadata?: Record<string, any>; // Optional metadata stored alongside the document
    },
    void,
    unknown
  >;
  /**
   * Controls ingestion behavior:
   * - 'never': index once, never re-index
   * - 'contentChanged': (default) re-index when content changes
   * - 'expired': re-index when TTL expires
   */
  ingestWhen?: 'never' | 'contentChanged' | 'expired';
  /**
   * TTL in milliseconds before a document is considered stale
   * (only consulted when ingestWhen is 'expired').
   */
  expiresAfter?: number;
};
Basic Example
A minimal connector that yields a single document:
import type { Connector } from '@deepagents/retrieval/connectors';
/**
 * Minimal example connector: exposes a single in-memory document.
 *
 * @param content - The document body to index.
 * @returns A Connector that yields exactly one document with id 'doc-1'.
 */
function myConnector(content: string): Connector {
  return {
    sourceId: 'my-connector',
    sources: async function* () {
      yield {
        id: 'doc-1',
        // Content is already in memory here, but the lazy loader shape is
        // still required by the Connector interface.
        content: async () => content,
      };
    },
  };
}
Real-World Examples
Database Connector
Index records from a database:
import type { Connector } from '@deepagents/retrieval/connectors';
import { Pool } from 'pg';
/**
 * Connector that indexes every row of a Postgres table.
 *
 * Each row becomes one document whose content is the JSON-serialized row.
 *
 * @param pool - pg connection pool used to run the query.
 * @param table - Table name to index. Quoted as a SQL identifier so a
 *   table name from untrusted input cannot inject SQL.
 */
function postgresConnector(pool: Pool, table: string): Connector {
  const sourceId = `postgres:${table}`;
  // Quote the table name as an identifier ("" escapes embedded quotes)
  // instead of interpolating it raw into the statement — the original
  // string interpolation was a SQL injection vector.
  const quotedTable = `"${table.replace(/"/g, '""')}"`;
  return {
    sourceId,
    ingestWhen: 'contentChanged',
    sources: async function* () {
      const result = await pool.query(`SELECT * FROM ${quotedTable}`);
      for (const row of result.rows) {
        yield {
          // Document ids keep the unquoted table name for readability/stability.
          id: `${table}:${row.id}`,
          content: async () => JSON.stringify(row),
          metadata: {
            table,
            id: row.id,
            created_at: row.created_at,
          },
        };
      }
    },
  };
}
// Usage
import { similaritySearch, fastembed, nodeSQLite } from '@deepagents/retrieval';
const pool = new Pool({ connectionString: process.env.DATABASE_URL });
const store = nodeSQLite('./db.db', 384);
const results = await similaritySearch('user preferences', {
connector: postgresConnector(pool, 'users'),
store,
embedder: fastembed(),
});
REST API Connector
Index content from a REST API:
import type { Connector } from '@deepagents/retrieval/connectors';
/** Shape of one article returned by the REST API. */
interface Article {
  id: string;
  title: string;
  body: string;
  author: string;
  published_at: string;
}

/**
 * Connector that indexes articles from a paginated REST API.
 *
 * Pages through `GET {baseUrl}/articles?page=N` until the API reports
 * `has_more: false`. Documents are re-indexed after a 1-hour TTL.
 *
 * @param baseUrl - Base URL of the API (no trailing slash).
 * @throws Error when the API responds with a non-2xx status.
 */
function apiConnector(baseUrl: string): Connector {
  const sourceId = `api:${baseUrl}`;
  return {
    sourceId,
    ingestWhen: 'expired',
    expiresAfter: 60 * 60 * 1000, // 1 hour
    sources: async function* () {
      let page = 1;
      let hasMore = true;
      while (hasMore) {
        const response = await fetch(`${baseUrl}/articles?page=${page}`);
        // fetch() does not reject on HTTP errors; without this check an
        // error page would be parsed as JSON and fail confusingly (or the
        // loop could spin forever on a persistent error response).
        if (!response.ok) {
          throw new Error(`Failed to fetch page ${page}: HTTP ${response.status}`);
        }
        const data: { articles: Article[]; has_more: boolean } = await response.json();
        for (const article of data.articles) {
          yield {
            id: article.id,
            content: async () => `
Title: ${article.title}
Author: ${article.author}
Published: ${article.published_at}
${article.body}
`,
            metadata: {
              title: article.title,
              author: article.author,
              published_at: article.published_at,
            },
          };
        }
        hasMore = data.has_more;
        page++;
      }
    },
  };
}
// Usage
import { similaritySearch, fastembed, nodeSQLite } from '@deepagents/retrieval';
const results = await similaritySearch('product launch', {
connector: apiConnector('https://api.company.com'),
store: nodeSQLite('./api.db', 384),
embedder: fastembed(),
});
S3 Connector
Index files from Amazon S3:
import type { Connector } from '@deepagents/retrieval/connectors';
import { S3Client, ListObjectsV2Command, GetObjectCommand } from '@aws-sdk/client-s3';
function s3Connector(bucket: string, prefix: string): Connector {
const s3 = new S3Client({});
const sourceId = `s3:${bucket}/${prefix}`;
return {
sourceId,
ingestWhen: 'contentChanged',
sources: async function* () {
let continuationToken: string | undefined;
do {
const listCommand = new ListObjectsV2Command({
Bucket: bucket,
Prefix: prefix,
ContinuationToken: continuationToken,
});
const response = await s3.send(listCommand);
for (const object of response.Contents || []) {
if (!object.Key) continue;
yield {
id: object.Key,
content: async () => {
const getCommand = new GetObjectCommand({
Bucket: bucket,
Key: object.Key,
});
const data = await s3.send(getCommand);
return await data.Body!.transformToString();
},
metadata: {
bucket,
key: object.Key,
size: object.Size,
lastModified: object.LastModified,
},
};
}
continuationToken = response.NextContinuationToken;
} while (continuationToken);
},
};
}
// Usage
const results = await similaritySearch('deployment guide', {
connector: s3Connector('my-bucket', 'docs/'),
store: nodeSQLite('./s3.db', 384),
embedder: fastembed(),
});
Notion Connector
Index pages from Notion:
import type { Connector } from '@deepagents/retrieval/connectors';
import { Client } from '@notionhq/client';
/**
 * Connector that indexes pages from a Notion database.
 *
 * Pages are listed with cursor-based pagination; each page's text is
 * assembled lazily from its child blocks when the document is ingested.
 * NOTE(review): only `paragraph` blocks are extracted — headings, lists,
 * code blocks, etc. are skipped. Confirm this is intentional for your content.
 *
 * @param apiKey - Notion integration token.
 * @param databaseId - Id of the Notion database to index.
 */
function notionConnector(apiKey: string, databaseId: string): Connector {
  const notion = new Client({ auth: apiKey });
  const sourceId = `notion:${databaseId}`;
  return {
    sourceId,
    ingestWhen: 'expired',
    expiresAfter: 30 * 60 * 1000, // 30 minutes
    sources: async function* () {
      let cursor: string | undefined;
      do {
        const response = await notion.databases.query({
          database_id: databaseId,
          start_cursor: cursor,
        });
        for (const page of response.results) {
          // Query results can include non-page (partial) objects; skip them.
          if (page.object !== 'page') continue;
          yield {
            id: page.id,
            content: async () => {
              // Fetch page content blocks
              const blocks = await notion.blocks.children.list({
                block_id: page.id,
              });
              // Extract text from blocks
              return blocks.results
                .map((block: any) => {
                  if (block.paragraph?.rich_text) {
                    return block.paragraph.rich_text
                      .map((t: any) => t.plain_text)
                      .join('');
                  }
                  return '';
                })
                .filter(Boolean)
                .join('\n\n');
            },
            metadata: {
              notionId: page.id,
              url: (page as any).url,
            },
          };
        }
        // next_cursor is null on the final page; coerce to undefined so the
        // do/while condition terminates the loop.
        cursor = response.next_cursor ?? undefined;
      } while (cursor);
    },
  };
}
Slack Connector
Index messages from Slack channels:
import type { Connector } from '@deepagents/retrieval/connectors';
import { WebClient } from '@slack/web-api';
/**
 * Connector that indexes messages from a Slack channel.
 *
 * Channel history is paginated with cursors; threaded replies are fetched
 * lazily and appended to the parent message's content. A short 15-minute
 * TTL is used since channel history changes frequently.
 *
 * @param token - Slack token with permission to read channel history.
 * @param channelId - Id of the channel to index.
 */
function slackConnector(token: string, channelId: string): Connector {
  const slack = new WebClient(token);
  const sourceId = `slack:${channelId}`;
  return {
    sourceId,
    ingestWhen: 'expired',
    expiresAfter: 15 * 60 * 1000, // 15 minutes
    sources: async function* () {
      let cursor: string | undefined;
      do {
        const response = await slack.conversations.history({
          channel: channelId,
          cursor,
          limit: 100,
        });
        for (const message of response.messages || []) {
          // Skip empty messages and system messages (joins, topic changes, ...).
          if (!message.text || message.subtype) continue;
          yield {
            // The message timestamp `ts` serves as the document id within this channel.
            id: message.ts!,
            content: async () => {
              // Fetch thread replies if any
              let content = message.text!;
              if (message.reply_count && message.reply_count > 0) {
                const replies = await slack.conversations.replies({
                  channel: channelId,
                  ts: message.ts!,
                });
                const replyTexts = (replies.messages || [])
                  .slice(1) // Skip parent message
                  .map(r => r.text)
                  .filter(Boolean);
                if (replyTexts.length > 0) {
                  content += '\n\nReplies:\n' + replyTexts.join('\n');
                }
              }
              return content;
            },
            metadata: {
              channel: channelId,
              timestamp: message.ts,
              user: message.user,
            },
          };
        }
        cursor = response.response_metadata?.next_cursor;
      } while (cursor);
    },
  };
}
Best Practices
1. Use Lazy Content Loading
Always use content: async () => ... instead of loading content upfront:
// Good: Lazy loading
yield {
id: 'doc-1',
content: async () => await fetchContent(id),
};
// Bad: Eager loading (loads all content into memory)
const content = await fetchContent(id);
yield {
id: 'doc-1',
content: async () => content,
};
2. Paginate Large Datasets
Use cursor-based or offset pagination for large datasets:
sources: async function* () {
let page = 0;
let hasMore = true;
while (hasMore) {
const items = await fetchPage(page);
for (const item of items) {
yield item;
}
hasMore = items.length > 0;
page++;
}
}
3. Handle Errors Gracefully
Catch and log errors for individual documents:
yield {
id: doc.id,
content: async () => {
try {
return await fetchContent(doc.id);
} catch (error) {
console.error(`Failed to fetch ${doc.id}:`, error);
return ''; // Empty content is skipped by ingestion
}
},
};
4. Include Useful Metadata
Metadata is searchable and returned with results:
yield {
id: doc.id,
content: async () => doc.body,
metadata: {
title: doc.title,
author: doc.author,
url: doc.url,
tags: doc.tags,
created_at: doc.created_at,
},
};
5. Choose Appropriate Ingestion Mode
- contentChanged: For content you control
- expired: For external APIs with rate limits
- never: For static archives
Next Steps
- Ingestion Modes - Control re-indexing
- Stores - Configure vector storage
- Recipes - Complete examples