Debezium 2.x for AI: Postgres Logical Replication as a Real-Time Customer Signal Bus
Debezium 2.x tails the Postgres WAL and emits a change event for every insert, update, and delete. Wire it to your AI agents and CRM updates become agent context within about 100 ms, with no polling.
TL;DR: Debezium 2.x reads the Postgres WAL and produces a Kafka event per row change, with at-least-once delivery into Kafka. For AI, this means CRM updates and lead status changes flow into your agents in 50-100 ms, with no polling and no missed signals, as long as consumers tolerate the occasional duplicate after a restart. Confluent's legacy Debezium connector reached EOL on March 31, 2026; migrate to V2.
The pattern
Your AI sales agent needs to know that a lead just became a customer. Naive option: poll the DB every 30 s. Better option: Debezium tails the Postgres WAL, emits a CDC event to Kafka, and your agent consumer wakes up within ~100 ms of the row change. The DB is the source of truth; the bus is the signal layer.
How it works (architecture)
flowchart LR
App[Apps] -->|UPDATE leads| PG[(Postgres<br/>logical replication)]
PG -.WAL slot.- DBZ[Debezium 2.x connector]
DBZ -->|change event| K[(Kafka topic<br/>cdc.public.leads)]
K --> Agent[AI sales agent]
K --> Vec[Vector store updater]
K --> Cache[Cache invalidator]
Each row change becomes a Kafka message with before, after, op (c/u/d, plus r for snapshot reads), and source metadata. The connector records the last LSN it has shipped, so a restart resumes from that point; events close to the restart may be delivered again.
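As a rough illustration of that envelope, the sketch below normalizes one event into a small dictionary. The helper name `parse_change_event` and the returned shape are hypothetical; the `op`, `before`, `after`, and `source` fields are the ones described above. The optional `payload` wrapper is what Kafka Connect's JSON converter adds when schemas are enabled, and a null message value is treated as a tombstone.

```python
import json
from typing import Any, Optional


def parse_change_event(raw: Optional[bytes]) -> Optional[dict[str, Any]]:
    """Normalize one Debezium change event (hypothetical helper).

    Returns None for tombstones (messages with a null value).
    """
    if raw is None:
        return None
    event = json.loads(raw)
    # With schemas enabled, the JSON converter nests the event under "payload".
    payload = event.get("payload", event)
    op = payload["op"]  # c = create, u = update, d = delete, r = snapshot read
    # Deletes carry the old row in "before"; everything else carries "after".
    row = payload["before"] if op == "d" else payload["after"]
    source = payload.get("source") or {}
    return {"op": op, "row": row, "table": source.get("table"), "lsn": source.get("lsn")}


# Illustrative event; the values are made up.
sample = json.dumps({
    "before": None,
    "after": {"id": 42, "status": "customer"},
    "op": "u",
    "source": {"table": "leads", "lsn": 24023128},
}).encode()
print(parse_change_event(sample))
```

Keeping this step as a pure function makes it easy to unit-test separately from the Kafka consumer loop.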
CallSphere implementation
CallSphere uses Debezium for two flows: (1) draining the outbox table (post #7) into NATS for the Real Estate OneRoof booking workflow, and (2) keeping our agent context cache fresh: when a CSR updates a lead in our admin UI, the AI sales agent sees the change before the next turn. After-hours uses Bull/Redis for delayed callbacks (different access pattern).
Build steps with code
- Postgres: wal_level=logical, max_replication_slots=10, max_wal_senders=10.
- Create publication: CREATE PUBLICATION dbz FOR TABLE leads, bookings, outbox;
- Deploy Kafka Connect with the Debezium 2.x Postgres connector.
- Use the pgoutput plugin (built in to Postgres 10+).
- Filter columns with column.exclude.list for PII reduction.
- Wrap as CloudEvents (post #12) before downstream consumers.
- Monitor replication slot lag; if it grows, retained WAL will fill the DB disk.
{
  "name": "leads-cdc",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "pg-primary",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "***",
    "database.dbname": "callsphere",
    "topic.prefix": "cdc",
    "plugin.name": "pgoutput",
    "publication.name": "dbz",
    "publication.autocreate.mode": "filtered",
    "table.include.list": "public.leads,public.bookings,public.outbox",
    "snapshot.mode": "initial",
    "tombstones.on.delete": "false"
  }
}
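One way to deploy a config like this is to POST it to the Kafka Connect REST API at `/connectors`. The sketch below only builds the request, so it can be inspected without a running cluster. The `http://connect:8083` address and the helper name `build_register_request` are assumptions, and the config is abbreviated for illustration.

```python
import json
import urllib.request

# Assumed Kafka Connect REST endpoint; adjust to your deployment.
CONNECT_URL = "http://connect:8083"


def build_register_request(connect_url: str, connector: dict) -> urllib.request.Request:
    """Build the POST /connectors request that creates a connector."""
    return urllib.request.Request(
        url=f"{connect_url.rstrip('/')}/connectors",
        data=json.dumps(connector).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# Abbreviated config for illustration; use the full connector JSON in practice.
connector = {
    "name": "leads-cdc",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "plugin.name": "pgoutput",
        "topic.prefix": "cdc",
    },
}
req = build_register_request(CONNECT_URL, connector)
print(req.get_method(), req.full_url)
# To actually submit it: urllib.request.urlopen(req)
```

Keeping the connector JSON in version control and submitting it from a deploy script makes connector changes reviewable like any other config change.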
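# Consumer side
import json

from confluent_kafka import Consumer

# agent_context is the application's own context store (not defined here).
c = Consumer({"bootstrap.servers": "kafka:9092", "group.id": "ai-context",
              "auto.offset.reset": "earliest",
              "enable.auto.commit": False})  # commit manually after processing
c.subscribe(["cdc.public.leads"])
while True:
    msg = c.poll(1.0)
    if msg is None or msg.error():
        continue
    if msg.value() is None:  # tombstone (only sent if tombstones.on.delete=true)
        c.commit(message=msg)
        continue
    event = json.loads(msg.value())
    payload = event.get("payload", event)  # unwrap if the converter includes schemas
    op = payload["op"]  # c, u, d, r (read = snapshot)
    after = payload.get("after")
    if op in ("c", "u", "r"):  # snapshot reads backfill existing rows
        agent_context.upsert_lead(after["id"], after)
    elif op == "d":
        agent_context.delete_lead(payload["before"]["id"])
    c.commit(message=msg)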
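Because a restarted connector or consumer resumes from its last committed position, the same change event can arrive more than once. A minimal sketch of one way to make the consumer idempotent is to remember the highest LSN applied per row and skip anything at or below it. The `LsnDeduper` class is hypothetical, and it assumes each event's `source.lsn` is strictly greater than the LSN of any earlier change to the same row.

```python
from typing import Any


class LsnDeduper:
    """Skip events already applied for a given row key (hypothetical helper).

    Assumes source.lsn strictly increases across changes to the same row.
    """

    def __init__(self) -> None:
        self._last_lsn: dict[Any, int] = {}

    def should_apply(self, key: Any, lsn: int) -> bool:
        last = self._last_lsn.get(key)
        if last is not None and lsn <= last:
            return False  # duplicate or stale redelivery
        self._last_lsn[key] = lsn
        return True


dedupe = LsnDeduper()
# (row id, lsn) pairs; the third and fifth are redeliveries.
events = [(42, 100), (42, 110), (42, 110), (7, 105), (42, 100)]
applied = [e for e in events if dedupe.should_apply(*e)]
print(applied)  # [(42, 100), (42, 110), (7, 105)]
```

This version keeps its state in memory, so it is lost on process restart; a real deployment would more likely store the last applied LSN alongside the row in the context store.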
Common pitfalls
- Replication slot grows unbounded: if no consumer drains the slot, WAL accumulates and the DB disk fills. Monitor pg_replication_slots.confirmed_flush_lsn.
- snapshot.mode=never on first run: you miss everything before connector start; use initial for backfill.
- Capturing the entire DB: filter aggressively with table/column lists.
- Forgetting tombstones: by default Debezium follows each delete event with a null-value tombstone record, so consumers must handle deletes and either skip tombstones or disable them with tombstones.on.delete=false.
- Using the legacy Debezium connector on Confluent Cloud: EOL'd March 31, 2026; migrate to V2.
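For the slot-growth pitfall, lag is the distance in bytes between the current WAL position and the slot's confirmed_flush_lsn. The sketch below shows the arithmetic on LSN strings and the equivalent server-side query. The helper names, the sample LSNs, and the 1 GiB threshold are assumptions for illustration; `pg_wal_lsn_diff` and `pg_current_wal_lsn` are built-in Postgres functions (10+).

```python
def lsn_to_int(lsn: str) -> int:
    """Convert a Postgres LSN such as '16/B374D848' to a byte position."""
    hi, lo = lsn.split("/")
    return (int(hi, 16) << 32) | int(lo, 16)


def slot_lag_bytes(current_wal_lsn: str, confirmed_flush_lsn: str) -> int:
    """Bytes of WAL the slot's consumer has not yet confirmed."""
    return lsn_to_int(current_wal_lsn) - lsn_to_int(confirmed_flush_lsn)


# The same figure can be computed server-side (run on the primary):
LAG_QUERY = """
SELECT slot_name,
       pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn) AS lag_bytes
FROM pg_replication_slots;
"""

# Hypothetical alert threshold; choose one that fits your disk headroom.
MAX_LAG_BYTES = 1 * 1024**3

lag = slot_lag_bytes("16/B374D848", "16/B3000000")
print(lag, lag > MAX_LAG_BYTES)
```

Alerting on a sustained upward trend is usually more useful than on a single reading, since lag spikes briefly during bulk writes.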
FAQ
Why not pg_notify? It doesn't survive restarts and has no replay. Debezium does.
Latency? 50-100 ms p95 from commit to consumer in production.
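Can I CDC without Kafka? Yes. Debezium Engine runs embedded in a Java application and hands events to your own code, and Debezium Server packages it as a standalone process with sinks for other messaging systems.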
Where does this show up in CallSphere? Internal infra; book a demo to see the agent context flow.
Does it work with Aurora/RDS? Yes. Both support logical replication once rds.logical_replication is enabled in the parameter group.