By Sagar Shankaran, Founder of CallSphere
Skip Redis and Kafka for moderate-scale AI realtime — Postgres LISTEN/NOTIFY gives you pub/sub with ACID semantics. Build an agent event bus, watch for the global commit lock, and know when to graduate.
Key takeaways
TL;DR — Postgres LISTEN/NOTIFY is a transactional pub/sub primitive built into every Postgres install. It's perfect for AI agent realtime under ~5k commits/sec; above that, the global commit lock during NOTIFY becomes the bottleneck and you graduate to NATS or Redis Streams.
A Node.js + Postgres event bus that fires agent_event notifications when an agent finishes a turn, with a Server-Sent-Events bridge so browser clients see updates in <100ms — no message broker required.
CREATE TABLE agent_events (
id BIGSERIAL PRIMARY KEY,
tenant_id UUID NOT NULL,
agent_id UUID NOT NULL,
event_type TEXT NOT NULL,
payload JSONB NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
CREATE OR REPLACE FUNCTION notify_agent_event() RETURNS trigger AS $$
BEGIN
PERFORM pg_notify(
'agent_event_' || NEW.tenant_id::text,
json_build_object(
'id', NEW.id,
'agentId', NEW.agent_id,
'type', NEW.event_type,
'payload', NEW.payload
)::text
);
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER trg_notify_agent_event
AFTER INSERT ON agent_events
FOR EACH ROW EXECUTE FUNCTION notify_agent_event();
flowchart LR
AGENT[Agent run] --> INS[INSERT agent_events]
INS --> TRG[AFTER INSERT trigger]
TRG --> NOTIFY[pg_notify channel]
NOTIFY --> LISTEN[Node listener]
LISTEN --> SSE[Server-Sent Events]
SSE --> BROWSER[React UI]
import { Client } from "pg";
export async function startListener(tenantId: string, onEvent: (e: any) => void) {
const client = new Client({ connectionString: process.env.DATABASE_URL });
await client.connect();
await client.query(`LISTEN agent_event_${tenantId.replace(/-/g, "_")}`);
client.on("notification", (n) => onEvent(JSON.parse(n.payload!)));
client.on("error", (e) => {
console.error("listener error", e);
setTimeout(() => startListener(tenantId, onEvent), 1000);
});
}
// app/api/agents/stream/route.ts
import { startListener } from "@/lib/notify";
export async function GET(req: Request) {
const tenantId = await getTenantFromRequest(req);
const stream = new ReadableStream({
start(ctrl) {
startListener(tenantId, (e) => {
ctrl.enqueue(`data: ${JSON.stringify(e)}\n\n`);
});
},
});
return new Response(stream, {
headers: { "Content-Type": "text/event-stream" },
});
}
await prisma.$executeRaw`
INSERT INTO agent_events (tenant_id, agent_id, event_type, payload)
VALUES (${tenantId}::uuid, ${agentId}::uuid, 'turn_complete',
${JSON.stringify({ tokens: 142, latency: 820 })}::jsonb)
`;
The trigger fires pg_notify automatically inside the same transaction. Atomic, durable, no separate broker.
Hear it before you finish reading
Talk to a live CallSphere AI voice agent in your browser — 60 seconds, no signup.
useEffect(() => {
const es = new EventSource("/api/agents/stream");
es.onmessage = (m) => {
const ev = JSON.parse(m.data);
setFeed((f) => [ev, ...f].slice(0, 100));
};
return () => es.close();
}, []);
SELECT query, state, wait_event_type, wait_event
FROM pg_stat_activity
WHERE wait_event = 'Notify' OR wait_event_type = 'LWLock';
If you see sustained NotifyQueueLock waits, you're saturating LISTEN/NOTIFY. Threshold: ~5k notifies/sec on commodity hardware (Recall.ai documented this).
const useBroker = (process.env.NOTIFY_RATE_PER_SEC ?? 0) > 4000;
if (useBroker) {
await nats.publish(`agent.${tenantId}`, payload);
} else {
await prisma.$executeRaw`SELECT pg_notify(...)`;
}
Build the abstraction now so a future migration to NATS/Redis Streams is a config flip.
NotifyQueueLock. Above ~5k commits/sec it serializes writes (Recall.ai outage post-mortem).CallSphere's call-events bus runs on LISTEN/NOTIFY for the 115+ DB tables' realtime needs — agent turns, tool calls, escalations all flow through Postgres triggers. At ~600 events/sec across 6 verticals (Healthcare/Behavioral Health on healthcare_voice Prisma, OneRoof RLS, UrackIT Supabase + ChromaDB), we're well under the commit-lock threshold. 37 agents · 90+ tools. Plans: $149/$499/$1,499, 14-day trial, 22% affiliate.
Q: Is LISTEN/NOTIFY at-least-once or exactly-once? At-most-once across reconnects — durable inside one TCP session. Add an outbox table for guaranteed delivery.
Still reading? Stop comparing — try CallSphere live.
CallSphere ships complete AI voice agents per industry — 14 tools for healthcare, 10 agents for real estate, 4 specialists for salons. See how it actually handles a call before you book a demo.
Q: Does pgbouncer break LISTEN/NOTIFY? Yes in transaction-pooling mode. Use session pooling or a dedicated direct connection for listeners.
Q: Can multiple servers listen to the same channel? Yes — every listener gets the message (fanout).
Q: How big can payload be? 8000 bytes raw. Send a row ID and let consumers fetch.
Q: When should I switch to NATS / Redis Streams? When commit rate sustains >5k/sec, or you need durable replay/consumer groups.
Written by
Sagar Shankaran· Founder, CallSphere
Sagar Shankaran is the founder of CallSphere, where he builds production AI voice and chat agents deployed across healthcare, hospitality, real estate, and home services. He writes about agentic AI, LLM engineering, and shipping voice agents that handle real calls in production.
See how AI voice agents work for your industry. Live demo available -- no signup required.
A founder's guide to the personal AI assistant market: best AI assistant apps, business-grade options, and how CallSphere's voice agent fits in.
A founder's guide to free AI agents, low-code AI agent builders, and how to know when you should pay for a real platform like CallSphere.
Graphiti is the open-source temporal knowledge graph for AI agents in 2026. Learn how bi-temporal memory beats vector RAG for voice agents and long-running LLMs.
Chatbot app vs ChatGPT in 2026: a founder's clear take on the difference, when to use which, and how a real AI chatbot app development works.
How we built a fault-tolerant HVAC emergency triage and tech-dispatch platform on Kubernetes — three-tier CQRS, 11 micro-agents on the OpenAI Agents SDK + LangGraph, NATS JetStream, DTMF/SMS/WebSocket acceptance, circuit breakers, and an evaluation pipeline that catches regressions before they wake a tech at 3 AM.
Head-to-head: OpenAI Frontier and Anthropic's managed agent stack — strengths, fit, and what each means for enterprise AI voice and chat deployment.
© 2026 CallSphere LLC. All rights reserved.
Watch how CallSphere handles real customer calls, schedules appointments, and processes payments — live.
Try Live DemoBook a DemoCalculate Your ROI