Incremental Indexing Pipeline
Build a production indexing pipeline with different refresh strategies for different content types. This recipe shows how to efficiently manage large-scale content ingestion.
What You'll Build
- Configurable ingestion pipeline
- Different refresh strategies per source
- Parallel processing for speed
- Error handling and recovery
- Pipeline orchestration
Prerequisites
npm install @deepagents/retrieval

Complete Implementation
import { ingest, fastembed, nodeSQLite } from '@deepagents/retrieval';
import { local, rss, github, pdf } from '@deepagents/retrieval/connectors';
import type { Connector } from '@deepagents/retrieval/connectors';
// Set up retrieval infrastructure
const store = nodeSQLite('./pipeline.db', 384);
const embedder = fastembed();
// Source configuration with refresh strategies
interface SourceConfig {
name: string;
connector: Connector;
schedule: 'once' | 'always' | 'hourly' | 'daily';
priority: 'high' | 'medium' | 'low';
enabled: boolean;
}
const sources: SourceConfig[] = [
// Static content: index once
{
name: 'Archives',
connector: local('archives/**/*.md', { ingestWhen: 'never' }),
schedule: 'once',
priority: 'low',
enabled: true,
},
// Active content: check every run
{
name: 'Documentation',
connector: local('docs/**/*.md', { ingestWhen: 'contentChanged' }),
schedule: 'always',
priority: 'high',
enabled: true,
},
// API reference: daily refresh
{
name: 'API Docs',
connector: local('api-docs/**/*.md', { ingestWhen: 'contentChanged' }),
schedule: 'daily',
priority: 'medium',
enabled: true,
},
// News feeds: hourly refresh
{
name: 'Tech News',
connector: {
...rss('https://news.ycombinator.com/rss', { maxItems: 30 }),
ingestWhen: 'expired',
expiresAfter: 60 * 60 * 1000,
},
schedule: 'hourly',
priority: 'medium',
enabled: true,
},
// Release notes: daily refresh
{
name: 'Next.js Releases',
connector: github.release('vercel/next.js'),
schedule: 'daily',
priority: 'low',
enabled: true,
},
// Research papers: daily refresh
{
name: 'Research Papers',
connector: pdf('research/**/*.pdf'),
schedule: 'daily',
priority: 'low',
enabled: true,
},
];
// Pipeline state tracking
interface PipelineState {
lastRun: Record<string, Date>;
errors: Record<string, string>;
stats: Record<string, { indexed: number; skipped: number; duration: number }>;
}
const state: PipelineState = {
lastRun: {},
errors: {},
stats: {},
};
// Check if source should run based on schedule
function shouldRun(source: SourceConfig): boolean {
if (!source.enabled) return false;
const lastRun = state.lastRun[source.name];
if (!lastRun) return true; // Never run before
const now = Date.now();
const elapsed = now - lastRun.getTime();
switch (source.schedule) {
case 'once':
return false; // Already ran
case 'always':
return true;
case 'hourly':
return elapsed >= 60 * 60 * 1000;
case 'daily':
return elapsed >= 24 * 60 * 60 * 1000;
default:
return true;
}
}
// Run ingestion for a single source
async function ingestSource(source: SourceConfig): Promise<boolean> {
const start = Date.now();
try {
console.log(` Indexing: ${source.name}...`);
await ingest({
connector: source.connector,
store,
embedder,
});
const duration = Date.now() - start;
state.lastRun[source.name] = new Date();
state.stats[source.name] = {
indexed: 1, // Simplified; real impl would track actual counts
skipped: 0,
duration,
};
console.log(` ✓ ${source.name} (${duration}ms)`);
return true;
} catch (error) {
const message = error instanceof Error ? error.message : 'Unknown error';
state.errors[source.name] = message;
console.error(` ✗ ${source.name}: ${message}`);
return false;
}
}
// Run the full pipeline
async function runPipeline(options: { force?: boolean; parallel?: boolean } = {}) {
console.log('Starting indexing pipeline...\n');
const pipelineStart = Date.now();
// Filter sources that should run
const toRun = sources.filter(s => options.force || shouldRun(s));
if (toRun.length === 0) {
console.log('No sources need updating.\n');
return;
}
console.log(`Running ${toRun.length} sources:\n`);
// Sort by priority
const priorityOrder = { high: 0, medium: 1, low: 2 };
toRun.sort((a, b) => priorityOrder[a.priority] - priorityOrder[b.priority]);
let succeeded = 0;
let failed = 0;
if (options.parallel) {
// Run in parallel (grouped by priority)
const grouped = {
high: toRun.filter(s => s.priority === 'high'),
medium: toRun.filter(s => s.priority === 'medium'),
low: toRun.filter(s => s.priority === 'low'),
};
for (const priority of ['high', 'medium', 'low'] as const) {
const group = grouped[priority];
if (group.length === 0) continue;
console.log(`\n[${priority.toUpperCase()} priority]`);
const results = await Promise.all(group.map(ingestSource));
succeeded += results.filter(Boolean).length;
failed += results.filter(r => !r).length;
}
} else {
// Run sequentially
for (const source of toRun) {
const success = await ingestSource(source);
if (success) succeeded++;
else failed++;
}
}
const duration = Date.now() - pipelineStart;
console.log(`\nPipeline complete: ${succeeded} succeeded, ${failed} failed (${duration}ms)\n`);
}
// Get pipeline status
function getStatus(): string {
let status = '\n=== Pipeline Status ===\n\n';
for (const source of sources) {
const lastRun = state.lastRun[source.name];
const error = state.errors[source.name];
const stats = state.stats[source.name];
status += `${source.name}:\n`;
status += ` Enabled: ${source.enabled}\n`;
status += ` Schedule: ${source.schedule}\n`;
status += ` Priority: ${source.priority}\n`;
status += ` Last Run: ${lastRun?.toISOString() || 'Never'}\n`;
status += ` Should Run: ${shouldRun(source)}\n`;
if (error) {
status += ` Error: ${error}\n`;
}
if (stats) {
status += ` Duration: ${stats.duration}ms\n`;
}
status += '\n';
}
return status;
}
// CLI interface
async function main() {
const args = process.argv.slice(2);
const command = args[0] || 'run';
switch (command) {
case 'run':
await runPipeline({ parallel: args.includes('--parallel') });
break;
case 'force':
await runPipeline({ force: true, parallel: args.includes('--parallel') });
break;
case 'status':
console.log(getStatus());
break;
case 'single': {
  const sourceName = args[1];
  const source = sources.find(s => s.name === sourceName);
  if (source) {
    await ingestSource(source);
  } else {
    console.log(`Unknown source: ${sourceName}`);
    console.log(`Available: ${sources.map(s => s.name).join(', ')}`);
  }
  break;
}
default:
console.log(`
Usage:
pipeline run [--parallel] Run pipeline (respects schedules)
pipeline force [--parallel] Force run all sources
pipeline status Show pipeline status
pipeline single <source> Run a single source
`);
}
}
await main();

How It Works
1. Source Configuration
Define sources with schedules and priorities:
const sources: SourceConfig[] = [
{
name: 'Documentation',
connector: local('docs/**/*.md', { ingestWhen: 'contentChanged' }),
schedule: 'always',
priority: 'high',
enabled: true,
},
// ...
];

2. Schedule-Based Execution
Check if sources need to run:
function shouldRun(source: SourceConfig): boolean {
const elapsed = Date.now() - state.lastRun[source.name].getTime();
switch (source.schedule) {
case 'hourly': return elapsed >= 60 * 60 * 1000;
case 'daily': return elapsed >= 24 * 60 * 60 * 1000;
// ...
}
}

3. Priority Ordering
High priority sources run first:
toRun.sort((a, b) => priorityOrder[a.priority] - priorityOrder[b.priority]);
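In parallel mode the pipeline launches each priority group with Promise.all, so every source in a group starts at once. If a group is large you may want to cap concurrency. A minimal sketch with no extra dependencies (the limit of 4 is arbitrary):

// Run tasks with at most `limit` in flight at a time.
async function mapWithLimit<T, R>(
  items: T[],
  limit: number,
  fn: (item: T) => Promise<R>
): Promise<R[]> {
  const results: R[] = new Array(items.length);
  let next = 0;
  // Each worker claims the next unprocessed index; claims happen
  // synchronously between awaits, so there are no races.
  const workers = Array.from({ length: Math.min(limit, items.length) }, async () => {
    while (next < items.length) {
      const i = next++;
      results[i] = await fn(items[i]);
    }
  });
  await Promise.all(workers);
  return results;
}

// Drop-in replacement for the Promise.all call in runPipeline:
// const results = await mapWithLimit(group, 4, ingestSource);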
Customization Options

Cron Integration
Schedule with node-cron:
import cron from 'node-cron';
// Run high priority every 5 minutes
cron.schedule('*/5 * * * *', async () => {
const highPriority = sources.filter(s => s.priority === 'high');
for (const source of highPriority) {
await ingestSource(source);
}
});
// Run full pipeline hourly
cron.schedule('0 * * * *', async () => {
await runPipeline({ parallel: true });
});
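Cron fires on a fixed clock, so a slow run can overlap the next trigger. A minimal guard for a single process (the pipelineRunning flag is illustrative, not a library feature):

// Skip a scheduled run while the previous one is still in flight.
let pipelineRunning = false;
cron.schedule('0 * * * *', async () => {
  if (pipelineRunning) return; // Previous run still active
  pipelineRunning = true;
  try {
    await runPipeline({ parallel: true });
  } finally {
    pipelineRunning = false;
  }
});

For multi-instance deployments, use distributed locking instead (see Production Tips below).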
Webhook Triggers
Trigger on content changes:
import express from 'express';
const app = express();
app.use(express.json()); // Required so req.body is parsed from the JSON payload
app.post('/webhook/content-updated', async (req, res) => {
const { sourceName } = req.body;
const source = sources.find(s => s.name === sourceName);
if (source) {
await ingestSource(source);
res.json({ status: 'ok', source: sourceName });
} else {
res.status(404).json({ error: 'Source not found' });
}
});

app.listen(3000); // Port is arbitrary; choose one for your deployment

Metrics and Monitoring
Track detailed metrics:
interface Metrics {
totalRuns: number;
successfulRuns: number;
failedRuns: number;
totalDocuments: number;
averageDuration: number;
lastError: string | null;
}
const metrics: Record<string, Metrics> = {};
function updateMetrics(source: SourceConfig, success: boolean, duration: number) {
if (!metrics[source.name]) {
metrics[source.name] = {
totalRuns: 0,
successfulRuns: 0,
failedRuns: 0,
totalDocuments: 0,
averageDuration: 0,
lastError: null,
};
}
const m = metrics[source.name];
m.totalRuns++;
if (success) m.successfulRuns++;
else m.failedRuns++;
m.averageDuration = (m.averageDuration * (m.totalRuns - 1) + duration) / m.totalRuns;
}
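One way to wire this in is a thin wrapper around ingestSource, reusing the functions defined above (a sketch; you could also call updateMetrics inside ingestSource directly):

// Record metrics for every run of a source.
async function ingestSourceWithMetrics(source: SourceConfig): Promise<boolean> {
  const start = Date.now();
  const success = await ingestSource(source);
  updateMetrics(source, success, Date.now() - start);
  return success;
}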
Retry Logic
Add retry with exponential backoff:
async function ingestWithRetry(
source: SourceConfig,
maxRetries = 3
): Promise<boolean> {
for (let attempt = 1; attempt <= maxRetries; attempt++) {
try {
await ingest({
connector: source.connector,
store,
embedder,
});
return true;
} catch (error) {
console.log(` Attempt ${attempt}/${maxRetries} failed`);
if (attempt < maxRetries) {
const delay = Math.pow(2, attempt) * 1000; // Exponential backoff
await new Promise(resolve => setTimeout(resolve, delay));
}
}
}
return false;
}
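To adopt it, substitute ingestWithRetry where the pipeline currently calls ingestSource, for example in the parallel branch of runPipeline (the retry count of 3 is illustrative):

// Retry each source up to 3 times instead of failing on the first error:
const results = await Promise.all(group.map(s => ingestWithRetry(s, 3)));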
Health Checks
Expose health endpoint:
function getHealthStatus(): {
healthy: boolean;
sources: Record<string, boolean>;
lastCheck: Date;
} {
const sourceHealth: Record<string, boolean> = {};
for (const source of sources) {
const hasError = !!state.errors[source.name];
const isStale = !state.lastRun[source.name] ||
(source.schedule === 'hourly' && Date.now() - state.lastRun[source.name].getTime() > 2 * 60 * 60 * 1000);
sourceHealth[source.name] = !hasError && !isStale;
}
return {
healthy: Object.values(sourceHealth).every(Boolean),
sources: sourceHealth,
lastCheck: new Date(),
};
}
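getHealthStatus only computes the status; to expose it, serve it over HTTP. A minimal sketch reusing the express app from the webhook example (the /health path and the 503 convention are assumptions, not library behavior):

// Report pipeline health; 503 lets load balancers mark the instance unhealthy.
app.get('/health', (_req, res) => {
  const health = getHealthStatus();
  res.status(health.healthy ? 200 : 503).json(health);
});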
Production Tips
- State persistence: Store pipeline state in a database or file so schedules survive restarts (a file-based sketch follows this list)
- Distributed locking: Prevent concurrent runs in multi-instance setups
- Dead letter queue: Track and retry failed sources
- Alerting: Notify on failures or stale sources
- Resource limits: Cap memory usage during large ingestions
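The in-memory state above is lost on restart, so every source looks like it has never run. A minimal file-based sketch of the first tip (the pipeline-state.json path is an assumption; use a database for multi-instance setups):

import { readFileSync, writeFileSync, existsSync } from 'node:fs';

const STATE_FILE = './pipeline-state.json'; // Hypothetical location

// Restore lastRun timestamps (serialized as ISO strings) at startup.
function loadState(): void {
  if (!existsSync(STATE_FILE)) return;
  const saved = JSON.parse(readFileSync(STATE_FILE, 'utf8'));
  for (const [name, iso] of Object.entries(saved.lastRun ?? {})) {
    state.lastRun[name] = new Date(iso as string);
  }
  state.errors = saved.errors ?? {};
}

// Persist after each pipeline run.
function saveState(): void {
  writeFileSync(STATE_FILE, JSON.stringify(state, null, 2));
}

Call loadState() before the first runPipeline() and saveState() at the end of each run.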
CLI Commands
# Run pipeline (respects schedules)
node pipeline.ts run
# Force run all sources
node pipeline.ts force
# Run in parallel mode
node pipeline.ts run --parallel
# Check status
node pipeline.ts status
# Run single source
node pipeline.ts single "Documentation"

Next Steps
- Documentation Chatbot - Use indexed content
- Ingestion Modes - Refresh strategies
- Custom Connectors - Add new sources