---
title: "Batch Embedding and Ingestion: Processing Millions of Documents for Vector Search"
description: "Build a production-grade pipeline for embedding and ingesting millions of documents into a vector database, covering chunking strategies, parallel processing, rate limiting, and progress tracking."
canonical: https://callsphere.ai/blog/batch-embedding-ingestion-millions-documents-vector-search
category: "Learn Agentic AI"
tags: ["Batch Processing", "Embeddings", "Data Ingestion", "Pipeline", "Python"]
author: "CallSphere Team"
published: 2026-03-17T00:00:00.000Z
updated: 2026-05-07T03:59:26.899Z
---

# Batch Embedding and Ingestion: Processing Millions of Documents for Vector Search

> Build a production-grade pipeline for embedding and ingesting millions of documents into a vector database, covering chunking strategies, parallel processing, rate limiting, and progress tracking.

## The Ingestion Challenge

Generating a single embedding takes milliseconds. Generating embeddings for a million documents naively — one document at a time, waiting for each API response before sending the next request — takes days. A well-designed ingestion pipeline uses batching, parallelism, and fault tolerance to bring that down to a few hours.

This guide walks through building a production-grade pipeline that chunks documents, generates embeddings in parallel batches, handles rate limits gracefully, and tracks progress so you can resume after failures.

## Step 1: Document Chunking

Most documents exceed the token limit of embedding models (8191 tokens for OpenAI `text-embedding-3-small`). Split them into overlapping chunks to preserve context at boundaries:

```mermaid
flowchart TD
    DOC(["Document"])
    CHUNK["Chunker
recursive plus overlap"]
    EMB["Embedding model"]
    META["Attach metadata
source, page, tenant"]
    INDEX[("HNSW or IVF index
in vector store")]
    Q(["Query"])
    QEMB["Embed query"]
    SEARCH["ANN search
cosine similarity"]
    FILTER["Metadata filter
tenant or date"]
    HITS(["Top-k chunks"])
    DOC --> CHUNK --> EMB --> META --> INDEX
    Q --> QEMB --> SEARCH
    INDEX --> SEARCH --> FILTER --> HITS
    style INDEX fill:#4f46e5,stroke:#4338ca,color:#fff
    style HITS fill:#059669,stroke:#047857,color:#fff
```

```python
from dataclasses import dataclass

@dataclass
class Chunk:
    doc_id: str
    chunk_index: int
    text: str
    metadata: dict

def chunk_document(
    doc_id: str,
    text: str,
    metadata: dict,
    chunk_size: int = 500,
    overlap: int = 50
) -> list[Chunk]:
    words = text.split()
    chunks = []
    start = 0
    index = 0

    while start < len(words):
        chunk_words = words[start:start + chunk_size]
        chunks.append(Chunk(
            doc_id=doc_id,
            chunk_index=index,
            text=" ".join(chunk_words),
            metadata=metadata
        ))
        index += 1
        # Advance by chunk_size minus overlap so adjacent chunks share context
        start += chunk_size - overlap

    return chunks
```

## Step 2: Batch Embedding with Rate Limit Handling

Embedding APIs accept many inputs per request, so send chunks in batches and retry with exponential backoff when the API reports a rate limit:

```python
import time

import openai

client = openai.OpenAI()

def embed_batch(
    texts: list[str],
    model: str = "text-embedding-3-small",
    max_retries: int = 3
) -> list[list[float]]:
    for attempt in range(max_retries):
        try:
            response = client.embeddings.create(
                model=model,
                input=texts
            )
            return [item.embedding for item in response.data]
        except openai.RateLimitError:
            wait_time = 2 ** attempt * 5  # 5s, 10s, 20s
            print(f"Rate limited. Waiting {wait_time}s...")
            time.sleep(wait_time)
        except openai.APIError as e:
            if attempt == max_retries - 1:
                raise
            print(f"API error: {e}. Retrying...")
            time.sleep(2)
    raise RuntimeError("Max retries exceeded")
```
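Input count is not the only cap: a batch can also exceed the model's per-request token limit. A pre-grouping pass keeps each request under budget. The sketch below is a heuristic — the `group_by_token_budget` name and the 1.3 tokens-per-word ratio are assumptions; use `tiktoken` for exact counts in production:

```python
def group_by_token_budget(
    texts: list[str],
    max_tokens: int = 8000,
    tokens_per_word: float = 1.3,  # rough English heuristic, not an exact count
) -> list[list[str]]:
    """Greedily pack texts into groups that stay under an estimated token budget."""
    groups: list[list[str]] = []
    current: list[str] = []
    current_tokens = 0.0
    for text in texts:
        est = len(text.split()) * tokens_per_word
        # Flush the current group if adding this text would exceed the budget
        if current and current_tokens + est > max_tokens:
            groups.append(current)
            current, current_tokens = [], 0.0
        current.append(text)
        current_tokens += est
    if current:
        groups.append(current)
    return groups

docs = ["word " * 1800, "word " * 1800, "tiny appendix"]
print([len(g) for g in group_by_token_budget(docs, max_tokens=2500)])  # [1, 2]
```

Each group can then be passed to `embed_batch` as one request.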

## Step 3: Parallel Processing Pipeline

Fan the work out with a thread pool: split the chunks into fixed-size batches, submit each batch as a task, and let the pool's workers embed batches concurrently while the main thread collects results as they complete:

```python
import concurrent.futures
from threading import Lock

class EmbeddingPipeline:
    def __init__(self, batch_size: int = 100, max_workers: int = 5):
        self.batch_size = batch_size
        self.max_workers = max_workers
        self.progress = {"embedded": 0, "failed": 0}
        self.lock = Lock()

    def process_batch(self, chunks: list[Chunk]) -> list[dict]:
        texts = [c.text for c in chunks]
        embeddings = embed_batch(texts)

        results = []
        for chunk, embedding in zip(chunks, embeddings):
            results.append({
                "id": f"{chunk.doc_id}_{chunk.chunk_index}",
                "values": embedding,
                "metadata": {
                    **chunk.metadata,
                    "doc_id": chunk.doc_id,
                    "text": chunk.text[:500]  # store truncated text
                }
            })

        with self.lock:
            self.progress["embedded"] += len(results)

        return results

    def run(self, chunks: list[Chunk]):
        # Split into batches
        batches = [
            chunks[i:i + self.batch_size]
            for i in range(0, len(chunks), self.batch_size)
        ]

        all_results = []
        with concurrent.futures.ThreadPoolExecutor(
            max_workers=self.max_workers
        ) as executor:
            futures = {
                executor.submit(self.process_batch, batch): i
                for i, batch in enumerate(batches)
            }

            for future in concurrent.futures.as_completed(futures):
                batch_idx = futures[future]
                try:
                    results = future.result()
                    all_results.extend(results)
                    print(
                        f"Batch {batch_idx + 1}/{len(batches)} done. "
                        f"Total: {self.progress['embedded']}"
                    )
                except Exception as e:
                    with self.lock:
                        # The last batch may be smaller than batch_size
                        self.progress["failed"] += len(batches[batch_idx])
                    print(f"Batch {batch_idx} failed: {e}")

        return all_results
```
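Before spending API credits, the same fan-out pattern can be dry-run with a stub embedder to sanity-check the batching math. Everything below is a self-contained toy, not part of the pipeline:

```python
import concurrent.futures

items = list(range(1, 1001))  # stand-ins for 1,000 chunks
batch_size = 100
batches = [items[i:i + batch_size] for i in range(0, len(items), batch_size)]

def fake_embed(batch):
    # Pretend each item becomes a 3-dimensional vector
    return [[float(x)] * 3 for x in batch]

results = []
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as pool:
    futures = [pool.submit(fake_embed, b) for b in batches]
    for future in concurrent.futures.as_completed(futures):
        results.extend(future.result())

print(len(batches), len(results))  # 10 1000
```

Swapping `fake_embed` for a real embedding call turns the toy into the pipeline above.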

## Step 4: Progress Tracking and Resumability

For million-document pipelines, failures are inevitable. Track progress to enable resuming:

```python
import json
from pathlib import Path

class ProgressTracker:
    def __init__(self, checkpoint_path: str = "ingestion_progress.json"):
        self.path = Path(checkpoint_path)
        self.processed_ids: set[str] = set()
        self._load()

    def _load(self):
        if self.path.exists():
            data = json.loads(self.path.read_text())
            self.processed_ids = set(data.get("processed_ids", []))
            print(f"Resumed: {len(self.processed_ids)} already processed")

    def mark_done(self, doc_ids: list[str]):
        before = len(self.processed_ids)
        self.processed_ids.update(doc_ids)
        # Save a checkpoint whenever the count crosses a 1000-document boundary
        if len(self.processed_ids) // 1000 > before // 1000:
            self._save()

    def _save(self):
        self.path.write_text(json.dumps({
            "processed_ids": list(self.processed_ids),
            "count": len(self.processed_ids)
        }))

    def should_process(self, doc_id: str) -> bool:
        return doc_id not in self.processed_ids
```
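The resume pattern itself fits in a few lines. A minimal standalone sketch of the same checkpoint idea (the file path and document IDs are illustrative):

```python
import json
import tempfile
from pathlib import Path

# Simulate a crash-and-resume cycle with a JSON checkpoint file
ckpt = Path(tempfile.mkdtemp()) / "ingestion_progress.json"

# First run: process three documents, then "crash"
done = {"doc-1", "doc-2", "doc-3"}
ckpt.write_text(json.dumps({"processed_ids": sorted(done)}))

# Second run: reload the checkpoint and skip what is already done
resumed = set(json.loads(ckpt.read_text())["processed_ids"])
pending = [d for d in ["doc-1", "doc-2", "doc-3", "doc-4"] if d not in resumed]
print(pending)  # ['doc-4']
```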

## Step 5: Putting It All Together

```python
def ingest_documents(documents: list[dict]):
    tracker = ProgressTracker()
    pipeline = EmbeddingPipeline(batch_size=100, max_workers=5)

    # Filter already-processed documents
    pending = [
        doc for doc in documents
        if tracker.should_process(doc["id"])
    ]
    print(f"Processing {len(pending)} of {len(documents)} documents")

    # Chunk all pending documents into one flat list for the pipeline
    all_chunks: list[Chunk] = []
    for doc in pending:
        all_chunks.extend(chunk_document(
            doc_id=doc["id"],
            text=doc["content"],
            metadata={"source": doc.get("source", "unknown")}
        ))

    # Embed and collect results
    results = pipeline.run(all_chunks)

    # Upsert to vector database in batches
    for i in range(0, len(results), 100):
        batch = results[i:i + 100]
        index.upsert(vectors=batch)  # Pinecone example
        doc_ids = list({r["metadata"]["doc_id"] for r in batch})
        tracker.mark_done(doc_ids)

    tracker._save()  # flush the final partial checkpoint

    print(f"Done. Embedded: {pipeline.progress['embedded']}, "
          f"Failed: {pipeline.progress['failed']}")
```

## Performance Tips

- **Use the largest batch size your API allows.** OpenAI supports up to 2048 inputs per request. Larger batches reduce HTTP overhead.
- **Set max_workers to 3-5.** More workers hit rate limits faster. Fewer workers leave throughput on the table.
- **Pre-filter empty or duplicate documents** before chunking to avoid wasting embedding API calls.
- **Monitor token usage.** Embedding cost is per token, not per request. Long documents cost more.
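The last tip is easy to quantify before running the pipeline. A back-of-the-envelope cost estimator — the default price per million tokens and the 1.3 tokens-per-word ratio are assumptions, so check current pricing for your model:

```python
def estimate_embedding_cost(
    num_docs: int,
    avg_words_per_doc: int,
    tokens_per_word: float = 1.3,            # rough English heuristic
    price_per_million_tokens: float = 0.02,  # assumed rate; verify before budgeting
) -> float:
    """Estimate total embedding spend in dollars for a corpus."""
    total_tokens = num_docs * avg_words_per_doc * tokens_per_word
    return total_tokens / 1_000_000 * price_per_million_tokens

# One million 500-word documents at the assumed rate
print(f"${estimate_embedding_cost(1_000_000, 500):.2f}")  # $13.00
```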

## FAQ

### How long does it take to embed one million documents?

With OpenAI `text-embedding-3-small` at 5 parallel workers and batches of 100 inputs, expect roughly 2-4 hours for one million average-length documents (500 words each). The bottleneck is API rate limits, not compute. Local embedding models such as sentence-transformers on a GPU can process the same volume in 30-60 minutes.

### Should I embed entire documents or chunks?

Almost always chunks. Embedding models have token limits, and embedding a long document into a single vector loses fine-grained information. Chunking lets you retrieve the specific paragraph that answers a question rather than the entire document. Store the document ID in chunk metadata to reconstruct the full document when needed.
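Grouping retrieved chunks back into their parent documents takes a few lines once `doc_id` is in the metadata. A sketch, assuming hit records shaped like Pinecone-style matches (the field names are assumptions):

```python
from collections import defaultdict

# Top-k results as returned by a vector store query (shape assumed here)
hits = [
    {"id": "doc-a_2", "score": 0.91, "metadata": {"doc_id": "doc-a", "text": "..."}},
    {"id": "doc-b_0", "score": 0.88, "metadata": {"doc_id": "doc-b", "text": "..."}},
    {"id": "doc-a_3", "score": 0.84, "metadata": {"doc_id": "doc-a", "text": "..."}},
]

by_doc: dict[str, list[dict]] = defaultdict(list)
for hit in hits:
    by_doc[hit["metadata"]["doc_id"]].append(hit)

# Order chunks within each document by chunk index (the suffix of the id)
for doc_id, doc_hits in by_doc.items():
    doc_hits.sort(key=lambda h: int(h["id"].rsplit("_", 1)[1]))
    print(doc_id, [h["id"] for h in doc_hits])
```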

### How do I handle documents that change after initial ingestion?

Track document versions or content hashes. When a document changes, re-chunk and re-embed it, then upsert the new chunks (overwriting by ID). Delete orphaned chunk IDs that no longer correspond to any chunk in the updated document. Most vector databases support upsert natively, making this straightforward.
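A minimal sketch of the content-hash check, assuming hashes live in a simple `doc_id → hash` mapping (in practice this could be any key-value store):

```python
import hashlib

def content_hash(text: str) -> str:
    """Stable fingerprint of a document's content."""
    return hashlib.sha256(text.encode("utf-8")).hexdigest()

# Hashes recorded at last ingestion (illustrative storage)
stored_hashes = {"doc-1": content_hash("original body")}

def needs_reembedding(doc_id: str, text: str) -> bool:
    # Unseen documents and changed documents both need (re-)embedding
    return stored_hashes.get(doc_id) != content_hash(text)

print(needs_reembedding("doc-1", "original body"))  # False: unchanged
print(needs_reembedding("doc-1", "edited body"))    # True: re-embed
print(needs_reembedding("doc-2", "new document"))   # True: never seen
```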

---

#BatchProcessing #Embeddings #DataIngestion #Pipeline #Python #AgenticAI #LearnAI #AIEngineering

---

Source: https://callsphere.ai/blog/batch-embedding-ingestion-millions-documents-vector-search
