---
title: "Event-Driven Microservices for AI Agents: Kafka, RabbitMQ, and NATS Patterns"
description: "Implement event-driven communication between AI agent microservices using Kafka, RabbitMQ, and NATS. Learn event schema design, pub/sub patterns, event sourcing, and exactly-once delivery semantics."
canonical: https://callsphere.ai/blog/event-driven-microservices-ai-agents-kafka-rabbitmq-nats
category: "Learn Agentic AI"
tags: ["Event-Driven", "Kafka", "RabbitMQ", "NATS", "Microservices", "Agentic AI"]
author: "CallSphere Team"
published: 2026-03-17T00:00:00.000Z
updated: 2026-05-06T01:02:44.213Z
---

# Event-Driven Microservices for AI Agents: Kafka, RabbitMQ, and NATS Patterns

> Implement event-driven communication between AI agent microservices using Kafka, RabbitMQ, and NATS. Learn event schema design, pub/sub patterns, event sourcing, and exactly-once delivery semantics.

## Why Event-Driven Architecture Fits AI Agent Systems

AI agent workflows are inherently asynchronous. A user sends a message, the agent reasons over it, calls tools, retrieves context from a vector store, and eventually returns a response. Many of these steps can happen independently. The memory service needs to record the conversation after the response is sent. The analytics service needs to log latency metrics. The billing service needs to track token usage.

If all of these happen synchronously in the request path, response latency balloons. Event-driven architecture decouples the request path from downstream processing. The conversation service publishes events, and other services consume them independently.

## Designing Event Schemas

A well-designed event schema is the contract between services. It must be self-describing, versioned, and contain enough context for any consumer to act without making additional API calls:

```mermaid
flowchart LR
    INPUT(["User intent"])
    PARSE["Parse plus
classify"]
    PLAN["Plan and tool
selection"]
    AGENT["Agent loop
LLM plus tools"]
    GUARD{"Guardrails
and policy"}
    EXEC["Execute and
verify result"]
    OBS[("Trace and metrics")]
    OUT(["Outcome plus
next action"])
    INPUT --> PARSE --> PLAN --> AGENT --> GUARD
    GUARD -->|Pass| EXEC --> OUT
    GUARD -->|Fail| AGENT
    AGENT --> OBS
    style AGENT fill:#4f46e5,stroke:#4338ca,color:#fff
    style GUARD fill:#f59e0b,stroke:#d97706,color:#1f2937
    style OBS fill:#ede9fe,stroke:#7c3aed,color:#1e1b4b
    style OUT fill:#059669,stroke:#047857,color:#fff
```

```python
from dataclasses import dataclass, field, asdict
from datetime import datetime
import uuid
import json

@dataclass
class AgentEvent:
    event_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    event_type: str = ""
    version: str = "1.0"
    timestamp: str = field(
        default_factory=lambda: datetime.utcnow().isoformat()
    )
    source_service: str = ""
    correlation_id: str = ""
    payload: dict = field(default_factory=dict)

    def to_json(self) -> str:
        return json.dumps(asdict(self))

# Example events published by the conversation service
def create_message_received_event(
    session_id: str, user_msg: str, correlation_id: str
) -> AgentEvent:
    return AgentEvent(
        event_type="agent.message.received",
        source_service="conversation-manager",
        correlation_id=correlation_id,
        payload={
            "session_id": session_id,
            "message": user_msg,
            "message_type": "user",
        },
    )

def create_response_generated_event(
    session_id: str,
    response: str,
    tokens_used: int,
    model: str,
    correlation_id: str,
) -> AgentEvent:
    return AgentEvent(
        event_type="agent.response.generated",
        source_service="conversation-manager",
        correlation_id=correlation_id,
        payload={
            "session_id": session_id,
            "response_length": len(response),
            "tokens_used": tokens_used,
            "model": model,
        },
    )
```

The `correlation_id` ties all events from a single user request together across services, which is essential for distributed tracing.

## Kafka for High-Throughput Agent Event Streams

Kafka excels when you need durable, ordered event streams at high throughput. Agent systems that process thousands of messages per minute benefit from Kafka's partitioned log architecture:

