Building Event-Driven AI Agents: Architecture for Reactive Agent Systems

Learn how to architect event-driven AI agents that react to real-time events using message buses, async handlers, and scalable processing patterns in Python with FastAPI.

Why Event-Driven Architecture for AI Agents

Traditional request-response AI agents wait for a user to ask a question. Event-driven AI agents flip this model entirely. They sit on a message bus, listening for events — a new file uploaded, a payment processed, a sensor reading out of range — and react autonomously without human initiation.

This architecture enables agent behavior that request-response designs handle poorly: agents that monitor, respond, and adapt to streams of real-world activity as it happens. Platforms such as Stripe, GitHub, and Datadog expose webhooks that emit events in this style, which makes them natural event producers for such agents.

In this guide, you will build a small event-driven agent framework using FastAPI, an in-process event bus, and async handlers whose interface carries over unchanged to a distributed broker when you need to scale horizontally.

Core Concepts

An event-driven agent system has four primary components:

  • Event producers — services or webhooks that emit structured events
  • Event bus — the routing layer that delivers events to interested handlers
  • Event handlers — functions that process specific event types
  • Agent logic — the AI reasoning layer that decides what action to take

The separation between the bus and the handlers is what makes the system extensible: you can add new event types and handlers without modifying existing code.

Building the Event Bus

Start with a lightweight in-process event bus. For production systems, you would swap this for Redis Streams, RabbitMQ, or Kafka, but the handler interface stays the same.

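import asyncio
import logging
import uuid
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any, Awaitable, Callable

logger = logging.getLogger(__name__)

# A handler is any async callable that accepts an Event.
Handler = Callable[["Event"], Awaitable[None]]

@dataclass
class Event:
    event_type: str
    payload: dict[str, Any]
    event_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    # Timezone-aware UTC timestamp; datetime.utcnow() is deprecated since Python 3.12.
    timestamp: str = field(default_factory=lambda: datetime.now(timezone.utc).isoformat())

class EventBus:
    def __init__(self):
        self._handlers: dict[str, list[Handler]] = {}
        self._queue: asyncio.Queue[Event] = asyncio.Queue()

    def subscribe(self, event_type: str, handler: Handler):
        # setdefault creates the handler list on first subscription.
        self._handlers.setdefault(event_type, []).append(handler)

    async def publish(self, event: Event):
        await self._queue.put(event)

    async def start_processing(self):
        while True:
            event = await self._queue.get()
            handlers = self._handlers.get(event.event_type, [])
            tasks = [handler(event) for handler in handlers]
            if tasks:
                # Run all handlers concurrently; exceptions come back as results
                # so one failing handler cannot stop the loop.
                results = await asyncio.gather(*tasks, return_exceptions=True)
                for result in results:
                    if isinstance(result, Exception):
                        logger.error("Handler failed for event %s: %r", event.event_id, result)
            self._queue.task_done()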

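The EventBus class uses an asyncio queue internally. Producers call publish(), and the processing loop fans out each event to all subscribed handlers concurrently. Note that the loop awaits every handler for one event before it dequeues the next, so a slow handler delays the events queued behind it.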
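To see the fan-out in action, the sketch below drives a bus end to end. It is a self-contained stand-in rather than the class above: `MiniBus`, `demo_fan_out`, and the `file.uploaded` event name are illustrative assumptions, and events are plain tuples to keep the example short.

```python
import asyncio
from collections import defaultdict

class MiniBus:
    """Illustrative stand-in with the same subscribe/publish/loop shape as EventBus."""

    def __init__(self):
        self._handlers = defaultdict(list)
        self.queue = asyncio.Queue()

    def subscribe(self, event_type, handler):
        self._handlers[event_type].append(handler)

    async def publish(self, event_type, payload):
        await self.queue.put((event_type, payload))

    async def start_processing(self):
        while True:
            event_type, payload = await self.queue.get()
            # Fan out to every handler registered for this event type.
            await asyncio.gather(
                *(h(payload) for h in self._handlers[event_type]),
                return_exceptions=True,
            )
            self.queue.task_done()

async def demo_fan_out():
    bus = MiniBus()
    seen = []

    async def index_file(payload):
        seen.append(("index", payload["name"]))

    async def scan_file(payload):
        seen.append(("scan", payload["name"]))

    # Two independent handlers subscribe to the same event type.
    bus.subscribe("file.uploaded", index_file)
    bus.subscribe("file.uploaded", scan_file)

    worker = asyncio.create_task(bus.start_processing())
    await bus.publish("file.uploaded", {"name": "report.pdf"})
    await bus.queue.join()  # wait until the event has been fully handled
    worker.cancel()         # stop the infinite processing loop
    await asyncio.gather(worker, return_exceptions=True)
    return sorted(seen)

if __name__ == "__main__":
    print(asyncio.run(demo_fan_out()))
```

Waiting on `queue.join()` works here because the loop calls `task_done()` only after all handlers for an event have finished.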

Registering Agent Handlers

Now wire up agent handlers that contain AI logic. Each handler subscribes to a specific event type and decides what to do based on the payload.

from openai import AsyncOpenAI

# AsyncOpenAI() reads the API key from the OPENAI_API_KEY environment variable.
client = AsyncOpenAI()
bus = EventBus()

async def handle_support_ticket(event: Event):
    # Expects a payload with "id" and "body" keys.
    ticket = event.payload
    prompt = f"Classify this support ticket and suggest a response:\n{ticket['body']}"

    response = await client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": prompt}],
    )
    classification = response.choices[0].message.content
    print(f"Ticket {ticket['id']} classified: {classification}")

async def handle_deployment(event: Event):
    # Expects a payload with "status" and "logs" keys; only failures are sent to the model.
    deploy = event.payload
    if deploy["status"] == "failed":
        prompt = f"Analyze this deployment failure and suggest fixes:\n{deploy['logs']}"
        response = await client.chat.completions.create(
            model="gpt-4o",
            messages=[{"role": "user", "content": prompt}],
        )
        print(f"Deployment fix suggestion: {response.choices[0].message.content}")

# Route each event type to the handler that understands its payload.
bus.subscribe("support.ticket.created", handle_support_ticket)
bus.subscribe("deployment.completed", handle_deployment)

Integrating with FastAPI

Expose the event bus through a FastAPI application so external services can push events via HTTP.

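import asyncio
from contextlib import asynccontextmanager, suppress
from typing import Any

from fastapi import FastAPI
from pydantic import BaseModel

class EventIn(BaseModel):
    # Request body schema. Without a model, FastAPI would read event_type
    # from the query string and treat only the dict as the body.
    event_type: str
    payload: dict[str, Any]

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Start the processing loop when the server boots.
    task = asyncio.create_task(bus.start_processing())
    yield
    # Cancel the loop on shutdown and wait for it to unwind.
    task.cancel()
    with suppress(asyncio.CancelledError):
        await task

app = FastAPI(lifespan=lifespan)

@app.post("/events", status_code=202)
async def receive_event(body: EventIn):
    event = Event(event_type=body.event_type, payload=body.payload)
    await bus.publish(event)
    return {"event_id": event.event_id, "status": "accepted"}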

The lifespan context manager starts the event processing loop when the server boots and cancels it on shutdown. Events are accepted immediately and processed asynchronously, so the HTTP response returns fast regardless of how long the AI handler takes.
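The accept-then-process behavior can be shown without a web server. The sketch below is a stdlib-only illustration under stated assumptions: `demo_accept_then_process` and `slow_handler` are hypothetical names, and an `asyncio.Event` stands in for a long-running model call so that the ordering is deterministic.

```python
import asyncio

async def demo_accept_then_process():
    queue = asyncio.Queue()
    release = asyncio.Event()  # controls when the slow handler may finish
    log = []

    async def slow_handler(event):
        await release.wait()   # stands in for a long LLM call
        log.append(f"handled {event}")

    async def worker():
        while True:
            event = await queue.get()
            await slow_handler(event)
            queue.task_done()

    task = asyncio.create_task(worker())

    # What the endpoint does: enqueue the event and respond right away.
    await queue.put("evt-1")
    log.append("responded evt-1")

    release.set()              # now allow the handler to complete
    await queue.join()
    task.cancel()
    await asyncio.gather(task, return_exceptions=True)
    return log

if __name__ == "__main__":
    print(asyncio.run(demo_accept_then_process()))
```

Because the queue is unbounded, a sustained burst of events grows memory instead of slowing producers down. Passing `maxsize` to `asyncio.Queue` makes `put()` wait when the queue is full, which is one simple form of backpressure.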

Scaling Considerations

For production workloads, replace the in-process queue with a distributed message broker. Redis Streams is a good starting point because it supports consumer groups, which let you run multiple agent workers processing events in parallel without duplicating work.

The handler interface remains identical — only the bus implementation changes. This is the key architectural advantage of event-driven design: your AI logic is decoupled from your delivery infrastructure.
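One way to keep that decoupling explicit is to have producers and handlers depend on a small interface rather than on a concrete bus. The sketch below is an assumption about how this could be structured, not the article's implementation: `Bus`, `InMemoryBus`, and `emit_alert` are hypothetical names, and a broker-backed class would need to provide the same two methods.

```python
import asyncio
from typing import Any, Awaitable, Callable, Protocol

Handler = Callable[[dict[str, Any]], Awaitable[None]]

class Bus(Protocol):
    """Hypothetical interface: all that producers and handlers may rely on."""

    def subscribe(self, event_type: str, handler: Handler) -> None: ...
    async def publish(self, event_type: str, payload: dict[str, Any]) -> None: ...

class InMemoryBus:
    """Simplest implementation: dispatches inline, single process, no queue."""

    def __init__(self) -> None:
        self._handlers: dict[str, list[Handler]] = {}

    def subscribe(self, event_type: str, handler: Handler) -> None:
        self._handlers.setdefault(event_type, []).append(handler)

    async def publish(self, event_type: str, payload: dict[str, Any]) -> None:
        for handler in self._handlers.get(event_type, []):
            await handler(payload)

async def emit_alert(bus: Bus, host: str) -> None:
    # Producer code is typed against Bus, so swapping in a broker-backed
    # implementation requires no change here.
    await bus.publish("infra.alert", {"host": host})

async def demo_swap() -> list[str]:
    received: list[str] = []

    async def on_alert(payload: dict[str, Any]) -> None:
        received.append(payload["host"])

    bus = InMemoryBus()
    bus.subscribe("infra.alert", on_alert)
    await emit_alert(bus, "web-1")
    return received

if __name__ == "__main__":
    print(asyncio.run(demo_swap()))
```

`typing.Protocol` uses structural typing, so `InMemoryBus` satisfies `Bus` without inheriting from it, and a later distributed implementation would not need to either.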

FAQ

When should I use event-driven agents instead of a simple API?

Use event-driven agents when you need to react to things that happen outside your control — third-party webhooks, database changes, infrastructure alerts. If the agent only responds to direct user requests, a standard API is simpler and sufficient.

How do I prevent duplicate event processing?

Store processed event IDs in a database or Redis set. Before handling an event, check if its ID has already been processed. This idempotency check is critical when using at-least-once delivery brokers like Kafka or RabbitMQ.
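A minimal sketch of that check follows, using an in-memory set as a stand-in for the database or Redis set. `idempotent` and `charge_customer` are hypothetical names. In a shared store the check and the write should be a single atomic operation (Redis `SADD`, for instance, reports whether the member was newly added), otherwise two workers can both pass the check.

```python
import asyncio
from typing import Any, Awaitable, Callable

Handler = Callable[[dict[str, Any]], Awaitable[None]]

def idempotent(handler: Handler, seen: set[str]) -> Handler:
    """Wrap a handler so each event_id is processed at most once.

    `seen` stands in for a durable store; a plain set only works in one process.
    """
    async def wrapper(event: dict[str, Any]) -> None:
        event_id = event["event_id"]
        if event_id in seen:
            return  # duplicate delivery: skip it
        # Mark before handling. There is no await between the check and the
        # add, so no other coroutine can run in between.
        seen.add(event_id)
        try:
            await handler(event)
        except Exception:
            seen.discard(event_id)  # allow a retry if the handler failed
            raise
    return wrapper

async def demo_dedup() -> int:
    calls = 0

    async def charge_customer(event: dict[str, Any]) -> None:
        nonlocal calls
        calls += 1

    safe = idempotent(charge_customer, seen=set())
    event = {"event_id": "evt-1", "payload": {"amount": 10}}
    await safe(event)
    await safe(event)  # redelivered by an at-least-once broker
    return calls

if __name__ == "__main__":
    print(asyncio.run(demo_dedup()))
```

Removing the ID when the handler raises is a design choice: it trades strict at-most-once behavior for the ability to retry a failed event.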

What happens if an agent handler fails mid-processing?

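With the asyncio-based bus shown above, asyncio.gather(..., return_exceptions=True) returns a handler's exception as a result instead of raising it, so one failing handler neither stops the others nor crashes the processing loop. The event is not retried, however, and it is gone once dequeued. For production systems, implement a dead letter queue that captures failed events with their error context so you can replay them after fixing the handler.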
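A dead letter queue can start as a second queue that the dispatch step writes to whenever a handler raises. The sketch below is one possible shape under that assumption: `DeadLetter`, `dispatch`, and `flaky_handler` are hypothetical names, and a production version would persist entries rather than hold them in memory.

```python
import asyncio
from dataclasses import dataclass
from typing import Any

@dataclass
class DeadLetter:
    event: dict[str, Any]
    handler_name: str
    error: str

async def dispatch(event, handlers, dead_letters):
    """Run all handlers concurrently; record failures instead of dropping them."""
    results = await asyncio.gather(
        *(h(event) for h in handlers), return_exceptions=True
    )
    # gather returns results in the same order as the awaitables passed in,
    # so each result can be paired with the handler that produced it.
    for handler, result in zip(handlers, results):
        if isinstance(result, Exception):
            await dead_letters.put(
                DeadLetter(event, handler.__name__, repr(result))
            )

async def demo_dead_letter():
    dead_letters = asyncio.Queue()

    async def ok_handler(event):
        pass

    async def flaky_handler(event):
        raise ValueError("model timeout")

    event = {"event_id": "evt-7", "event_type": "deployment.completed"}
    await dispatch(event, [ok_handler, flaky_handler], dead_letters)
    return [dead_letters.get_nowait() for _ in range(dead_letters.qsize())]

if __name__ == "__main__":
    for letter in asyncio.run(demo_dead_letter()):
        print(letter)
```

Storing the handler name alongside the event means a replay can target only the handler that failed, which matters when the other handlers for that event already succeeded.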

