---
title: "Building an Embedding Pipeline: Batch Processing Millions of Documents for Vector Search"
description: "Learn how to build a scalable embedding pipeline that processes millions of documents with parallelization, rate limiting, progress tracking, and incremental updates for production vector search."
canonical: https://callsphere.ai/blog/building-embedding-pipeline-batch-processing-millions-documents-vector-search
category: "Learn Agentic AI"
tags: ["Embeddings", "Vector Search", "Batch Processing", "Data Pipelines", "Scalability"]
author: "CallSphere Team"
published: 2026-03-17T00:00:00.000Z
updated: 2026-05-25T03:24:33.375Z
---

# Building an Embedding Pipeline: Batch Processing Millions of Documents for Vector Search

> Learn how to build a scalable embedding pipeline that processes millions of documents with parallelization, rate limiting, progress tracking, and incremental updates for production vector search.

## The Challenge of Embedding at Scale

Generating embeddings for a hundred documents is trivial. Generating embeddings for a million documents introduces a different class of problems: API rate limits, network failures mid-batch, cost optimization, memory management, and the need to incrementally update without re-processing everything.

A naive loop that sends one document at a time to the embedding API would take days for a million documents. A production pipeline parallelizes requests, batches efficiently, tracks progress for resumability, and only re-embeds documents that have actually changed.
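The sketch below estimates wall-clock time from per-request latency. The 200 ms latency is a hypothetical figure. The model also holds latency constant regardless of batch size, which flatters the batched case because larger requests take longer. Treat the output as an order-of-magnitude comparison, not a benchmark.

```python
def estimate_hours(num_docs: int, latency_s: float,
                   batch_size: int = 1, concurrency: int = 1) -> float:
    """Wall-clock hours if every request takes latency_s seconds.

    Assumes constant latency and that rate limits never bind;
    real runs will be slower.
    """
    requests = -(-num_docs // batch_size)   # ceiling division
    rounds = -(-requests // concurrency)    # requests run in waves
    return rounds * latency_s / 3600

naive = estimate_hours(1_000_000, latency_s=0.2)
batched = estimate_hours(1_000_000, latency_s=0.2,
                         batch_size=100, concurrency=5)
print(f"naive: {naive:.1f} h, batched: {batched * 60:.1f} min")
# naive: 55.6 h, batched: 6.7 min
```

Even under generous assumptions, batching and concurrency move the job from days toward minutes. That is why the rest of the pipeline is built around them.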

## Pipeline Architecture

The pipeline has four components: a document source that yields unprocessed records, a batcher that groups documents for efficient API calls, an embedder that handles rate limiting and retries, and a writer that stores results in the vector database.

```mermaid
flowchart TD
    DOC(["Document"])
    CHUNK["Chunker
recursive plus overlap"]
    EMB["Embedding model"]
    META["Attach metadata
source, page, tenant"]
    INDEX[("HNSW or IVF index
in vector store")]
    Q(["Query"])
    QEMB["Embed query"]
    SEARCH["ANN search
cosine similarity"]
    FILTER["Metadata filter
tenant or date"]
    HITS(["Top-k chunks"])
    DOC --> CHUNK --> EMB --> META --> INDEX
    Q --> QEMB --> SEARCH
    INDEX --> SEARCH --> FILTER --> HITS
    style INDEX fill:#4f46e5,stroke:#4338ca,color:#fff
    style HITS fill:#059669,stroke:#047857,color:#fff
```
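The batcher stage can be as small as an async generator that groups a stream into lists. This is a hypothetical helper: `batched` and the toy `numbers` source are illustrative names. The standard library's `itertools.batched` (Python 3.12+) does the same job, but only for synchronous iterables.

```python
import asyncio
from typing import AsyncIterator, List, TypeVar

T = TypeVar("T")

async def batched(items: AsyncIterator[T], size: int) -> AsyncIterator[List[T]]:
    """Group an async stream into lists of at most `size` items."""
    if size < 1:
        raise ValueError("size must be >= 1")
    batch: List[T] = []
    async for item in items:
        batch.append(item)
        if len(batch) == size:
            yield batch
            batch = []
    if batch:  # flush the final partial batch
        yield batch

async def numbers(n: int) -> AsyncIterator[int]:
    for i in range(n):
        yield i

async def main() -> List[List[int]]:
    return [b async for b in batched(numbers(7), 3)]

print(asyncio.run(main()))  # [[0, 1, 2], [3, 4, 5], [6]]
```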

```python
from dataclasses import dataclass
from typing import List, Optional, AsyncIterator
from datetime import datetime
import hashlib

@dataclass
class Document:
    id: str
    text: str
    metadata: dict
    content_hash: str = ""

    def __post_init__(self):
        if not self.content_hash:
            self.content_hash = hashlib.sha256(
                self.text.encode()
            ).hexdigest()

@dataclass
class EmbeddedDocument:
    id: str
    text: str
    embedding: List[float]
    metadata: dict
    content_hash: str

@dataclass
class PipelineStats:
    total: int = 0
    processed: int = 0
    skipped: int = 0
    failed: int = 0
    started_at: Optional[datetime] = None

    @property
    def progress_pct(self) -> float:
        if self.total == 0:
            return 0.0
        return (self.processed + self.skipped) / self.total * 100

    @property
    def rate(self) -> float:
        if not self.started_at:
            return 0.0
        elapsed = (datetime.utcnow() - self.started_at).total_seconds()
        return self.processed / max(elapsed, 1)
```

## Incremental Processing with Content Hashing

The single biggest optimization is skipping documents that have not changed. Store a content hash alongside each embedding and compare before re-processing.

```python
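import json

class IncrementalSource:
    def __init__(self, db_pool, vector_store, page_size: int = 1000):
        self.db_pool = db_pool
        self.vector_store = vector_store
        self.page_size = page_size

    async def get_documents(self) -> AsyncIterator[Document]:
        last_id = None
        while True:
            # Keyset pagination: read one page at a time instead of loading
            # every row into memory (assumes an indexed, sortable id column)
            async with self.db_pool.acquire() as conn:
                if last_id is None:
                    rows = await conn.fetch(
                        "SELECT id, content, metadata FROM documents "
                        "ORDER BY id LIMIT $1",
                        self.page_size,
                    )
                else:
                    rows = await conn.fetch(
                        "SELECT id, content, metadata FROM documents "
                        "WHERE id > $1 ORDER BY id LIMIT $2",
                        last_id, self.page_size,
                    )
            if not rows:
                return
            last_id = rows[-1]["id"]

            # One hash lookup per page rather than one for the whole table
            existing_hashes = await self.vector_store.get_hashes(
                [row["id"] for row in rows]
            )
            for row in rows:
                metadata = row["metadata"]
                if isinstance(metadata, str):  # asyncpg returns jsonb as text by default
                    metadata = json.loads(metadata)
                doc = Document(
                    id=row["id"],
                    text=row["content"],
                    metadata=dict(metadata or {}),
                )
                if existing_hashes.get(doc.id) == doc.content_hash:
                    continue  # content unchanged, skip
                yield doc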
```
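The source above assumes a vector store client that exposes an async `get_hashes` method. The following is a minimal, synchronous sketch of that contract. `InMemoryVectorStore` and `changed_ids` are hypothetical names, not a real library. The sketch shows which documents the hash comparison lets through.

```python
import hashlib
from typing import Dict, Iterable, List, Tuple

def content_hash(text: str) -> str:
    return hashlib.sha256(text.encode()).hexdigest()

class InMemoryVectorStore:
    """Hypothetical stand-in for a real vector DB client."""

    def __init__(self) -> None:
        # id -> (content hash, embedding vector)
        self.rows: Dict[str, Tuple[str, List[float]]] = {}

    def get_hashes(self, ids: Iterable[str]) -> Dict[str, str]:
        return {i: self.rows[i][0] for i in ids if i in self.rows}

    def upsert(self, doc_id: str, text: str, vector: List[float]) -> None:
        self.rows[doc_id] = (content_hash(text), vector)

def changed_ids(docs: Dict[str, str], store: InMemoryVectorStore) -> List[str]:
    """Return IDs whose text is new or differs from the stored hash."""
    stored = store.get_hashes(docs)
    return [i for i, text in docs.items() if stored.get(i) != content_hash(text)]

store = InMemoryVectorStore()
store.upsert("a", "hello", [0.1])
store.upsert("b", "world", [0.2])
docs = {"a": "hello", "b": "world, edited", "c": "brand new"}
print(changed_ids(docs, store))  # ['b', 'c']
```

Only the text is hashed, so a metadata-only edit is not detected as a change. If the vector store also holds metadata you filter on, include that metadata in the hashed content.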

## Rate-Limited Parallel Embedder

The embedder sends batched requests with concurrency control and exponential backoff on rate limit errors.

```python
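import asyncio
import logging
import random
from typing import List

import tiktoken
from openai import (
    APIConnectionError,
    AsyncOpenAI,
    InternalServerError,
    RateLimitError,
)

logger = logging.getLogger(__name__)

# Transient errors worth retrying; anything else (bad request, auth) fails fast
RETRYABLE = (RateLimitError, APIConnectionError, InternalServerError)
MAX_INPUT_TOKENS = 8191  # per-input token limit of the text-embedding-3 models

class BatchEmbedder:
    def __init__(
        self,
        model: str = "text-embedding-3-small",
        batch_size: int = 100,
        max_concurrent: int = 5,
        max_retries: int = 5,
    ):
        self.client = AsyncOpenAI()
        self.model = model
        self.batch_size = batch_size
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.max_retries = max_retries
        # text-embedding-3 models tokenize with cl100k_base
        self.encoding = tiktoken.get_encoding("cl100k_base")

    def _truncate(self, text: str) -> str:
        # Clip by tokens, not characters: the model limit is counted in tokens
        tokens = self.encoding.encode(text, disallowed_special=())
        if len(tokens) <= MAX_INPUT_TOKENS:
            return text
        return self.encoding.decode(tokens[:MAX_INPUT_TOKENS])

    async def embed_batch(
        self, docs: List[Document]
    ) -> List[EmbeddedDocument]:
        inputs = [self._truncate(d.text) for d in docs]
        async with self.semaphore:
            for attempt in range(self.max_retries):
                try:
                    response = await self.client.embeddings.create(
                        model=self.model, input=inputs
                    )
                    # Match vectors to inputs by their index field
                    vectors = [
                        item.embedding
                        for item in sorted(response.data, key=lambda item: item.index)
                    ]
                    return [
                        EmbeddedDocument(
                            id=doc.id,
                            text=doc.text,
                            embedding=vector,
                            metadata=doc.metadata,
                            content_hash=doc.content_hash,
                        )
                        for doc, vector in zip(docs, vectors)
                    ]
                except RETRYABLE as e:
                    if attempt + 1 == self.max_retries:
                        break  # no point sleeping before giving up
                    # Exponential backoff plus jitter so concurrent workers
                    # do not all retry at the same instant
                    wait = 2 ** attempt + random.uniform(0, 1)
                    logger.warning(
                        f"{type(e).__name__}, retrying in {wait:.1f}s "
                        f"(attempt {attempt + 1}/{self.max_retries})"
                    )
                    await asyncio.sleep(wait)
            raise RuntimeError(
                f"Embedding failed after {self.max_retries} attempts"
            )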
```
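The retry loop inside `embed_batch` can also be factored into a reusable helper. The following sketch uses "full jitter", meaning a random sleep between zero and the exponential cap, instead of a fixed `2 ** attempt`. This spreads out the retries of workers that hit the same limit. `retry_with_backoff` and `FakeRateLimit` are hypothetical names. With the OpenAI SDK you would pass its exception classes, such as `RateLimitError`, as `retryable`. The SDK client also retries a few times on its own (its `max_retries` option), so the two layers stack.

```python
import asyncio
import random
from typing import Awaitable, Callable, Tuple, Type, TypeVar

T = TypeVar("T")

async def retry_with_backoff(
    call: Callable[[], Awaitable[T]],
    retryable: Tuple[Type[BaseException], ...],
    max_retries: int = 5,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
) -> T:
    """Retry `call` on retryable errors with capped, jittered backoff."""
    for attempt in range(max_retries):
        try:
            return await call()
        except retryable:
            if attempt == max_retries - 1:
                raise  # out of attempts: surface the last error
            # Full jitter: sleep a random time up to the exponential cap
            cap = min(max_delay, base_delay * 2 ** attempt)
            await asyncio.sleep(random.uniform(0, cap))
    raise ValueError("max_retries must be >= 1")

class FakeRateLimit(Exception):
    pass

calls = 0

async def flaky() -> str:
    global calls
    calls += 1
    if calls < 3:
        raise FakeRateLimit()
    return "ok"

result = asyncio.run(retry_with_backoff(flaky, (FakeRateLimit,), base_delay=0.01))
print(result, calls)  # ok 3
```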

## Progress Tracking and Resumability

For million-document pipelines, crashes are inevitable. A checkpoint system lets you resume from where you left off.

```python
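import json
from pathlib import Path
from typing import List

class CheckpointManager:
    """Append-only checkpoint: one JSON line per completed batch."""

    def __init__(self, checkpoint_path: str = "embed_checkpoint.jsonl"):
        self.path = Path(checkpoint_path)
        # Keep IDs in a set: rebuilding a set from a list on every lookup,
        # or rewriting every ID on every save, is quadratic at this scale
        self.processed_ids: set = set()
        self.stats: dict = {}
        self._load()

    def _load(self) -> None:
        if not self.path.exists():
            return
        for line in self.path.read_text().splitlines():
            try:
                entry = json.loads(line)
            except json.JSONDecodeError:
                # A crash mid-append can leave a malformed line; skipping it
                # only means that batch is re-embedded (upserts are idempotent)
                continue
            self.processed_ids.update(entry["ids"])
            self.stats = entry["stats"]

    def save(self, stats: PipelineStats, batch_ids: List[str]):
        self.processed_ids.update(batch_ids)
        self.stats = {
            "total": stats.total,
            "processed": stats.processed,
            "skipped": stats.skipped,
            "failed": stats.failed,
        }
        # Append only the new batch so each save costs O(batch size)
        with self.path.open("a") as f:
            f.write(json.dumps({"ids": batch_ids, "stats": self.stats}) + "\n")

    def is_processed(self, doc_id: str) -> bool:
        return doc_id in self.processed_ids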
```
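The following sketch checks resume behavior without an API or database. It simulates a crash after two batches and then reruns. The JSON-lines file, `run`, and `load_done` are illustrative assumptions. The property under test is that every document is embedded exactly once across both runs, because a batch is checkpointed only after its write succeeds.

```python
import json
import tempfile
from pathlib import Path
from typing import List, Set

def load_done(path: Path) -> Set[str]:
    done: Set[str] = set()
    if not path.exists():
        return done
    for line in path.read_text().splitlines():
        try:
            done.update(json.loads(line))
        except json.JSONDecodeError:
            pass  # malformed line from a crash: that batch is simply redone
    return done

def run(ids: List[str], path: Path, embedded: List[str],
        batch_size: int, crash_after_batches: int = -1) -> None:
    """Process ids in batches, appending each finished batch to `path`."""
    done = load_done(path)
    todo = [i for i in ids if i not in done]
    for n, start in enumerate(range(0, len(todo), batch_size)):
        if n == crash_after_batches:
            raise RuntimeError("simulated crash")
        batch = todo[start:start + batch_size]
        embedded.extend(batch)       # stand-in for embed + upsert
        with path.open("a") as f:    # checkpoint only after the write
            f.write(json.dumps(batch) + "\n")

ids = [f"doc-{i}" for i in range(10)]
embedded: List[str] = []
with tempfile.TemporaryDirectory() as tmp:
    ckpt = Path(tmp) / "checkpoint.jsonl"
    try:
        run(ids, ckpt, embedded, batch_size=3, crash_after_batches=2)
    except RuntimeError:
        pass
    run(ids, ckpt, embedded, batch_size=3)  # resume
print(len(embedded), len(set(embedded)))  # 10 10
```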

## Orchestrating the Full Pipeline

Tie all components together with an orchestrator that coordinates batching, embedding, and writing.

```python
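import asyncio
from typing import List

async def run_pipeline(source, embedder, vector_store, checkpoint,
                       max_in_flight: int = 5, log_every: int = 1000):
    stats = PipelineStats(started_at=datetime.utcnow())
    batch: List[Document] = []
    pending: List[List[Document]] = []
    last_logged = 0

    async def flush(batches: List[List[Document]]) -> None:
        # Submit several batches at once; awaiting them one by one would
        # leave the embedder's semaphore with a single request in flight
        results = await asyncio.gather(
            *(embedder.embed_batch(b) for b in batches),
            return_exceptions=True,
        )
        for docs, result in zip(batches, results):
            if isinstance(result, BaseException):
                logger.error(f"Batch of {len(docs)} docs failed: {result}")
                stats.failed += len(docs)
                continue  # not checkpointed, so a rerun retries these docs
            await vector_store.upsert_batch(result)
            stats.processed += len(result)
            # Checkpoint after the write, with stats that include this batch
            checkpoint.save(stats, [d.id for d in docs])

    async for doc in source.get_documents():
        stats.total += 1
        if checkpoint.is_processed(doc.id):
            stats.skipped += 1
            continue

        batch.append(doc)
        if len(batch) >= embedder.batch_size:
            pending.append(batch)
            batch = []
        if len(pending) >= max_in_flight:
            await flush(pending)
            pending = []
            if stats.processed - last_logged >= log_every:
                last_logged = stats.processed
                # stats.total counts documents seen so far, not the final
                # total, so log counts rather than a percentage
                logger.info(
                    f"Progress: {stats.processed} embedded of "
                    f"{stats.total} seen, {stats.failed} failed, "
                    f"rate: {stats.rate:.1f} docs/sec"
                )

    # Process the final partial batch and anything still queued
    if batch:
        pending.append(batch)
    if pending:
        await flush(pending)

    logger.info(f"Pipeline complete: {stats.processed} embedded, "
                f"{stats.skipped} skipped, {stats.failed} failed")
    if stats.failed == 0:
        # Clear the checkpoint after a clean run; stale IDs would make the
        # next incremental run skip documents whose content has changed
        checkpoint.path.unlink(missing_ok=True)
    return stats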
```
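A semaphore caps concurrency but does not create it. If an orchestrator awaits each batch before starting the next, the embedder's `Semaphore(max_concurrent)` never admits more than one request. In the following toy sketch, a hypothetical `fake_embed` coroutine stands in for the API. The sketch measures peak in-flight requests both ways.

```python
import asyncio
from typing import List

async def fake_embed(batch: List[int], sem: asyncio.Semaphore,
                     state: dict) -> List[int]:
    """Stand-in for an embedding call; records peak concurrency."""
    async with sem:
        state["in_flight"] += 1
        state["peak"] = max(state["peak"], state["in_flight"])
        await asyncio.sleep(0.01)  # pretend network latency
        state["in_flight"] -= 1
        return batch

async def main(concurrent: bool) -> int:
    sem = asyncio.Semaphore(5)
    state = {"in_flight": 0, "peak": 0}
    batches = [[i] for i in range(20)]
    if concurrent:
        await asyncio.gather(*(fake_embed(b, sem, state) for b in batches))
    else:
        for b in batches:  # each batch awaited before the next starts
            await fake_embed(b, sem, state)
    return state["peak"]

print(asyncio.run(main(concurrent=False)), asyncio.run(main(concurrent=True)))  # 1 5
```

Gathering a bounded window of batches, rather than every batch at once, also keeps memory flat. Only a few batches of documents and vectors are held at any time.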

## FAQ

### How much does it cost to embed a million documents?

With OpenAI's text-embedding-3-small at approximately $0.02 per million tokens, a million documents averaging 500 tokens each costs around $10. The larger text-embedding-3-large model costs roughly $0.13 per million tokens. These costs make re-embedding feasible when you upgrade models, but incremental processing still saves significant time and API calls.
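The cost arithmetic is simple enough to script when budgeting a run. The prices below are the per-million-token figures quoted above and will drift over time, so check the current price list. The 500-token average and the 5% change rate are illustrative assumptions.

```python
PRICE_PER_M_TOKENS = {  # USD per million tokens, as quoted above
    "text-embedding-3-small": 0.02,
    "text-embedding-3-large": 0.13,
}

def embedding_cost(num_docs: int, avg_tokens: int, model: str) -> float:
    total_tokens = num_docs * avg_tokens
    return total_tokens / 1_000_000 * PRICE_PER_M_TOKENS[model]

for model in PRICE_PER_M_TOKENS:
    print(model, round(embedding_cost(1_000_000, 500, model), 2))
# text-embedding-3-small 10.0
# text-embedding-3-large 65.0

# Incremental run where only 5% of a million documents changed
print("5% changed:", round(embedding_cost(50_000, 500, "text-embedding-3-small"), 2))
# 5% changed: 0.5
```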

### Should I use a local embedding model instead of an API?

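For smaller datasets (roughly under 100,000 documents), API-based embeddings are simpler and produce excellent quality. For larger datasets, or when data must not leave your infrastructure, local models such as those from the `sentence-transformers` library running on a GPU can be more cost-effective. Throughput depends heavily on model size and input length. A small model on a single A100 can handle thousands of short documents per second, but longer documents and larger models are far slower, so benchmark on your own corpus before committing.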

### How do I handle documents that exceed the embedding model's token limit?

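Truncation is the simplest approach: clip each input to the model's 8,191-token limit before sending it. Count tokens with the model's tokenizer (for example `tiktoken`). Truncating to 8,191 characters instead throws away most of the budget for typical English text, and it can still overflow for text where a single character encodes to several tokens. A better approach is to chunk long documents before embedding and store multiple vectors per document with shared metadata. At query time, retrieve chunks and group them by document ID to reconstruct context.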
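The following is a minimal sketch of both halves under stated assumptions. Whitespace-split words stand in for model tokens; a real pipeline would count tokens with the model's tokenizer. The chunker is a fixed-size sliding window with overlap, which is simpler than the recursive splitter shown in the diagram. Search hits are assumed to come back as `(doc_id, chunk_index, score)` tuples. `chunk_tokens` and `group_hits` are hypothetical helpers.

```python
from collections import defaultdict
from typing import Dict, List, Tuple

def chunk_tokens(tokens: List[str], size: int, overlap: int) -> List[List[str]]:
    """Split tokens into windows of `size` that share `overlap` tokens."""
    if not 0 <= overlap < size:
        raise ValueError("need 0 <= overlap < size")
    step = size - overlap
    chunks: List[List[str]] = []
    for start in range(0, len(tokens), step):
        chunks.append(tokens[start:start + size])
        if start + size >= len(tokens):
            break  # this window already reaches the end
    return chunks

def group_hits(hits: List[Tuple[str, int, float]]) -> List[Tuple[str, List[int]]]:
    """Group chunk hits by document: best-scoring document first,
    chunk indices in reading order."""
    by_doc: Dict[str, List[int]] = defaultdict(list)
    best: Dict[str, float] = {}
    for doc_id, chunk_idx, score in hits:
        by_doc[doc_id].append(chunk_idx)
        best[doc_id] = max(score, best.get(doc_id, float("-inf")))
    return [(d, sorted(by_doc[d])) for d in sorted(by_doc, key=lambda d: -best[d])]

tokens = "a b c d e f g h i j".split()  # whitespace "tokens" for illustration
print([" ".join(c) for c in chunk_tokens(tokens, size=4, overlap=1)])
# ['a b c d', 'd e f g', 'g h i j']
hits = [("doc2", 4, 0.71), ("doc1", 0, 0.83), ("doc2", 1, 0.79)]
print(group_hits(hits))  # [('doc1', [0]), ('doc2', [1, 4])]
```

One option is to store each chunk under an ID such as `f"{doc_id}:{chunk_index}"`. This keeps upserts idempotent, so re-chunking a changed document overwrites its old vectors. If the new version has fewer chunks, delete the leftover chunk IDs.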

---

#Embeddings #VectorSearch #BatchProcessing #DataPipelines #Scalability #AgenticAI #LearnAI #AIEngineering

