Learn Agentic AI

OpenAI Agents SDK with FastAPI: Production Web Server Integration Patterns

Learn how to mount OpenAI Agents SDK agents inside a FastAPI web server with session management, concurrent user handling, streaming responses, and production-ready error handling.

Why FastAPI and Agents SDK Work Well Together

FastAPI is async-native. The OpenAI Agents SDK is async-native. This alignment means you can run agent loops inside request handlers without blocking other users. No thread pools, no workarounds — just native async/await throughout the stack.

This guide shows you how to build a production web API that exposes agent capabilities to multiple concurrent users with proper session isolation.

Basic Integration: Agent as an Endpoint

The simplest pattern wraps a Runner.run call inside a FastAPI route.

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from agents import Agent, Runner

app = FastAPI(title="Agent API")

support_agent = Agent(
    name="support",
    instructions="You are a customer support agent for a SaaS product.",
)


class ChatRequest(BaseModel):
    message: str
    user_id: str


class ChatResponse(BaseModel):
    reply: str
    agent_name: str


@app.post("/chat", response_model=ChatResponse)
async def chat(request: ChatRequest):
    try:
        result = await Runner.run(
            support_agent,
            input=request.message,
        )
        return ChatResponse(
            reply=result.final_output,
            agent_name=result.last_agent.name,
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

Session Management: Multi-Turn Conversations

Real conversations span multiple requests. You need to persist the conversation state between calls. Here is a session manager that stores history per user.

from datetime import datetime, timedelta, timezone
from typing import Any
import uuid


class SessionManager:
    """In-memory session store with a sliding TTL per session."""

    def __init__(self, ttl_minutes: int = 60):
        self._sessions: dict[str, dict[str, Any]] = {}
        self.ttl = timedelta(minutes=ttl_minutes)

    def get_or_create(self, session_id: str) -> dict[str, Any]:
        if session_id not in self._sessions:
            self._sessions[session_id] = {
                "id": session_id,
                "history": [],
                "created_at": datetime.now(timezone.utc),
                "last_active": datetime.now(timezone.utc),
            }
        session = self._sessions[session_id]
        session["last_active"] = datetime.now(timezone.utc)
        return session

    def cleanup_expired(self) -> None:
        now = datetime.now(timezone.utc)
        expired = [
            sid for sid, s in self._sessions.items()
            if now - s["last_active"] > self.ttl
        ]
        for sid in expired:
            del self._sessions[sid]


sessions = SessionManager(ttl_minutes=30)
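Nothing calls `cleanup_expired` on its own, so expired sessions linger until something triggers a sweep. One option is a background task that runs on a fixed interval; a minimal sketch (the helper name, interval, and task wiring are illustrative, not part of the SDK):

```python
import asyncio


async def periodic_cleanup(manager: "SessionManager",
                           interval_seconds: float = 300.0) -> None:
    """Sweep expired sessions on a fixed interval until cancelled."""
    while True:
        await asyncio.sleep(interval_seconds)
        manager.cleanup_expired()

# Typically started from a lifespan handler:
# cleanup_task = asyncio.create_task(periodic_cleanup(sessions))
# ...and cleanup_task.cancel() on shutdown.
```

Cancelling the task on shutdown keeps the loop from outliving the app.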

Multi-Turn Endpoint with History

Now wire the session manager into your endpoint so each request carries forward the conversation.


from agents.items import TResponseInputItem

class MultiTurnRequest(BaseModel):
    message: str
    session_id: str | None = None


class MultiTurnResponse(BaseModel):
    reply: str
    session_id: str
    turn_count: int


@app.post("/chat/session", response_model=MultiTurnResponse)
async def chat_session(request: MultiTurnRequest):
    session_id = request.session_id or str(uuid.uuid4())
    session = sessions.get_or_create(session_id)

    # Build input from history plus new message
    input_items: list[TResponseInputItem] = list(session["history"])
    input_items.append({"role": "user", "content": request.message})

    result = await Runner.run(support_agent, input=input_items)

    # Persist the new turn in session history
    session["history"] = result.to_input_list()

    return MultiTurnResponse(
        reply=result.final_output,
        session_id=session_id,
        turn_count=len([
            item for item in session["history"]
            if isinstance(item, dict) and item.get("role") == "user"
        ]),
    )

Streaming Responses with Server-Sent Events

For long agent responses, streaming gives users immediate feedback.

from fastapi.responses import StreamingResponse
from openai.types.responses import ResponseTextDeltaEvent

@app.post("/chat/stream")
async def chat_stream(request: ChatRequest):
    async def event_generator():
        result = Runner.run_streamed(support_agent, input=request.message)

        async for event in result.stream_events():
            # Forward only text deltas; other event types carry
            # tool calls and agent lifecycle updates.
            if event.type == "raw_response_event" and isinstance(
                event.data, ResponseTextDeltaEvent
            ):
                yield f"data: {event.data.delta}\n\n"

        yield "data: [DONE]\n\n"

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
        },
    )
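On the client side, the stream arrives as `data:` lines terminated by the `[DONE]` sentinel. A framework-agnostic sketch of the parsing half (the function name is illustrative; the commented `httpx` calls show one way to feed it):

```python
from typing import Iterable, Iterator


def parse_sse(lines: Iterable[str]) -> Iterator[str]:
    """Yield payloads from SSE 'data:' lines, stopping at [DONE]."""
    for line in lines:
        line = line.strip()
        if not line.startswith("data: "):
            continue  # skip blank keep-alive lines
        payload = line[len("data: "):]
        if payload == "[DONE]":
            return
        yield payload

# With httpx, for example:
# async with client.stream("POST", "/chat/stream", json=body) as resp:
#     async for line in resp.aiter_lines():
#         ...collect parse_sse output...
```

Concatenating the yielded payloads reassembles the full reply as deltas arrive.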

Handling Concurrent Users

FastAPI handles concurrency naturally with async, but you need to ensure agent state is isolated per request. Never share mutable agent state across requests.

import asyncio

# Per-user concurrency caps. In production, evict entries for idle
# users so this dict does not grow without bound.
user_semaphores: dict[str, asyncio.Semaphore] = {}

def get_user_semaphore(user_id: str, max_concurrent: int = 3) -> asyncio.Semaphore:
    if user_id not in user_semaphores:
        user_semaphores[user_id] = asyncio.Semaphore(max_concurrent)
    return user_semaphores[user_id]

@app.post("/chat/limited")
async def chat_with_limit(request: ChatRequest):
    semaphore = get_user_semaphore(request.user_id)

    # Reject instead of queueing when the user is at their limit.
    if semaphore.locked():
        raise HTTPException(
            status_code=429,
            detail="Too many concurrent requests. Please wait.",
        )

    async with semaphore:
        result = await Runner.run(support_agent, input=request.message)
        return {"reply": result.final_output}

Startup and Shutdown Lifecycle

Use FastAPI's lifespan events to manage resources.

from contextlib import asynccontextmanager

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup: validate agent configuration with a cheap round trip
    print("Agent API starting, validating agents...")
    test_result = await Runner.run(support_agent, input="ping")
    print(f"Agent validated: {test_result.last_agent.name}")
    yield
    # Shutdown: cleanup
    sessions.cleanup_expired()
    print("Agent API shutdown complete")

# Lifespan must be passed at construction time, so register your
# routes on this instance.
app = FastAPI(title="Agent API", lifespan=lifespan)

FAQ

How do I handle agent timeouts in a web server context?

Wrap your Runner.run call with asyncio.wait_for(Runner.run(...), timeout=30.0). This raises asyncio.TimeoutError after 30 seconds, which you catch and return as a 504 Gateway Timeout. Set the timeout based on your load balancer and client expectations.
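A sketch of that pattern (the helper name and 30-second default are illustrative; the 504 mapping is shown in the commented endpoint code):

```python
import asyncio


async def run_with_timeout(run_coro, timeout: float = 30.0):
    """Bound an agent run; callers map TimeoutError to a 504 response."""
    return await asyncio.wait_for(run_coro, timeout=timeout)

# In the endpoint:
# try:
#     result = await run_with_timeout(Runner.run(support_agent, input=msg))
# except asyncio.TimeoutError:
#     raise HTTPException(status_code=504, detail="Agent run timed out")
```

Keep the timeout shorter than your load balancer's idle timeout so the client sees your 504, not a dropped connection.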

Should I create a new Agent instance per request?

No. Agent instances are lightweight configuration objects — they hold instructions, tool definitions, and handoff lists. They do not store conversation state. Create agents once at module level and reuse them across requests. The Runner manages per-request state internally.

How do I scale this beyond a single server?

Move session storage from in-memory dictionaries to Redis. Use Redis as your session backend so any server instance can resume any conversation. Deploy multiple FastAPI instances behind a load balancer. The agents are stateless, so horizontal scaling is straightforward.


#OpenAIAgentsSDK #FastAPI #Production #WebServer #Python #SessionManagement #AgenticAI #LearnAI #AIEngineering


Written by

CallSphere Team

Expert insights on AI voice agents and customer communication automation.

