AI Infrastructure

Postgres Logical Replication for AI ETL: Stream OLTP to Your Feature Store (2026)

Stop running brittle nightly batches. Postgres logical replication gives you sub-second CDC into a warehouse, feature store, or vector index. A working publication, slot management, and a Supabase ETL Rust example.

TL;DR — Logical replication turns Postgres into a CDC source. AI teams in 2026 use it to stream OLTP changes into pgvector, Iceberg, or feature stores in <2 seconds — without nightly dumps and without touching production.

What you'll build

A publication on the OLTP primary that streams calls, messages, and agent_runs to a downstream Postgres + pgvector instance, where a transformer enriches each row with embeddings before insert. Lag stays under 2 seconds at 800 writes/sec.

Schema

-- On the primary (OLTP)
ALTER SYSTEM SET wal_level = logical;
-- wal_level requires a server restart; pg_reload_conf() is not enough
-- for this parameter.

CREATE PUBLICATION ai_pipeline FOR TABLE calls, messages, agent_runs
WITH (publish = 'insert,update,delete');

-- On the subscriber (analytics + vectors)
CREATE SUBSCRIPTION ai_pipeline_sub
CONNECTION 'host=primary user=replicator dbname=app password=...'
PUBLICATION ai_pipeline
WITH (copy_data = true, streaming = on);

Architecture

flowchart LR
  APP[App writes] --> OLTP[(Primary Postgres)]
  OLTP --> WAL[WAL stream]
  WAL --> SLOT[Replication slot]
  SLOT --> SUB[Subscriber]
  SUB --> XFORM[Transformer<br/>+ embed]
  XFORM --> ANALYTICS[(Analytics + pgvector)]
  ANALYTICS --> AGENTS[AI agents]

Step 1 — Prepare the primary

# postgresql.conf (restart required for wal_level)
wal_level = logical
max_replication_slots = 20
max_wal_senders = 20

# pg_hba.conf
host replication replicator 10.0.0.0/8 scram-sha-256

-- Then, in psql on the primary:
CREATE ROLE replicator WITH REPLICATION LOGIN PASSWORD '...';
GRANT SELECT ON calls, messages, agent_runs TO replicator;
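As a sanity check before moving on, the three settings above can be verified over an ordinary connection. A minimal sketch — the `check_settings` helper and its thresholds are illustrative, not part of Postgres; in production you would feed it the live values returned by `SHOW`:

```python
# Preflight check for the primary's replication settings.
# check_settings() is pure so it can be unit-tested without a database.

REQUIRED = {
    "wal_level": lambda v: v == "logical",
    "max_replication_slots": lambda v: int(v) >= 20,
    "max_wal_senders": lambda v: int(v) >= 20,
}

def check_settings(current):
    """Return names of settings that still need changing before CDC works.

    current: dict mapping setting name -> value as reported by SHOW.
    """
    return [name for name, ok in REQUIRED.items()
            if not ok(current.get(name, "0"))]
```

Run it once after the restart; an empty list means the primary is ready to publish.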

Step 2 — Create the publication

CREATE PUBLICATION ai_pipeline FOR TABLE calls, messages, agent_runs
WITH (publish = 'insert,update,delete');

For column-level filtering (Postgres 15+):

CREATE PUBLICATION ai_pipeline_cols FOR TABLE  -- distinct name: ai_pipeline exists
  calls (id, tenant_id, transcript, ended_at)
WITH (publish_via_partition_root = true);      -- partitions replicate as the root table

Step 3 — Create the subscriber

CREATE SUBSCRIPTION ai_pipeline_sub
CONNECTION 'host=primary.internal user=replicator dbname=app password=...'
PUBLICATION ai_pipeline
WITH (copy_data = true, streaming = on, two_phase = on);  -- two_phase needs PG15+

Step 4 — Transform and embed downstream

# Run on the subscriber, polling for newly replicated rows
import os

import openai
import psycopg

oai = openai.OpenAI()
sub = psycopg.connect(os.environ["SUB_DSN"])

def enrich():
    with sub.cursor() as cur:
        cur.execute("""
            SELECT id, transcript FROM calls
            WHERE embedding IS NULL AND transcript IS NOT NULL
            LIMIT 100
        """)
        rows = cur.fetchall()
    for cid, txt in rows:
        v = oai.embeddings.create(
            model="text-embedding-3-small", input=txt[:8000]
        ).data[0].embedding
        with sub.cursor() as cur:
            cur.execute(
                # pgvector parses the '[x,y,...]' literal form, so pass
                # the embedding as text rather than a Postgres array
                "UPDATE calls SET embedding = %s::vector WHERE id = %s",
                (str(v), cid),
            )
    sub.commit()

Run on a 30-second cron loop or trigger via LISTEN/NOTIFY for sub-second latency.

Step 5 — Monitor lag

-- On the primary
SELECT slot_name, active, restart_lsn,
       pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS retained_wal
FROM pg_replication_slots;

-- On the subscriber
SELECT subname, received_lsn, latest_end_lsn,
       latest_end_time, last_msg_receipt_time
FROM pg_stat_subscription;
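The same lag figure can be computed client-side from raw LSNs, which is handy for exporting to a metrics system. A small helper, pure arithmetic with no Postgres required (the function names are mine):

```python
def lsn_to_int(lsn):
    """Parse a Postgres LSN like '16/B374D848' into an absolute byte position.

    The text form is two hex words: the high 32 bits, a slash, the low 32 bits.
    """
    hi, lo = lsn.split("/")
    return (int(hi, 16) << 32) | int(lo, 16)

def lag_bytes(current_lsn, restart_lsn):
    """Bytes of WAL the slot is holding back; mirrors pg_wal_lsn_diff()."""
    return lsn_to_int(current_lsn) - lsn_to_int(restart_lsn)
```

Feed it `pg_current_wal_lsn()` and the slot's `restart_lsn` from the query above and alert when the result crosses a few GB.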

Step 6 — Use Supabase ETL for fan-out

For non-Postgres destinations (BigQuery, Iceberg, S3 Parquet) drop in Supabase ETL — Rust building blocks on top of pgoutput that handle backfill, restart, and parallel apply.

cargo install supabase-etl
supabase-etl run --source ai_pipeline --sink bigquery://prod

Pitfalls

  • Replication slots leak WAL — a paused subscriber keeps WAL on disk forever. Monitor pg_replication_slots.active and disk usage daily.
  • No DDL replication — schema changes must be applied to the subscriber first.
  • Large transactions stall — use streaming = on (Postgres 14+) to apply in chunks.
  • Forgetting REPLICA IDENTITY — UPDATE/DELETE on published tables without a primary key error out on the publisher ("cannot update table ... does not have a replica identity"). Set REPLICA IDENTITY FULL for those.
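The first pitfall is the one that fills a disk overnight, so it deserves an automated check. A minimal sketch over rows from `pg_replication_slots` joined with the lag query above — the 10 GB threshold and the row shape are assumptions to adapt:

```python
def slots_at_risk(rows, max_retained=10 * 2**30):
    """Flag replication slots that need attention.

    rows: (slot_name, active, retained_wal_bytes) tuples, e.g. from
    pg_replication_slots plus pg_wal_lsn_diff(). A slot is risky when
    it is inactive (subscriber down, WAL piling up) or when it already
    retains more than max_retained bytes of WAL.
    """
    return [name for name, active, retained in rows
            if not active or retained > max_retained]
```

Wire the result into whatever alerting you already run; an inactive slot on a busy primary is an outage on a timer.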

CallSphere production note

CallSphere streams OLTP changes from the call_logs and messages tables into a vector-enriched analytics replica via logical replication. 115+ tables stay write-fast on the primary; embeddings, summaries, and analytics aggregates are computed on the subscriber. Healthcare runs a HIPAA-isolated healthcare_voice Prisma schema with a dedicated subscriber; OneRoof preserves RLS via per-tenant publications; UrackIT mirrors to Supabase + ChromaDB.

FAQ

Q: Logical replication or trigger-based CDC? Logical replication for >50 writes/sec; triggers add too much per-row overhead at scale.


Q: Can I replicate part of a table? Yes — column lists (PG15+) and row filters (PG15+) are first-class.

Q: Does pgbouncer affect replication? No — replication uses its own protocol on a direct connection.

Q: How do I bootstrap a new subscriber? copy_data = true does the initial sync; for huge tables consider pg_dump + start subscription with copy_data = false.

Q: When should I use Debezium instead? Only if your sink is Kafka and you already run Kafka. Otherwise logical rep + Supabase ETL is simpler.