```python
from aiokafka import AIOKafkaProducer, AIOKafkaConsumer
import asyncio

# Producer in the conversation service
class AgentEventProducer:
    def __init__(self, bootstrap_servers: str = "kafka:9092"):
        self.producer = AIOKafkaProducer(
            bootstrap_servers=bootstrap_servers,
            value_serializer=lambda v: v.encode("utf-8"),
            acks="all",  # Wait for all replicas to acknowledge
        )

    async def start(self):
        await self.producer.start()

    async def publish(self, event: AgentEvent):
        topic = event.event_type.replace(".", "-")
        await self.producer.send_and_wait(
            topic=topic,
            value=event.to_json(),
            key=event.correlation_id.encode("utf-8"),
        )

# Consumer in the analytics service
class AnalyticsConsumer:
    def __init__(self):
        self.consumer = AIOKafkaConsumer(
            "agent-response-generated",
            bootstrap_servers="kafka:9092",
            group_id="analytics-service",
            auto_offset_reset="earliest",
            enable_auto_commit=False,
        )

    async def consume(self):
        await self.consumer.start()
        try:
            async for msg in self.consumer:
                event = json.loads(msg.value.decode("utf-8"))
                await self.process_event(event)
                await self.consumer.commit()
        finally:
            await self.consumer.stop()

    async def process_event(self, event: dict):
        payload = event["payload"]
        await self.db.insert_metric(
            session_id=payload["session_id"],
            tokens_used=payload["tokens_used"],
            model=payload["model"],
            timestamp=event["timestamp"],
        )
```

Setting `acks="all"` ensures the event is durably written before the producer considers it sent. The consumer uses manual commit (`enable_auto_commit=False`) to guarantee at-least-once processing.

## NATS for Lightweight Agent Communication

NATS is a strong choice for agent systems that need low-latency pub/sub without Kafka's operational complexity:

```python
import nats

async def nats_publisher():
    nc = await nats.connect("nats://nats:4222")
    event = create_message_received_event(
        session_id="sess-123",
        user_msg="What is my account balance?",
        correlation_id="req-abc",
    )
    await nc.publish(
        "agent.message.received",
        event.to_json().encode(),
    )
    await nc.flush()
    await nc.close()

async def nats_subscriber():
    nc = await nats.connect("nats://nats:4222")
    sub = await nc.subscribe("agent.>")  # Wildcard subscription

    async for msg in sub.messages:
        event = json.loads(msg.data.decode())
        print(f"Received {event['event_type']} "
              f"from {event['source_service']}")
```

NATS uses subject-based addressing with wildcards. The pattern `agent.>` subscribes to all events under the `agent` namespace, making it easy to build monitoring dashboards.

## Exactly-Once Semantics

True exactly-once delivery is achievable through idempotent consumers. Store the `event_id` in a processed-events table and check it before processing:

```python
async def process_event_exactly_once(self, event: dict):
    event_id = event["event_id"]
    if await self.db.event_already_processed(event_id):
        return  # Skip duplicate
    await self.handle(event)
    await self.db.mark_event_processed(event_id)
```

## FAQ

### When should I choose Kafka over NATS for an agent system?

Choose Kafka when you need durable event storage for replay, strict ordering within partitions, and high throughput at scale (thousands of events per second). Choose NATS when you need simple pub/sub with low latency, the event volume is moderate, and you want minimal operational overhead. For most agent systems under 500 requests per minute, NATS is simpler to operate.

### How do I handle schema evolution when event formats change?

Include a `version` field in every event. When the schema changes, increment the version. Consumers should handle multiple versions by checking the version field and applying the appropriate deserialization logic. Avoid breaking changes — add new fields rather than renaming or removing existing ones.

### Should every microservice publish events, or just the core conversation service?

Every service that performs a meaningful state change should publish events. The tool execution service should publish `tool.execution.completed` events. The RAG service should publish `rag.retrieval.completed` events. This gives downstream services full visibility into the agent's behavior without coupling them to the conversation service.

---

#EventDriven #Kafka #RabbitMQ #NATS #Microservices #AgenticAI #LearnAI #AIEngineering

---

Source: https://callsphere.ai/blog/event-driven-microservices-ai-agents-kafka-rabbitmq-nats
