Deep Agents

Incremental Indexing Pipeline

Build a production indexing pipeline with different refresh strategies

This recipe shows how to build a production indexing pipeline that applies a different refresh strategy to each content type, keeping large-scale content ingestion efficient.

What You'll Build

  • Configurable ingestion pipeline
  • Different refresh strategies per source
  • Parallel processing for speed
  • Error handling and recovery
  • Pipeline orchestration

Prerequisites

npm install @deepagents/retrieval

Complete Implementation

import { ingest, fastembed, nodeSQLite } from '@deepagents/retrieval';
import { local, rss, github, pdf } from '@deepagents/retrieval/connectors';
import type { Connector } from '@deepagents/retrieval/connectors';

// Set up retrieval infrastructure
const store = nodeSQLite('./pipeline.db', 384);
const embedder = fastembed();

// Source configuration with refresh strategies
interface SourceConfig {
  name: string;
  connector: Connector;
  schedule: 'once' | 'always' | 'hourly' | 'daily';
  priority: 'high' | 'medium' | 'low';
  enabled: boolean;
}

const sources: SourceConfig[] = [
  // Static content: index once
  {
    name: 'Archives',
    connector: local('archives/**/*.md', { ingestWhen: 'never' }),
    schedule: 'once',
    priority: 'low',
    enabled: true,
  },

  // Active content: check every run
  {
    name: 'Documentation',
    connector: local('docs/**/*.md', { ingestWhen: 'contentChanged' }),
    schedule: 'always',
    priority: 'high',
    enabled: true,
  },

  // API reference: daily refresh
  {
    name: 'API Docs',
    connector: local('api-docs/**/*.md', { ingestWhen: 'contentChanged' }),
    schedule: 'daily',
    priority: 'medium',
    enabled: true,
  },

  // News feeds: hourly refresh
  {
    name: 'Tech News',
    connector: {
      ...rss('https://news.ycombinator.com/rss', { maxItems: 30 }),
      ingestWhen: 'expired',
      expiresAfter: 60 * 60 * 1000,
    },
    schedule: 'hourly',
    priority: 'medium',
    enabled: true,
  },

  // Release notes: daily refresh
  {
    name: 'Next.js Releases',
    connector: github.release('vercel/next.js'),
    schedule: 'daily',
    priority: 'low',
    enabled: true,
  },

  // Research papers: daily refresh
  {
    name: 'Research Papers',
    connector: pdf('research/**/*.pdf'),
    schedule: 'daily',
    priority: 'low',
    enabled: true,
  },
];

// Pipeline state tracking
interface PipelineState {
  lastRun: Record<string, Date>;
  errors: Record<string, string>;
  stats: Record<string, { indexed: number; skipped: number; duration: number }>;
}

const state: PipelineState = {
  lastRun: {},
  errors: {},
  stats: {},
};

// Check if source should run based on schedule
function shouldRun(source: SourceConfig): boolean {
  if (!source.enabled) return false;

  const lastRun = state.lastRun[source.name];
  if (!lastRun) return true; // Never run before

  const now = Date.now();
  const elapsed = now - lastRun.getTime();

  switch (source.schedule) {
    case 'once':
      return false; // Already ran
    case 'always':
      return true;
    case 'hourly':
      return elapsed >= 60 * 60 * 1000;
    case 'daily':
      return elapsed >= 24 * 60 * 60 * 1000;
    default:
      return true;
  }
}

// Run ingestion for a single source
async function ingestSource(source: SourceConfig): Promise<boolean> {
  const start = Date.now();

  try {
    console.log(`  Indexing: ${source.name}...`);

    await ingest({
      connector: source.connector,
      store,
      embedder,
    });

    const duration = Date.now() - start;
    state.lastRun[source.name] = new Date();
    state.stats[source.name] = {
      indexed: 1, // Simplified; real impl would track actual counts
      skipped: 0,
      duration,
    };

    console.log(`  ✓ ${source.name} (${duration}ms)`);
    return true;
  } catch (error) {
    const message = error instanceof Error ? error.message : 'Unknown error';
    state.errors[source.name] = message;
    console.error(`  ✗ ${source.name}: ${message}`);
    return false;
  }
}

// Run the full pipeline
async function runPipeline(options: { force?: boolean; parallel?: boolean } = {}) {
  console.log('Starting indexing pipeline...\n');
  const pipelineStart = Date.now();

  // Filter sources that should run
  const toRun = sources.filter(s => options.force || shouldRun(s));

  if (toRun.length === 0) {
    console.log('No sources need updating.\n');
    return;
  }

  console.log(`Running ${toRun.length} sources:\n`);

  // Sort by priority
  const priorityOrder = { high: 0, medium: 1, low: 2 };
  toRun.sort((a, b) => priorityOrder[a.priority] - priorityOrder[b.priority]);

  let succeeded = 0;
  let failed = 0;

  if (options.parallel) {
    // Run in parallel (grouped by priority)
    const grouped = {
      high: toRun.filter(s => s.priority === 'high'),
      medium: toRun.filter(s => s.priority === 'medium'),
      low: toRun.filter(s => s.priority === 'low'),
    };

    for (const priority of ['high', 'medium', 'low'] as const) {
      const group = grouped[priority];
      if (group.length === 0) continue;

      console.log(`\n[${priority.toUpperCase()} priority]`);
      const results = await Promise.all(group.map(ingestSource));

      succeeded += results.filter(Boolean).length;
      failed += results.filter(r => !r).length;
    }
  } else {
    // Run sequentially
    for (const source of toRun) {
      const success = await ingestSource(source);
      if (success) succeeded++;
      else failed++;
    }
  }

  const duration = Date.now() - pipelineStart;
  console.log(`\nPipeline complete: ${succeeded} succeeded, ${failed} failed (${duration}ms)\n`);
}

// Get pipeline status
function getStatus(): string {
  let status = '\n=== Pipeline Status ===\n\n';

  for (const source of sources) {
    const lastRun = state.lastRun[source.name];
    const error = state.errors[source.name];
    const stats = state.stats[source.name];

    status += `${source.name}:\n`;
    status += `  Enabled: ${source.enabled}\n`;
    status += `  Schedule: ${source.schedule}\n`;
    status += `  Priority: ${source.priority}\n`;
    status += `  Last Run: ${lastRun?.toISOString() || 'Never'}\n`;
    status += `  Should Run: ${shouldRun(source)}\n`;

    if (error) {
      status += `  Error: ${error}\n`;
    }

    if (stats) {
      status += `  Duration: ${stats.duration}ms\n`;
    }

    status += '\n';
  }

  return status;
}

// CLI interface
async function main() {
  const args = process.argv.slice(2);
  const command = args[0] || 'run';

  switch (command) {
    case 'run':
      await runPipeline({ parallel: args.includes('--parallel') });
      break;

    case 'force':
      await runPipeline({ force: true, parallel: args.includes('--parallel') });
      break;

    case 'status':
      console.log(getStatus());
      break;

    case 'single': {
      const sourceName = args[1];
      const source = sources.find(s => s.name === sourceName);
      if (source) {
        await ingestSource(source);
      } else {
        console.log(`Unknown source: ${sourceName}`);
        console.log(`Available: ${sources.map(s => s.name).join(', ')}`);
      }
      break;
    }

    default:
      console.log(`
Usage:
  pipeline run [--parallel]     Run pipeline (respects schedules)
  pipeline force [--parallel]   Force run all sources
  pipeline status               Show pipeline status
  pipeline single <source>      Run a single source
      `);
  }
}

await main();

How It Works

1. Source Configuration

Define sources with schedules and priorities:

const sources: SourceConfig[] = [
  {
    name: 'Documentation',
    connector: local('docs/**/*.md', { ingestWhen: 'contentChanged' }),
    schedule: 'always',
    priority: 'high',
    enabled: true,
  },
  // ...
];

2. Schedule-Based Execution

Check if sources need to run:

function shouldRun(source: SourceConfig): boolean {
  const lastRun = state.lastRun[source.name];
  if (!lastRun) return true; // Never run before

  const elapsed = Date.now() - lastRun.getTime();

  switch (source.schedule) {
    case 'hourly': return elapsed >= 60 * 60 * 1000;
    case 'daily': return elapsed >= 24 * 60 * 60 * 1000;
    // ...
  }
}

3. Priority Ordering

High priority sources run first:

toRun.sort((a, b) => priorityOrder[a.priority] - priorityOrder[b.priority]);
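
4. Parallel Execution

With --parallel, sources in the same priority tier run concurrently, one tier at a time (excerpt from the full implementation above):

for (const priority of ['high', 'medium', 'low'] as const) {
  const group = grouped[priority];
  if (group.length === 0) continue;

  // Every source in this tier runs at once; the next tier waits
  const results = await Promise.all(group.map(ingestSource));
  succeeded += results.filter(Boolean).length;
  failed += results.filter(r => !r).length;
}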

Customization Options

Cron Integration

Schedule with node-cron:

import cron from 'node-cron';

// Run high priority every 5 minutes
cron.schedule('*/5 * * * *', async () => {
  const highPriority = sources.filter(s => s.priority === 'high');
  for (const source of highPriority) {
    await ingestSource(source);
  }
});

// Run full pipeline hourly
cron.schedule('0 * * * *', async () => {
  await runPipeline({ parallel: true });
});

Webhook Triggers

Trigger on content changes:

import express from 'express';

const app = express();
app.use(express.json()); // Needed so req.body is parsed

app.post('/webhook/content-updated', async (req, res) => {
  const { sourceName } = req.body;
  const source = sources.find(s => s.name === sourceName);

  if (source) {
    await ingestSource(source);
    res.json({ status: 'ok', source: sourceName });
  } else {
    res.status(404).json({ error: 'Source not found' });
  }
});

app.listen(3000);

Metrics and Monitoring

Track detailed metrics:

interface Metrics {
  totalRuns: number;
  successfulRuns: number;
  failedRuns: number;
  totalDocuments: number;
  averageDuration: number;
  lastError: string | null;
}

const metrics: Record<string, Metrics> = {};

function updateMetrics(
  source: SourceConfig,
  success: boolean,
  duration: number,
  errorMessage?: string
) {
  if (!metrics[source.name]) {
    metrics[source.name] = {
      totalRuns: 0,
      successfulRuns: 0,
      failedRuns: 0,
      totalDocuments: 0,
      averageDuration: 0,
      lastError: null,
    };
  }

  const m = metrics[source.name];
  m.totalRuns++;
  if (success) m.successfulRuns++;
  else m.failedRuns++;

  if (errorMessage) m.lastError = errorMessage;
  m.averageDuration = (m.averageDuration * (m.totalRuns - 1) + duration) / m.totalRuns;
}
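
updateMetrics is not called anywhere in the recipe as written; one way to wire it in (a sketch of hypothetical call sites) is from ingestSource once each run settles:

// Inside ingestSource (hypothetical wiring):
updateMetrics(source, true, duration);            // success path
updateMetrics(source, false, duration, message);  // catch block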

Retry Logic

Add retry with exponential backoff:

async function ingestWithRetry(
  source: SourceConfig,
  maxRetries = 3
): Promise<boolean> {
  for (let attempt = 1; attempt <= maxRetries; attempt++) {
    try {
      await ingest({
        connector: source.connector,
        store,
        embedder,
      });
      return true;
    } catch (error) {
      const message = error instanceof Error ? error.message : 'Unknown error';
      console.log(`  Attempt ${attempt}/${maxRetries} failed: ${message}`);

      if (attempt < maxRetries) {
        const delay = Math.pow(2, attempt) * 1000; // Exponential backoff
        await new Promise(resolve => setTimeout(resolve, delay));
      }
    }
  }

  return false;
}
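
To adopt it, the direct ingest call inside ingestSource can be swapped for the retrying variant (a sketch; the recipe calls ingest directly):

// Inside ingestSource, in place of the plain ingest(...) call:
const ok = await ingestWithRetry(source);
if (!ok) throw new Error('All retry attempts failed'); // caught by the existing catch block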

Health Checks

Expose health endpoint:

function getHealthStatus(): {
  healthy: boolean;
  sources: Record<string, boolean>;
  lastCheck: Date;
} {
  const sourceHealth: Record<string, boolean> = {};

  for (const source of sources) {
    const hasError = !!state.errors[source.name];
    const isStale = !state.lastRun[source.name] ||
      (source.schedule === 'hourly' && Date.now() - state.lastRun[source.name].getTime() > 2 * 60 * 60 * 1000);

    sourceHealth[source.name] = !hasError && !isStale;
  }

  return {
    healthy: Object.values(sourceHealth).every(Boolean),
    sources: sourceHealth,
    lastCheck: new Date(),
  };
}
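
To actually expose this over HTTP, getHealthStatus can back a small route on the same Express app used for the webhook trigger (a minimal sketch; the /health path is an assumption):

app.get('/health', (_req, res) => {
  const health = getHealthStatus();
  res.status(health.healthy ? 200 : 503).json(health);
});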

Production Tips

  1. State persistence: Store pipeline state in a database (see the sketch after this list)
  2. Distributed locking: Prevent concurrent runs in multi-instance setups
  3. Dead letter queue: Track and retry failed sources
  4. Alerting: Notify on failures or stale sources
  5. Resource limits: Cap memory usage during large ingestions
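
A minimal sketch of tip 1, persisting PipelineState to a JSON file between runs (the file path and JSON format are assumptions; a real setup would likely reuse the database behind the store):

import { readFileSync, writeFileSync, existsSync } from 'node:fs';

const STATE_FILE = './pipeline-state.json';

function loadState(): PipelineState {
  if (!existsSync(STATE_FILE)) {
    return { lastRun: {}, errors: {}, stats: {} };
  }
  const raw = JSON.parse(readFileSync(STATE_FILE, 'utf8'));
  // Dates round-trip as ISO strings, so revive them
  const lastRun: Record<string, Date> = {};
  for (const [name, iso] of Object.entries(raw.lastRun ?? {})) {
    lastRun[name] = new Date(iso as string);
  }
  return { lastRun, errors: raw.errors ?? {}, stats: raw.stats ?? {} };
}

function saveState(current: PipelineState): void {
  writeFileSync(STATE_FILE, JSON.stringify(current, null, 2));
}

// Load before running, save once the pipeline finishes:
// Object.assign(state, loadState()); await runPipeline(); saveState(state);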

CLI Commands

# Use a TypeScript runner such as tsx (or compile to JS and run with node)

# Run pipeline (respects schedules)
npx tsx pipeline.ts run

# Force run all sources
npx tsx pipeline.ts force

# Run in parallel mode
npx tsx pipeline.ts run --parallel

# Check status
npx tsx pipeline.ts status

# Run single source
npx tsx pipeline.ts single "Documentation"

Next Steps