---
title: "Postgres Logical Replication for AI ETL: Stream OLTP to Your Feature Store (2026)"
description: "Stop running brittle nightly batches. Postgres logical replication gives you sub-second CDC into a warehouse, feature store, or vector index. A working publication, slot management, and a Supabase ETL Rust example."
canonical: https://callsphere.ai/blog/vw7h-postgres-logical-replication-ai-etl-2026
category: "AI Infrastructure"
tags: ["Postgres", "Logical Replication", "ETL", "CDC", "AI Pipelines"]
author: "CallSphere Team"
published: 2026-03-23T00:00:00.000Z
updated: 2026-05-07T22:22:40.807Z
---

# Postgres Logical Replication for AI ETL: Stream OLTP to Your Feature Store (2026)

> Stop running brittle nightly batches. Postgres logical replication gives you sub-second CDC into a warehouse, feature store, or vector index. A working publication, slot management, and a Supabase ETL Rust example.

> **TL;DR** — Logical replication turns Postgres into a CDC source. AI teams in 2026 use it to stream OLTP changes into pgvector, Iceberg, or feature stores with sub-second latency.

```mermaid
flowchart LR
  OLTP[(Primary Postgres)]
  OLTP --> WAL[WAL stream]
  WAL --> SLOT[Replication slot]
  SLOT --> SUB[Subscriber]
  SUB --> XFORM[Transformer + embed]
  XFORM --> ANALYTICS[(Analytics + pgvector)]
  ANALYTICS --> AGENTS[AI agents]
```

## Step 1 — Prepare the primary

```bash
# postgresql.conf
wal_level = logical
max_replication_slots = 20
max_wal_senders = 20

# pg_hba.conf
host replication replicator 10.0.0.0/8 scram-sha-256
```

```sql
CREATE ROLE replicator WITH REPLICATION LOGIN PASSWORD '...';
GRANT SELECT ON calls, messages, agent_runs TO replicator;
```
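Before creating any publications, the settings above can be sanity-checked against the values Postgres actually reports in `pg_settings`. A minimal sketch — the `check_primary_config` helper and its dict input are ours, not part of Postgres:

```python
# Hypothetical pre-flight check. Feed it the GUC values read from the primary, e.g.:
#   SELECT name, setting FROM pg_settings
#   WHERE name IN ('wal_level', 'max_replication_slots', 'max_wal_senders');
def check_primary_config(settings: dict) -> list[str]:
    """Return a list of problems; an empty list means the primary is ready."""
    problems = []
    if settings.get("wal_level") != "logical":
        problems.append("wal_level must be 'logical' (change requires a restart)")
    if int(settings.get("max_replication_slots", 0)) < 1:
        problems.append("max_replication_slots leaves no room for a new slot")
    if int(settings.get("max_wal_senders", 0)) < 1:
        problems.append("max_wal_senders leaves no room for a new walsender")
    return problems
```

Run it once per primary as part of deploy, and fail fast instead of discovering a misconfigured `wal_level` when `CREATE SUBSCRIPTION` hangs.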

## Step 2 — Create the publication

```sql
CREATE PUBLICATION ai_pipeline FOR TABLE calls, messages, agent_runs
WITH (publish = 'insert,update,delete');
```

For column-level filtering (Postgres 15+), list the columns after the table name — shown here as a separate publication, since `ai_pipeline` already exists:

```sql
CREATE PUBLICATION ai_pipeline_cols FOR TABLE
  calls (id, tenant_id, transcript, ended_at)
WITH (publish_via_partition_root = true);  -- also useful if calls is partitioned
```
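Row filters (also Postgres 15+) go alongside column lists, so you can publish only the rows an AI pipeline actually needs — the predicate here is illustrative:

```sql
CREATE PUBLICATION ai_pipeline_done FOR TABLE
  calls WHERE (ended_at IS NOT NULL)
WITH (publish = 'insert,update,delete');
```

Filtering at the publication keeps half-finished calls out of the embedding pipeline entirely, instead of skipping them downstream.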

## Step 3 — Create the subscriber

```sql
CREATE SUBSCRIPTION ai_pipeline_sub
CONNECTION 'host=primary.internal user=replicator dbname=app password=...'
PUBLICATION ai_pipeline
WITH (copy_data = true, streaming = on, two_phase = on);
```
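One operational wrinkle: `CREATE SUBSCRIPTION` (with its default `create_slot = true`) cannot run inside a transaction block, so a driver that opens implicit transactions needs autocommit. A sketch with psycopg — the DDL string mirrors the statement above, and `create_subscription` is our wrapper, not a library API:

```python
# DDL from the step above, kept as a constant so deploy scripts can reuse it.
SUB_DDL = """
CREATE SUBSCRIPTION ai_pipeline_sub
CONNECTION 'host=primary.internal user=replicator dbname=app password=...'
PUBLICATION ai_pipeline
WITH (copy_data = true, streaming = on, two_phase = on)
"""

def create_subscription(dsn: str) -> None:
    import psycopg  # imported here so the module loads without the driver installed
    # autocommit=True is required: CREATE SUBSCRIPTION refuses to run in a transaction.
    with psycopg.connect(dsn, autocommit=True) as conn:
        conn.execute(SUB_DDL)
```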

## Step 4 — Transform and embed downstream

```python
# Run on the subscriber, polling the new rows
import psycopg, openai, os

oai = openai.OpenAI()
sub = psycopg.connect(os.environ["SUB_DSN"])

def enrich():
    with sub.cursor() as cur:
        cur.execute("""
            SELECT id, transcript FROM calls
            WHERE embedding IS NULL AND transcript IS NOT NULL
            LIMIT 100
        """)
        rows = cur.fetchall()
    for cid, txt in rows:
        v = oai.embeddings.create(
            model="text-embedding-3-small", input=txt[:8000]
        ).data[0].embedding
        with sub.cursor() as cur:
            cur.execute(
                "UPDATE calls SET embedding = %s::vector WHERE id = %s",
                (v, cid),
            )
    sub.commit()
```

Run on a 30-second cron loop or trigger via LISTEN/NOTIFY for sub-second latency.
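The LISTEN/NOTIFY path needs one extra step that is easy to miss: the logical replication apply worker runs with `session_replication_role = replica`, which skips ordinary triggers, so the subscriber-side trigger must be marked `ENABLE ALWAYS`. A sketch — the trigger, function, channel name, and `listen_loop` wrapper are all ours:

```python
# Apply on the subscriber so each replicated write to calls fires a NOTIFY.
TRIGGER_SQL = """
CREATE OR REPLACE FUNCTION notify_call_changed() RETURNS trigger AS $$
BEGIN
  PERFORM pg_notify('call_changed', NEW.id::text);
  RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER call_changed AFTER INSERT OR UPDATE ON calls
FOR EACH ROW EXECUTE FUNCTION notify_call_changed();

-- Without this, the replication apply worker never fires the trigger.
ALTER TABLE calls ENABLE ALWAYS TRIGGER call_changed;
"""

def listen_loop(dsn: str, on_change) -> None:
    import psycopg  # imported here so the module loads without the driver installed
    with psycopg.connect(dsn, autocommit=True) as conn:
        conn.execute("LISTEN call_changed")
        for note in conn.notifies():   # generator; blocks until a NOTIFY arrives
            on_change(note.payload)    # e.g. pass enrich from Step 4 via a lambda
```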

## Step 5 — Monitor lag

```sql
-- On the primary
SELECT slot_name, active, restart_lsn,
       pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS retained_wal
FROM pg_replication_slots;

-- On the subscriber
SELECT subname, received_lsn, latest_end_lsn,
       latest_end_time, last_msg_receipt_time
FROM pg_stat_subscription;
```
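To turn the primary-side query into an alert, compare the raw byte count from `pg_wal_lsn_diff` (before `pg_size_pretty`) against a budget. A hypothetical check — `slot_alerts`, the 8 GiB default, and the dict shape are ours:

```python
def slot_alerts(slots: list[dict], max_retained_bytes: int = 8 << 30) -> list[str]:
    """slots: rows from pg_replication_slots with 'slot_name', 'active', and
    'retained_bytes' (pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn))."""
    alerts = []
    for s in slots:
        if not s["active"]:
            alerts.append(f"slot {s['slot_name']} is inactive; WAL accumulates until it is dropped")
        elif s["retained_bytes"] > max_retained_bytes:
            alerts.append(f"slot {s['slot_name']} retains {s['retained_bytes']} bytes of WAL")
    return alerts
```

Wire this into whatever pager you already run; an inactive slot is worth a page on its own, regardless of current lag.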

## Step 6 — Use Supabase ETL for fan-out

For non-Postgres destinations (BigQuery, Iceberg, S3 Parquet) drop in [Supabase ETL](https://github.com/supabase/etl) — Rust building blocks on top of `pgoutput` that handle backfill, restart, and parallel apply.

```bash
cargo install supabase-etl
supabase-etl run --source ai_pipeline --sink bigquery://prod
```

## Pitfalls

- **Replication slots leak WAL** — a paused subscriber keeps WAL on disk forever. Monitor `pg_replication_slots.active` and disk usage daily.
- **No DDL replication** — schema changes must be applied to the subscriber first.
- **Large transactions stall** — use `streaming = on` (Postgres 14+) to apply in chunks.
- **Forgetting REPLICA IDENTITY** — UPDATE/DELETE on tables without primary keys silently breaks. Set `REPLICA IDENTITY FULL` for those.
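Concrete guards for the first and last pitfalls — the cap value and the table name here are illustrative:

```sql
-- Bound how much WAL any slot may retain (Postgres 13+); slots past the cap are invalidated
ALTER SYSTEM SET max_slot_wal_keep_size = '50GB';
SELECT pg_reload_conf();

-- Drop a slot whose subscriber is gone for good
SELECT pg_drop_replication_slot('ai_pipeline_sub');

-- Make UPDATE/DELETE replicable on a table with no primary key
ALTER TABLE call_events REPLICA IDENTITY FULL;
```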

## CallSphere production note

CallSphere streams OLTP changes from `call_logs` and `messages` tables into a vector-enriched analytics replica via logical replication. **115+ DB tables** stay write-fast on the primary; embeddings, summaries, and analytics aggregates are computed on the subscriber. Healthcare uses HIPAA-isolated `healthcare_voice` Prisma + dedicated subscriber; OneRoof preserves RLS via per-tenant publications; UrackIT mirrors to Supabase + ChromaDB. **37 agents · 90+ tools · 6 verticals**. Plans: $149/$499/$1,499 — 14-day trial, 22% affiliate.

## FAQ

**Q: Logical replication or trigger-based CDC?**
Logical replication for >50 writes/sec; triggers add too much per-row overhead at scale.

**Q: Can I replicate part of a table?**
Yes — column lists (PG15+) and row filters (PG15+) are first-class.

**Q: Does pgbouncer affect replication?**
No — replication uses its own protocol on a direct connection.

**Q: How do I bootstrap a new subscriber?**
`copy_data = true` does the initial sync; for huge tables consider pg_dump + start subscription with `copy_data = false`.

**Q: When should I use Debezium instead?**
Only if your sink is Kafka and you already run Kafka. Otherwise logical rep + Supabase ETL is simpler.

## Sources

- [PostgreSQL docs — Logical Replication](https://www.postgresql.org/docs/current/logical-replication.html)
- [Supabase ETL on GitHub](https://github.com/supabase/etl)
- [Definite — Best Postgres ETL tools 2026](https://www.definite.app/blog/best-etl-tools-for-postgresql-2026)
- [ParadeDB — Logical replication vs ETL](https://www.paradedb.com/blog/etl-vs-logical-replication)

