AI Infrastructure

Debezium 2.x for AI: Postgres Logical Replication as a Real-Time Customer Signal Bus

Debezium 2.x tails the Postgres WAL and emits a change event for every insert, update, and delete. Wire it to your AI agents and your CRM updates become instant agent context — no polling, no lag.

TL;DR — Debezium 2.x reads the Postgres WAL and produces a Kafka event per row change, with at-least-once delivery into Kafka. For AI, this means CRM updates and lead status changes flow into your agents in 50-100 ms — no polling, no missed signals. Confluent's legacy Debezium connector EOLs March 31, 2026; migrate to V2.

The pattern

Your AI sales agent needs to know that a lead just became a customer. Naive option: poll the DB every 30 s. Better option: Debezium tails the Postgres WAL, emits a CDC event to Kafka, and your agent consumer wakes up within ~100 ms of the row change. The DB is the source of truth; the bus is the signal layer.

How it works (architecture)

flowchart LR
  App[Apps] -->|UPDATE leads| PG[(Postgres<br/>logical replication)]
  PG -.WAL slot.- DBZ[Debezium 2.x connector]
  DBZ -->|change event| K[(Kafka topic<br/>cdc.public.leads)]
  K --> Agent[AI sales agent]
  K --> Vec[Vector store updater]
  K --> Cache[Cache invalidator]

Each row change becomes a Kafka message with before, after, op (c/u/d), and source metadata. The connector acknowledges the LSN it has shipped back to the replication slot, so a restart resumes from the last confirmed position.
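A minimal sketch of what a consumer sees for an UPDATE on leads. Field values here are illustrative, and the exact envelope shape depends on your converter settings (this assumes the JSON converter with schemas disabled):

```python
# Illustrative Debezium change event for an UPDATE on public.leads
# (all values are made up for this sketch).
event = {
    "before": {"id": 42, "status": "lead", "owner": "csr-7"},
    "after":  {"id": 42, "status": "customer", "owner": "csr-7"},
    "op": "u",                # c=create, u=update, d=delete, r=snapshot read
    "ts_ms": 1717171717000,   # when the connector processed the change
    "source": {               # provenance metadata
        "connector": "postgresql",
        "db": "callsphere",
        "schema": "public",
        "table": "leads",
        "lsn": 123456789,     # WAL position; the resume point after restart
    },
}

# The op code tells the consumer which side of the event to trust:
row = event["after"] if event["op"] in ("c", "u", "r") else event["before"]
print(row["status"])  # -> customer
```

For deletes, after is null and the row data lives in before — which is why the consumer code later in this post branches on op.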


CallSphere implementation

CallSphere uses Debezium for two flows: (1) draining the outbox table (post #7) into NATS for the Real Estate OneRoof booking workflow, and (2) keeping our agent context cache fresh — when a CSR updates a lead in our admin UI, the AI sales agent sees the change before its next turn. After-hours callbacks use Bull/Redis instead (a different access pattern). 37 agents · 90+ tools · 115+ DB tables · 6 verticals · pricing $149/$499/$1499 · 14-day trial · 22% affiliate commission. Browse /pricing or schedule a demo.

Build steps with code

  1. Postgres: wal_level=logical, max_replication_slots=10, max_wal_senders=10.
  2. Create publication: CREATE PUBLICATION dbz FOR TABLE leads, bookings, outbox;.
  3. Deploy Kafka Connect with the Debezium 2.x Postgres connector.
  4. Use the pgoutput plugin (built into Postgres 10+); no extension install needed.
  5. Filter columns with column.exclude.list for PII reduction.
  6. Wrap as CloudEvents (post #12) before downstream consumers.
  7. Monitor replication slot lag — if it grows, your DB will fill its disk.
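Steps 1 and 2 as SQL. The role name debezium matches the connector config below; the grants shown are one common minimal setup, adjust to your environment:

```sql
-- Step 1: these require a Postgres restart to take effect
ALTER SYSTEM SET wal_level = logical;
ALTER SYSTEM SET max_replication_slots = 10;
ALTER SYSTEM SET max_wal_senders = 10;

-- Step 2: publication limited to the captured tables
CREATE PUBLICATION dbz FOR TABLE leads, bookings, outbox;

-- Replication role for the connector (minimal grants; adjust as needed)
CREATE ROLE debezium WITH REPLICATION LOGIN PASSWORD '***';
GRANT SELECT ON leads, bookings, outbox TO debezium;
```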
{
  "name": "leads-cdc",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "pg-primary",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "***",
    "database.dbname": "callsphere",
    "topic.prefix": "cdc",
    "plugin.name": "pgoutput",
    "publication.name": "dbz",
    "publication.autocreate.mode": "filtered",
    "table.include.list": "public.leads,public.bookings,public.outbox",
    "snapshot.mode": "initial",
    "tombstones.on.delete": "false"
  }
}
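Registering the config above against the Kafka Connect REST API. The hostname and file name are assumptions for a standard distributed-mode deployment (default REST port 8083):

```shell
# POST the connector config to Kafka Connect
curl -s -X POST http://connect:8083/connectors \
  -H "Content-Type: application/json" \
  -d @leads-cdc.json

# Check status: connector and tasks should report RUNNING
curl -s http://connect:8083/connectors/leads-cdc/status
```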
# Consumer side
import json
from confluent_kafka import Consumer

c = Consumer({
    "bootstrap.servers": "kafka:9092",
    "group.id": "ai-context",
    "auto.offset.reset": "earliest",
    "enable.auto.commit": False,  # commit manually after processing
})
c.subscribe(["cdc.public.leads"])
while True:
    msg = c.poll(1.0)
    if msg is None or msg.error():
        continue
    payload = json.loads(msg.value())
    payload = payload.get("payload", payload)  # unwrap if converter schemas are enabled
    op = payload["op"]   # c, u, d, r (r = snapshot read)
    after = payload.get("after")
    if op in ("c", "u", "r"):
        agent_context.upsert_lead(after["id"], after)
    elif op == "d":
        agent_context.delete_lead(payload["before"]["id"])
    c.commit(msg)

Common pitfalls

  • Replication slot grows unbounded — if no consumer drains it, Postgres retains WAL and the disk fills. Compare pg_replication_slots.confirmed_flush_lsn against pg_current_wal_lsn().
  • snapshot.mode=never on first run — you miss everything committed before the connector started; use initial to backfill existing rows.
  • Capturing the entire DB — filter aggressively with table/column lists.
  • Forgetting tombstones — deletes need handling on the consumer side.
  • Using legacy Debezium connector on Confluent Cloud — EOL'd March 31, 2026; migrate to V2.
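A lag check for the first pitfall, run on the primary. A sketch: the slot name matches the Debezium Postgres connector's default (slot.name=debezium); adjust if you override it.

```sql
-- Bytes of WAL the slot is holding back; alert if this grows steadily
SELECT slot_name,
       pg_size_pretty(
         pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn)
       ) AS retained_wal
FROM pg_replication_slots
WHERE slot_name = 'debezium';
```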

FAQ

Why not pg_notify? It doesn't survive restarts and has no replay. Debezium does.

Latency? 50-100 ms p95 from commit to consumer in production.

Still reading? Stop comparing — try CallSphere live.

CallSphere ships complete AI voice agents per industry — 14 tools for healthcare, 10 agents for real estate, 4 specialists for salons. See how it actually handles a call before you book a demo.

Can I CDC without Kafka? Yes — Debezium Engine embeds in a JVM process, and standalone Debezium Server ships events to sinks like Kinesis, Pub/Sub, or NATS without Kafka.

Where does this show up in CallSphere? Internal infra; book a demo to see the agent context flow.

Does it work with Aurora/RDS? Yes — both support logical replication; on RDS, enable the rds.logical_replication parameter.


Try CallSphere AI Voice Agents

See how AI voice agents work for your industry. Live demo available, no signup required.