---
title: "Real-Time Semantic Search: Streaming Updates and Incremental Indexing"
description: "Build a semantic search system that handles live document updates with queue-based ingestion, incremental vector indexing, and near-real-time search freshness without rebuilding the entire index."
canonical: https://callsphere.ai/blog/real-time-semantic-search-streaming-updates-incremental-indexing
category: "Learn Agentic AI"
tags: ["Real-Time Search", "Incremental Indexing", "Streaming", "Queue Processing", "Vector Search"]
author: "CallSphere Team"
published: 2026-03-17T00:00:00.000Z
updated: 2026-06-04T20:26:42.354Z
---

# Real-Time Semantic Search: Streaming Updates and Incremental Indexing

> Build a semantic search system that handles live document updates with queue-based ingestion, incremental vector indexing, and near-real-time search freshness without rebuilding the entire index.

## The Freshness Challenge

Most semantic search tutorials assume a static corpus: embed all documents once, build an index, and search. But real-world applications deal with continuously changing data — new articles published every minute, products added and removed, user-generated content streaming in. Rebuilding the entire index on every change is not feasible at scale. You need incremental indexing that processes updates in near-real-time while keeping search results fresh.

## Architecture for Real-Time Semantic Search

The architecture has three layers:

1. **Change capture**: detect document creates, updates, and deletes.
2. **Embedding queue**: buffer changes and compute embeddings asynchronously, in batches.
3. **Live index**: update the vector index incrementally without downtime.

The diagram shows the general indexing and query paths of a vector search system. Change capture and the embedding queue turn the indexing path into a continuous stream, and the live index is the store that both paths share:

```mermaid
flowchart TD
    DOC(["Document"])
    CHUNK["Chunker
recursive plus overlap"]
    EMB["Embedding model"]
    META["Attach metadata
source, page, tenant"]
    INDEX[("HNSW or IVF index
in vector store")]
    Q(["Query"])
    QEMB["Embed query"]
    SEARCH["ANN search
cosine similarity"]
    FILTER["Metadata filter
tenant or date"]
    HITS(["Top-k chunks"])
    DOC --> CHUNK --> EMB --> META --> INDEX
    Q --> QEMB --> SEARCH
    INDEX --> SEARCH --> FILTER --> HITS
    style INDEX fill:#4f46e5,stroke:#4338ca,color:#fff
    style HITS fill:#059669,stroke:#047857,color:#fff
```

Each captured change is represented as a `DocumentChange` record:

```python
from dataclasses import dataclass, field
from enum import Enum
from typing import Optional
import time

class ChangeType(Enum):
    CREATE = "create"
    UPDATE = "update"
    DELETE = "delete"

@dataclass
class DocumentChange:
    doc_id: str
    change_type: ChangeType
    title: Optional[str] = None
    body: Optional[str] = None
    # Wall-clock time the change was captured (defaults to "now")
    timestamp: float = field(default_factory=time.time)
```
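When the source system cannot emit change events on its own, one way to build the change-capture layer is to diff successive snapshots of the corpus by content hash. The sketch below is illustrative: `content_hash` and `diff_snapshots` are hypothetical helpers, and it assumes each scan yields a `{doc_id: (title, body)}` mapping. Each emitted kind string matches a `ChangeType` value, so `ChangeType(kind)` converts it into the enum above.

```python
import hashlib


def content_hash(title: str, body: str) -> str:
    """Fingerprint of the text that will be embedded."""
    return hashlib.sha256(f"{title}\n{body}".encode("utf-8")).hexdigest()


def diff_snapshots(old_hashes: dict, new_docs: dict):
    """Compare hashes from the previous scan with the current corpus.

    old_hashes: doc_id -> hash recorded at the previous scan
    new_docs:   doc_id -> (title, body) from the current scan
    Returns (changes, new_hashes); each change is (doc_id, kind), where kind
    is "create", "update" or "delete" (the ChangeType values).
    """
    changes = []
    new_hashes = {}
    for doc_id, (title, body) in new_docs.items():
        h = content_hash(title, body)
        new_hashes[doc_id] = h
        if doc_id not in old_hashes:
            changes.append((doc_id, "create"))
        elif old_hashes[doc_id] != h:
            changes.append((doc_id, "update"))  # text changed: re-embed
    for doc_id in old_hashes:
        if doc_id not in new_docs:
            changes.append((doc_id, "delete"))
    return changes, new_hashes


# One edited document, one new document, one removed document
v1 = {"a": ("Intro", "old text"), "b": ("FAISS", "tuning")}
_, hashes = diff_snapshots({}, v1)
v2 = {"a": ("Intro", "new text"), "c": ("HNSW", "graphs")}
changes, hashes = diff_snapshots(hashes, v2)
print(changes)  # [('a', 'update'), ('c', 'create'), ('b', 'delete')]
```

Unchanged documents produce no change at all, so only edited text reaches the embedding queue. A CDC tool avoids the full scan (see the FAQ), but a hash diff is a simple fallback for sources such as file systems or third-party exports.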

## Queue-Based Ingestion Pipeline

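We use an async queue to decouple document changes from the embedding computation. Producers only put changes on the queue, so the write path (document updates) is never blocked by the slow embedding step. The worker drains the queue in batches, closing each batch when it reaches `batch_size` items or when `flush_interval` seconds have passed, whichever comes first, so the model embeds many documents per call.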

```python
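import asyncio
import logging
import threading
import time

import faiss  # pip install faiss-cpu
import numpy as np
from sentence_transformers import SentenceTransformer

logger = logging.getLogger(__name__)

class EmbeddingQueue:
    def __init__(
        self,
        model_name: str = "all-MiniLM-L6-v2",
        batch_size: int = 32,
        flush_interval: float = 1.0,
    ):
        self.model = SentenceTransformer(model_name)
        self.queue: asyncio.Queue = asyncio.Queue()
        self.batch_size = batch_size
        self.flush_interval = flush_interval
        self._running = False

    async def enqueue(self, change: DocumentChange):
        """Add a document change to the processing queue."""
        await self.queue.put(change)

    async def start_worker(self, index: "LiveVectorIndex"):
        """Process queued changes in batches."""
        self._running = True
        while self._running:
            batch = []
            try:
                # Collect items up to batch_size or flush_interval
                deadline = time.monotonic() + self.flush_interval
                while len(batch) < self.batch_size:
                    remaining = deadline - time.monotonic()
                    if remaining <= 0:
                        break
                    try:
                        change = await asyncio.wait_for(
                            self.queue.get(), timeout=remaining
                        )
                    except asyncio.TimeoutError:
                        break
                    batch.append(change)

                if batch:
                    # Run the blocking model call in a worker thread so the
                    # event loop (and enqueue()) stays responsive.
                    await asyncio.to_thread(self._process_batch, batch, index)
            except Exception:
                logger.exception("Failed to index batch of %d changes", len(batch))

    def _process_batch(self, batch: list, index: "LiveVectorIndex"):
        """Embed creates/updates with one model call; apply deletes directly."""
        # Keep only the latest change per document, so an update followed
        # by a delete in the same batch does not resurrect the document.
        latest = {change.doc_id: change for change in batch}
        to_embed = []
        for change in latest.values():
            if change.change_type == ChangeType.DELETE:
                index.delete(change.doc_id)
            else:
                to_embed.append(change)
        if not to_embed:
            return

        texts = [f"{c.title or ''}\n{c.body or ''}" for c in to_embed]
        embeddings = self.model.encode(
            texts, batch_size=self.batch_size, normalize_embeddings=True
        )
        for change, embedding in zip(to_embed, embeddings):
            index.upsert(
                change.doc_id,
                embedding,
                {"title": change.title, "timestamp": change.timestamp},
            )

    def stop(self):
        """Ask the worker to exit after its current batch."""
        self._running = False


class LiveVectorIndex:
    """FAISS flat index with upserts and soft deletes (tombstones)."""

    def __init__(self, dimension: int):
        self.dimension = dimension
        # Inner product on L2-normalized vectors equals cosine similarity
        self.index = faiss.IndexFlatIP(dimension)
        self.id_to_position: dict = {}  # doc_id -> index position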
        self.position_to_id: dict = {}  # index position -> doc_id
        self.metadata: dict = {}  # doc_id -> metadata
        self.deleted: set = set()  # positions marked as deleted
        self._lock = threading.Lock()
        self._total_added = 0

    def upsert(self, doc_id: str, embedding: np.ndarray, meta: dict):
        """Insert or update a document in the index."""
        # FAISS expects a float32 matrix of shape (n, dimension)
        vector = np.asarray(embedding, dtype=np.float32).reshape(1, -1)
        with self._lock:
            if doc_id in self.id_to_position:
                # Tombstone the stale vector instead of removing it from FAISS
                old_pos = self.id_to_position[doc_id]
                self.deleted.add(old_pos)
                self.position_to_id.pop(old_pos, None)

            position = self._total_added
            self.index.add(vector)
            self.id_to_position[doc_id] = position
            self.position_to_id[position] = doc_id
            self.metadata[doc_id] = meta
            self._total_added += 1

    def delete(self, doc_id: str):
        """Mark a document as deleted."""
        with self._lock:
            if doc_id in self.id_to_position:
                position = self.id_to_position[doc_id]
                self.deleted.add(position)
                del self.id_to_position[doc_id]
                del self.position_to_id[position]
                self.metadata.pop(doc_id, None)

    def search(
        self, query_embedding: np.ndarray, top_k: int = 10
    ) -> list:
        """Search with deleted document filtering."""
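        with self._lock:
            if self.index.ntotal == 0:
                return []
            # Over-fetch so tombstoned hits can be skipped while still
            # returning up to top_k live results.
            fetch_k = min(top_k + len(self.deleted), self.index.ntotal)
            scores, positions = self.index.search(
                np.asarray(query_embedding, dtype=np.float32).reshape(1, -1),
                fetch_k,
            )

            results = []
            for score, pos in zip(scores[0], positions[0]):
                pos = int(pos)  # FAISS returns numpy int64 labels
                if pos == -1 or pos in self.deleted:
                    continue
                doc_id = self.position_to_id.get(pos)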
                if doc_id is None:
                    continue
                results.append({
                    "doc_id": doc_id,
                    "score": float(score),
                    **self.metadata.get(doc_id, {}),
                })
                if len(results) >= top_k:
                    break
            return results

    def compact(self):
        """Rebuild index without deleted entries to reclaim space."""
        with self._lock:
            active_ids = list(self.id_to_position.keys())
            active_positions = [
                self.id_to_position[did] for did in active_ids
            ]

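            vectors = np.array(
                [self.index.reconstruct(pos) for pos in active_positions],
                dtype=np.float32,
            ).reshape(-1, self.dimension)

            self.index = faiss.IndexFlatIP(self.dimension)
            if len(vectors):  # nothing to add if every document was deleted
                self.index.add(vectors)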

            self.id_to_position = {}
            self.position_to_id = {}
            self.deleted = set()

            for i, doc_id in enumerate(active_ids):
                self.id_to_position[doc_id] = i
                self.position_to_id[i] = doc_id

            self._total_added = len(active_ids)
            logger.info(f"Compacted index to {len(active_ids)} vectors")
```
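The size-or-deadline batching in `start_worker` can be checked in isolation, without a model or FAISS. This sketch pulls the same loop out into a hypothetical `collect_batch` coroutine: it returns once `batch_size` items are collected or `flush_interval` seconds pass, whichever comes first. It uses `time.monotonic()` so wall-clock adjustments cannot stretch or shrink the deadline.

```python
import asyncio
import time


async def collect_batch(queue: asyncio.Queue, batch_size: int, flush_interval: float) -> list:
    """Drain up to batch_size items, waiting at most flush_interval seconds."""
    batch = []
    deadline = time.monotonic() + flush_interval
    while len(batch) < batch_size:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break
        try:
            batch.append(await asyncio.wait_for(queue.get(), timeout=remaining))
        except asyncio.TimeoutError:
            break  # deadline reached with a partial (or empty) batch
    return batch


async def demo():
    queue: asyncio.Queue = asyncio.Queue()
    for i in range(5):
        queue.put_nowait(f"doc-{i}")
    full = await collect_batch(queue, batch_size=3, flush_interval=1.0)     # size limit hit
    partial = await collect_batch(queue, batch_size=3, flush_interval=0.1)  # deadline hit
    empty = await collect_batch(queue, batch_size=3, flush_interval=0.1)    # nothing queued
    return full, partial, empty


full, partial, empty = asyncio.run(demo())
print(full, partial, empty)  # ['doc-0', 'doc-1', 'doc-2'] ['doc-3', 'doc-4'] []
```

The three calls show the three exits: a full batch returns immediately, a partial batch returns at the deadline, and an idle queue yields an empty list after `flush_interval`. That last case is what lets the worker loop re-check its running flag and shut down promptly.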

## Putting It Together

```python
async def main():
    index = LiveVectorIndex(dimension=384)
    queue = EmbeddingQueue(batch_size=16, flush_interval=0.5)

    worker_task = asyncio.create_task(queue.start_worker(index))

    # Simulate incoming document changes
    await queue.enqueue(DocumentChange(
        doc_id="article-1",
        change_type=ChangeType.CREATE,
        title="Introduction to Vector Databases",
        body="Vector databases store high-dimensional embeddings...",
    ))
    await queue.enqueue(DocumentChange(
        doc_id="article-2",
        change_type=ChangeType.CREATE,
        title="FAISS Performance Tuning",
        body="Optimize FAISS indexes for production workloads...",
    ))

    await asyncio.sleep(2)  # wait for processing

    # Search the live index, reusing the queue's model to embed the query
    query_emb = queue.model.encode(
        ["how to optimize vector search"], normalize_embeddings=True
    )
    results = index.search(query_emb)
    for r in results:
        print(f"{r['score']:.3f} — {r.get('title', 'N/A')}")

    queue.stop()
    await worker_task  # the worker exits within one flush_interval


if __name__ == "__main__":
    asyncio.run(main())
```
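To exercise the upsert, delete, and search flow without downloading a model or installing FAISS, the tombstone logic of `LiveVectorIndex` can be reproduced with a brute-force NumPy matrix and hand-made unit vectors. `TinyTombstoneIndex` is a hypothetical test double, not a replacement for FAISS: it scores every stored row on each query.

```python
import numpy as np


class TinyTombstoneIndex:
    """Append-only vector store with soft deletes, mirroring LiveVectorIndex."""

    def __init__(self, dimension: int):
        self.vectors = np.empty((0, dimension), dtype=np.float32)
        self.id_to_position: dict = {}
        self.position_to_id: dict = {}
        self.deleted: set = set()

    def upsert(self, doc_id: str, embedding: np.ndarray):
        if doc_id in self.id_to_position:
            old = self.id_to_position[doc_id]
            self.deleted.add(old)  # tombstone the stale row
            self.position_to_id.pop(old, None)
        position = len(self.vectors)
        row = embedding.astype(np.float32)[None, :]
        self.vectors = np.vstack([self.vectors, row])  # append, never overwrite
        self.id_to_position[doc_id] = position
        self.position_to_id[position] = doc_id

    def delete(self, doc_id: str):
        position = self.id_to_position.pop(doc_id, None)
        if position is not None:
            self.deleted.add(position)
            self.position_to_id.pop(position, None)

    def search(self, query: np.ndarray, top_k: int = 10) -> list:
        scores = self.vectors @ query.astype(np.float32)  # inner product per row
        results = []
        for pos in np.argsort(-scores):  # best score first
            pos = int(pos)
            if pos in self.deleted:
                continue
            results.append((self.position_to_id[pos], round(float(scores[pos]), 3)))
            if len(results) >= top_k:
                break
        return results


index = TinyTombstoneIndex(dimension=2)
index.upsert("article-1", np.array([1.0, 0.0]))
index.upsert("article-2", np.array([0.0, 1.0]))
index.upsert("article-1", np.array([0.6, 0.8]))  # update: row 0 is tombstoned
index.delete("article-2")                         # row 1 is tombstoned
print(index.search(np.array([1.0, 0.0])))         # [('article-1', 0.6)]
print(index.vectors.shape, sorted(index.deleted)) # (3, 2) [0, 1]
```

The stale row 0 still scores highest (1.0) but is skipped, which is exactly why the FAISS version over-fetches `top_k + len(self.deleted)`: `IndexFlatIP.search` returns only `k` rows, and some of them may be tombstones. The brute-force double ranks every row, so it needs no over-fetch.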

## When to Compact

The deleted set grows over time as documents are updated or removed. Schedule compaction during low-traffic periods or when the deleted ratio exceeds a threshold.

```python
def should_compact(index: LiveVectorIndex, threshold: float = 0.3) -> bool:
    """Compact when deleted entries exceed threshold of total."""
    total = index.index.ntotal
    if total == 0:
        return False
    deleted_ratio = len(index.deleted) / total
    return deleted_ratio > threshold
```
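`should_compact` covers the ratio trigger; the low-traffic half of the advice can be folded into the same decision. In this sketch, `compaction_due`, the 02:00 to 05:00 quiet window, and the 5% quiet-hours threshold are all assumptions to tune against your own traffic. Above `threshold` it compacts at any hour; inside the quiet window a smaller amount of garbage is enough.

```python
from datetime import datetime


def compaction_due(
    deleted: int,
    total: int,
    now: datetime,
    threshold: float = 0.3,
    quiet_hours: range = range(2, 5),      # assumed low-traffic window (02:00-04:59)
    quiet_hours_threshold: float = 0.05,   # assumed: compact earlier when traffic is low
) -> bool:
    """Compact above `threshold` at any time, or above a lower bar in quiet hours."""
    if total == 0:
        return False
    ratio = deleted / total
    if ratio > threshold:
        return True
    return now.hour in quiet_hours and ratio > quiet_hours_threshold


noon, three_am = datetime(2026, 3, 17, 12), datetime(2026, 3, 17, 3)
print(compaction_due(350, 1000, noon))      # True: 35% exceeds 30%
print(compaction_due(100, 1000, noon))      # False: 10% during peak hours
print(compaction_due(100, 1000, three_am))  # True: 10% exceeds 5% in the quiet window
```

The quiet window matters because `compact()` holds the index lock for the whole rebuild, so concurrent searches wait until it finishes.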

## FAQ

### What is the typical delay between a document being added and it appearing in search results?

With the queue-based architecture shown here, the delay is roughly the flush interval (default 1 second) plus embedding computation time (50-200ms per document depending on hardware). For most applications, a 1-2 second delay between write and searchability is acceptable. If you need sub-second freshness, reduce the flush interval and use GPU-accelerated embedding.
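That estimate is for a single document. Under a burst, each batch queued ahead of the last document adds one embedding call, which this back-of-the-envelope helper makes explicit. `worst_case_freshness_s` is hypothetical, and `embed_s_per_batch` is an assumed figure you would measure on your own hardware rather than a constant.

```python
import math


def worst_case_freshness_s(
    pending: int, batch_size: int, flush_interval: float, embed_s_per_batch: float
) -> float:
    """Rough upper bound on enqueue-to-searchable delay for the last queued change.

    Assumes a single worker: full batches are embedded back to back, and the
    final partial batch may wait up to flush_interval before it is embedded.
    """
    batches = math.ceil(pending / batch_size)
    return flush_interval + batches * embed_s_per_batch


print(round(worst_case_freshness_s(1, 32, 1.0, 0.2), 3))    # 1.2: one document, one batch
print(round(worst_case_freshness_s(100, 32, 1.0, 0.2), 3))  # 1.8: four batches of up to 32
```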

### How do I handle consistency between the source database and the search index?

Use a write-ahead pattern: write the document to your primary database first, then enqueue the change for indexing. If the embedding worker crashes, replay unprocessed changes from the database changelog. For stronger guarantees, use a change data capture (CDC) tool like Debezium that streams database changes to your embedding queue.
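One concrete way to get the replay guarantee is the transactional outbox pattern: the document row and a change record are written in the same database transaction, and the indexing side reads unprocessed records in commit order. The sketch below uses an in-memory SQLite database as a stand-in for the primary store; the `documents` and `outbox` tables and the helper functions are hypothetical. Delivery is at-least-once, which suits this design because replaying an upsert keyed by `doc_id` just re-indexes the same document.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE documents (doc_id TEXT PRIMARY KEY, title TEXT, body TEXT);
    CREATE TABLE outbox (
        seq INTEGER PRIMARY KEY AUTOINCREMENT,
        doc_id TEXT NOT NULL,
        change_type TEXT NOT NULL,
        processed INTEGER NOT NULL DEFAULT 0
    );
    """
)


def save_document(doc_id: str, title: str, body: str) -> None:
    """Write the document and its change record atomically."""
    with conn:  # commits both statements together, or rolls both back
        existed = conn.execute(
            "SELECT 1 FROM documents WHERE doc_id = ?", (doc_id,)
        ).fetchone()
        conn.execute(
            "INSERT OR REPLACE INTO documents VALUES (?, ?, ?)", (doc_id, title, body)
        )
        conn.execute(
            "INSERT INTO outbox (doc_id, change_type) VALUES (?, ?)",
            (doc_id, "update" if existed else "create"),
        )


def delete_document(doc_id: str) -> None:
    with conn:
        conn.execute("DELETE FROM documents WHERE doc_id = ?", (doc_id,))
        conn.execute(
            "INSERT INTO outbox (doc_id, change_type) VALUES (?, 'delete')", (doc_id,)
        )


def pending_changes(limit: int = 100) -> list:
    """Unprocessed changes in commit order; safe to re-read after a crash."""
    return conn.execute(
        "SELECT seq, doc_id, change_type FROM outbox "
        "WHERE processed = 0 ORDER BY seq LIMIT ?",
        (limit,),
    ).fetchall()


def mark_processed(seqs: list) -> None:
    """Call only after the index has applied these changes."""
    with conn:
        conn.executemany(
            "UPDATE outbox SET processed = 1 WHERE seq = ?", [(s,) for s in seqs]
        )


save_document("article-1", "Intro", "v1")
save_document("article-1", "Intro", "v2")
delete_document("article-1")
batch = pending_changes()
print(batch)  # [(1, 'article-1', 'create'), (2, 'article-1', 'update'), (3, 'article-1', 'delete')]
mark_processed([seq for seq, _, _ in batch])
print(pending_changes())  # []
```

If the worker crashes after indexing but before `mark_processed`, the same rows come back from `pending_changes()` on restart and are simply applied again.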

### When should I rebuild the entire index vs using incremental updates?

Rebuild when changing embedding models (old vectors are incompatible with new ones), after a major data migration, or when the deleted ratio exceeds 50% and compaction alone is insufficient. For day-to-day operations, incremental updates with periodic compaction are more efficient and avoid downtime.

