---
title: "Building a Document Ingestion Pipeline for RAG: PDF, DOCX, HTML, and CSV Processing"
description: "Learn how to build a production document ingestion pipeline that detects file formats, extracts text, chunks content intelligently, generates embeddings, and stores everything for retrieval-augmented generation."
canonical: https://callsphere.ai/blog/building-document-ingestion-pipeline-rag-pdf-docx-html-csv
category: "Learn Agentic AI"
tags: ["RAG", "Document Processing", "Data Pipelines", "Embeddings", "Vector Search"]
author: "CallSphere Team"
published: 2026-03-17T00:00:00.000Z
updated: 2026-05-06T01:02:44.588Z
---

# Building a Document Ingestion Pipeline for RAG: PDF, DOCX, HTML, and CSV Processing

> Learn how to build a production document ingestion pipeline that detects file formats, extracts text, chunks content intelligently, generates embeddings, and stores everything for retrieval-augmented generation.

## Why Document Ingestion Is the Foundation of RAG

Retrieval-augmented generation only works if the retrieval layer has clean, well-structured data to search. Most RAG failures are not prompt engineering problems — they are data ingestion problems. If your pipeline silently drops tables from PDFs, strips formatting from DOCX headers, or produces overlapping chunks with no context, your agent will hallucinate confidently from incomplete information.

A production ingestion pipeline must handle four concerns: format detection and extraction, intelligent chunking, embedding generation, and indexed storage. Each stage has pitfalls that compound downstream.
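Those four stages can be sketched as a single pass. This is an illustrative skeleton, not a library API: the stage names and signatures are assumptions, with each stage injected as a callable so it can be swapped or tested in isolation.

```python
from dataclasses import dataclass
from typing import Callable, List

@dataclass
class IngestionPipeline:
    """Wires the four stages together; each field is a stand-in callable."""
    extract: Callable[[str], str]                     # file path -> raw text
    chunk: Callable[[str], List[str]]                 # raw text -> chunks
    embed: Callable[[List[str]], List[List[float]]]   # chunks -> vectors
    store: Callable[[List[str], List[List[float]]], int]  # persist, return count

    def run(self, path: str) -> int:
        text = self.extract(path)
        chunks = self.chunk(text)
        vectors = self.embed(chunks)
        return self.store(chunks, vectors)
```

Keeping the stages as injected callables makes it easy to unit-test chunking without touching an embedding API, or to replace the store without rewriting extraction.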

## Format Detection and Text Extraction

The first challenge is reliably extracting text from heterogeneous file types. Never rely on file extensions alone: a file named `.txt` might actually contain HTML, so sniff the MIME type from the file's bytes instead. The retrieval flow below shows where ingested data ultimately lands; every stage in this pipeline exists to populate the vector store that query-time retrieval searches.

```mermaid
flowchart LR
    Q(["User query"])
    EMB["Embed query
text-embedding-3"]
    VEC[("Vector DB
pgvector or Pinecone")]
    RET["Top-k retrieval
k = 8"]
    PROMPT["Augmented prompt
system plus context"]
    LLM["LLM generation
Claude or GPT"]
    CITE["Inline citations
and page anchors"]
    OUT(["Grounded answer"])
    Q --> EMB --> VEC --> RET --> PROMPT --> LLM --> CITE --> OUT
    style EMB fill:#ede9fe,stroke:#7c3aed,color:#1e1b4b
    style VEC fill:#ede9fe,stroke:#7c3aed,color:#1e1b4b
    style LLM fill:#4f46e5,stroke:#4338ca,color:#fff
    style OUT fill:#059669,stroke:#047857,color:#fff
```

```python
import magic
from pathlib import Path
from dataclasses import dataclass
from typing import List

@dataclass
class ExtractedDocument:
    source: str
    content: str
    metadata: dict
    pages: List[str]

class FormatDetector:
    MIME_MAP = {
        "application/pdf": "pdf",
        "application/vnd.openxmlformats-officedocument"
        ".wordprocessingml.document": "docx",
        "text/html": "html",
        "text/csv": "csv",
        "text/plain": "text",
    }

    def detect(self, file_path: str) -> str:
        mime = magic.from_file(file_path, mime=True)
        fmt = self.MIME_MAP.get(mime)
        if not fmt:
            raise ValueError(
                f"Unsupported format: {mime} for {file_path}"
            )
        return fmt

class DocumentExtractor:
    def __init__(self):
        self.detector = FormatDetector()

    def extract(self, file_path: str) -> ExtractedDocument:
        fmt = self.detector.detect(file_path)
        extractor = getattr(self, f"_extract_{fmt}")
        return extractor(file_path)

    def _extract_pdf(self, path: str) -> ExtractedDocument:
        import pdfplumber
        pages = []
        with pdfplumber.open(path) as pdf:
            for page in pdf.pages:
                text = page.extract_text() or ""
                tables = page.extract_tables()
                for table in tables:
                    rows = [
                        " | ".join(str(c or "") for c in row)
                        for row in table
                    ]
                    text += "\n" + "\n".join(rows)
                pages.append(text)
        return ExtractedDocument(
            source=path,
            content="\n\n".join(pages),
            metadata={"format": "pdf", "page_count": len(pages)},
            pages=pages,
        )

    def _extract_docx(self, path: str) -> ExtractedDocument:
        from docx import Document
        doc = Document(path)
        paragraphs = [p.text for p in doc.paragraphs if p.text.strip()]
        return ExtractedDocument(
            source=path,
            content="\n\n".join(paragraphs),
            metadata={"format": "docx", "paragraph_count": len(paragraphs)},
            pages=paragraphs,
        )

    def _extract_html(self, path: str) -> ExtractedDocument:
        from bs4 import BeautifulSoup
        with open(path, "r", encoding="utf-8") as f:
            soup = BeautifulSoup(f.read(), "html.parser")
        for tag in soup(["script", "style", "nav", "footer"]):
            tag.decompose()
        text = soup.get_text(separator="\n", strip=True)
        return ExtractedDocument(
            source=path,
            content=text,
            metadata={"format": "html", "title": soup.title.string if soup.title else ""},
            pages=[text],
        )

    def _extract_csv(self, path: str) -> ExtractedDocument:
        import csv
        rows = []
        with open(path, "r", encoding="utf-8") as f:
            reader = csv.DictReader(f)
            for row in reader:
                line = " | ".join(
                    f"{k}: {v}" for k, v in row.items()
                )
                rows.append(line)
        return ExtractedDocument(
            source=path,
            content="\n".join(rows),
            metadata={"format": "csv", "row_count": len(rows)},
            pages=rows,
        )
```

The key design decision here is using `pdfplumber` over PyPDF2 because it handles table extraction natively. Tables are a major source of lost information in PDF pipelines.

## Intelligent Chunking

Naive fixed-size chunking breaks sentences mid-thought and loses section context. A better approach uses recursive splitting with overlap and respects document structure.

```python
from typing import List
from dataclasses import dataclass

@dataclass
class Chunk:
    text: str
    metadata: dict
    index: int

class RecursiveChunker:
    def __init__(
        self,
        max_tokens: int = 512,
        overlap_tokens: int = 64,
        separators: List[str] | None = None,
    ):
        self.max_tokens = max_tokens
        self.overlap_tokens = overlap_tokens
        self.separators = separators or [
            "\n\n", "\n", ". ", " "
        ]

    def chunk(
        self, doc: ExtractedDocument
    ) -> List[Chunk]:
        raw_chunks = self._split(
            doc.content, self.separators
        )
        chunks = []
        for i, text in enumerate(raw_chunks):
            chunks.append(Chunk(
                text=text.strip(),
                metadata={
                    **doc.metadata,
                    "source": doc.source,
                    "chunk_index": i,
                    "total_chunks": len(raw_chunks),
                },
                index=i,
            ))
        return chunks

    def _split(self, text: str, seps: list) -> List[str]:
        if not seps:
            return self._fixed_split(text)
        sep = seps[0]
        parts = text.split(sep)
        merged = []
        current = ""
        for part in parts:
            candidate = current + sep + part if current else part
            if self._token_count(candidate) <= self.max_tokens:
                current = candidate
            else:
                if current:
                    merged.append(current)
                if self._token_count(part) > self.max_tokens:
                    merged.extend(self._split(part, seps[1:]))
                else:
                    current = part
                    continue
                current = ""
        if current:
            merged.append(current)
        return self._add_overlap(merged)

    def _add_overlap(self, chunks: List[str]) -> List[str]:
        if len(chunks) <= 1:
            return chunks
        overlapped = [chunks[0]]
        for i in range(1, len(chunks)):
            # Prefix each chunk with the tail of its predecessor.
            tail = chunks[i - 1].split()[-self.overlap_tokens:]
            overlapped.append(" ".join(tail) + " " + chunks[i])
        return overlapped

    def _fixed_split(self, text: str) -> List[str]:
        words = text.split()
        return [
            " ".join(words[i:i + self.max_tokens])
            for i in range(0, len(words), self.max_tokens)
        ]

    def _token_count(self, text: str) -> int:
        # Whitespace word count is a cheap proxy; swap in a real
        # tokenizer such as tiktoken for accurate token budgets.
        return len(text.split())
```

## Embedding and Storage

Once chunks are ready, generate embeddings and store them in a vector database. Batch processing with rate limiting prevents API throttling.

```python
import asyncio
from typing import List

from openai import AsyncOpenAI

client = AsyncOpenAI()

async def embed_and_store(chunks: List[Chunk], collection):
    batch_size = 100
    for i in range(0, len(chunks), batch_size):
        batch = chunks[i:i + batch_size]
        response = await client.embeddings.create(
            model="text-embedding-3-small",
            input=[c.text for c in batch],
        )
        ids = [f"{batch[j].metadata['source']}_{batch[j].index}" for j in range(len(batch))]
        embeddings = [e.embedding for e in response.data]
        metadatas = [c.metadata for c in batch]
        documents = [c.text for c in batch]
        collection.upsert(
            ids=ids,
            embeddings=embeddings,
            metadatas=metadatas,
            documents=documents,
        )
        await asyncio.sleep(0.5)  # rate limiting
```
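The fixed `sleep(0.5)` handles steady-state pacing but not transient throttling errors. A small retry wrapper with exponential backoff and jitter can be layered on top; this is a generic sketch, not tied to any SDK's built-in retry support, and in production you would catch the SDK's specific rate-limit exception rather than bare `Exception`.

```python
import asyncio
import random
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")

async def with_backoff(
    fn: Callable[[], Awaitable[T]],
    max_retries: int = 5,
    base_delay: float = 0.5,
) -> T:
    """Retry an async call with exponential backoff and jitter."""
    for attempt in range(max_retries):
        try:
            return await fn()
        except Exception:
            if attempt == max_retries - 1:
                raise  # exhausted retries: surface the error
            # Double the delay each attempt, plus jitter to avoid thundering herds.
            delay = base_delay * (2 ** attempt) + random.uniform(0, 0.1)
            await asyncio.sleep(delay)
    raise RuntimeError("unreachable")
```

Usage inside the batch loop would look like `response = await with_backoff(lambda: client.embeddings.create(...))`.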

## FAQ

### How should I handle scanned PDFs with no extractable text?

Use OCR as a fallback. Check if `pdfplumber` returns empty text for a page, then run that page through `pytesseract` or a cloud OCR service like AWS Textract. Add an `ocr_applied: true` flag to chunk metadata so downstream consumers know the text quality may be lower.
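A sketch of that fallback decision, where `run_ocr` is a placeholder for whichever OCR backend you choose (pytesseract, Textract, etc.) and the `min_chars` threshold is an assumed heuristic you should tune:

```python
from typing import Callable

def page_needs_ocr(page_text: str, min_chars: int = 20) -> bool:
    """Treat a page as scanned if extraction yielded almost nothing."""
    return len(page_text.strip()) < min_chars

def extract_page(
    page_text: str,
    run_ocr: Callable[[], str],
) -> tuple[str, dict]:
    """Return (text, metadata), flagging OCR so consumers know quality may be lower."""
    if page_needs_ocr(page_text):
        return run_ocr(), {"ocr_applied": True}
    return page_text, {"ocr_applied": False}
```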

### What chunk size works best for RAG?

Start with 512 tokens with 64-token overlap. Smaller chunks (256 tokens) improve precision for factual Q&A but lose context for summarization tasks. Larger chunks (1024 tokens) work better for complex reasoning. Test with your actual queries and measure retrieval recall to find the right size for your use case.
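Measuring retrieval recall can be as simple as comparing retrieved chunk ids against a hand-labeled relevant set per query. A minimal harness, assuming you supply the query set and labels yourself:

```python
from typing import Dict, List, Set

def recall_at_k(
    retrieved: Dict[str, List[str]],  # query -> ranked chunk ids
    relevant: Dict[str, Set[str]],    # query -> labeled relevant chunk ids
    k: int = 8,
) -> float:
    """Mean fraction of relevant chunks that appear in the top-k results."""
    scores = []
    for query, rel in relevant.items():
        if not rel:
            continue  # skip queries with no labeled answers
        top_k = set(retrieved.get(query, [])[:k])
        scores.append(len(top_k & rel) / len(rel))
    return sum(scores) / len(scores) if scores else 0.0
```

Run the same labeled query set against each candidate chunk size and keep the configuration with the best recall.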

### Should I re-embed everything when the embedding model changes?

Yes. Embedding spaces are model-specific and not interchangeable. When you upgrade models, re-process all documents and rebuild your vector index. Use a versioned collection naming scheme like `docs_v2_embedding3small` so you can run both indexes in parallel during migration.
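A small helper for that naming scheme; the exact format is an assumption modeled on the example above, and the character-stripping rule should be adapted to whatever your vector store accepts in collection names:

```python
import re

def collection_name(base: str, version: int, model: str) -> str:
    """Build a versioned collection name, e.g. docs_v2_textembedding3small."""
    # Strip characters that many vector stores reject in collection names.
    slug = re.sub(r"[^a-z0-9]", "", model.lower())
    return f"{base}_v{version}_{slug}"
```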


---

Source: https://callsphere.ai/blog/building-document-ingestion-pipeline-rag-pdf-docx-html-csv
