---
title: "OpenAI Agents SDK with FastAPI: Production Web Server Integration Patterns"
description: "Learn how to mount OpenAI Agents SDK agents inside a FastAPI web server with session management, concurrent user handling, streaming responses, and production-ready error handling."
canonical: https://callsphere.ai/blog/openai-agents-sdk-fastapi-production-web-server-integration
category: "Learn Agentic AI"
tags: ["OpenAI Agents SDK", "FastAPI", "Production", "Web Server", "Python", "Session Management"]
author: "CallSphere Team"
published: 2026-03-17T00:00:00.000Z
updated: 2026-05-06T01:02:44.874Z
---

# OpenAI Agents SDK with FastAPI: Production Web Server Integration Patterns

> Learn how to mount OpenAI Agents SDK agents inside a FastAPI web server with session management, concurrent user handling, streaming responses, and production-ready error handling.

## Why FastAPI and Agents SDK Work Well Together

FastAPI and the OpenAI Agents SDK are both async-native, so a request handler can `await` an agent run while the event loop keeps serving other users. You need no thread pools or workarounds, provided the tools your agent calls are also non-blocking.

This guide shows you how to build a production web API that exposes agent capabilities to multiple concurrent users with proper session isolation.
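The non-blocking behavior described above can be illustrated without either library. The following is a minimal sketch in which `fake_agent_run` is a hypothetical stand-in for an awaited agent call (it only sleeps); it shows three simulated requests overlapping on one event loop rather than running back to back.

```python
import asyncio
import time


async def fake_agent_run(message: str, delay: float = 0.2) -> str:
    # Hypothetical stand-in for an awaited agent call. asyncio.sleep yields
    # to the event loop the same way an awaited network request would.
    await asyncio.sleep(delay)
    return f"reply to {message!r}"


async def serve_three_users() -> tuple[list[str], float]:
    """Run three simulated requests concurrently and time the batch."""
    start = time.perf_counter()
    replies = await asyncio.gather(
        fake_agent_run("user A"),
        fake_agent_run("user B"),
        fake_agent_run("user C"),
    )
    return list(replies), time.perf_counter() - start


if __name__ == "__main__":
    replies, elapsed = asyncio.run(serve_three_users())
    # Elapsed time is close to one delay, not three, because the waits overlap.
    print(replies, f"{elapsed:.2f}s")
```

If `fake_agent_run` used `time.sleep` instead, the three calls would run one after another, which is why blocking work inside an async handler hurts every user on that worker.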

## Basic Integration: Agent as an Endpoint

The simplest pattern wraps a `Runner.run` call inside a FastAPI route.

```mermaid
flowchart LR
    INPUT(["User input"])
    AGENT["Agent<br/>name plus instructions"]
    HAND{"Handoff to<br/>another agent?"}
    SUB["Sub-agent<br/>specialist"]
    GUARD{"Guardrail<br/>passed?"}
    TOOL["Tool call"]
    SDK[("Tracing<br/>OpenAI dashboard")]
    OUT(["Final output"])
    INPUT --> AGENT --> HAND
    HAND -->|Yes| SUB --> GUARD
    HAND -->|No| GUARD
    GUARD -->|Yes| TOOL --> AGENT
    GUARD -->|Block| OUT
    AGENT --> OUT
    AGENT --> SDK
    style AGENT fill:#4f46e5,stroke:#4338ca,color:#fff
    style GUARD fill:#f59e0b,stroke:#d97706,color:#1f2937
    style SDK fill:#ede9fe,stroke:#7c3aed,color:#1e1b4b
    style OUT fill:#059669,stroke:#047857,color:#fff
```

```python
import logging

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from agents import Agent, Runner

logger = logging.getLogger(__name__)

app = FastAPI(title="Agent API")

# Agents are configuration objects; create once and reuse across requests
support_agent = Agent(
    name="support",
    instructions="You are a customer support agent for a SaaS product.",
)

class ChatRequest(BaseModel):
    message: str
    user_id: str

class ChatResponse(BaseModel):
    reply: str
    agent_name: str

@app.post("/chat", response_model=ChatResponse)
async def chat(request: ChatRequest):
    try:
        result = await Runner.run(
            support_agent,
            input=request.message,
        )
        return ChatResponse(
            reply=result.final_output,
            agent_name=result.last_agent.name,
        )
    except Exception:
        # Log the traceback server-side and return a generic message,
        # so internal error details never reach the client
        logger.exception("Agent run failed")
        raise HTTPException(status_code=500, detail="Agent run failed")
```

## Session Management: Multi-Turn Conversations

Real conversations span multiple requests, so you need to persist the conversation state between calls. Here is an in-memory session manager that stores history per session ID. It lives in a single process, which is enough for one server instance.

```python
from datetime import datetime, timedelta, timezone
from typing import Any
import uuid

def _now() -> datetime:
    # Timezone-aware UTC; datetime.utcnow() is deprecated since Python 3.12
    return datetime.now(timezone.utc)

class SessionManager:
    """In-memory, per-process session store keyed by session ID."""

    def __init__(self, ttl_minutes: int = 60):
        self._sessions: dict[str, dict[str, Any]] = {}
        self.ttl = timedelta(minutes=ttl_minutes)

    def get_or_create(self, session_id: str) -> dict[str, Any]:
        if session_id not in self._sessions:
            now = _now()
            self._sessions[session_id] = {
                "id": session_id,
                "history": [],
                "created_at": now,
                "last_active": now,
            }
        session = self._sessions[session_id]
        session["last_active"] = _now()
        return session

    def cleanup_expired(self):
        """Drop sessions that have been idle for longer than the TTL."""
        now = _now()
        expired = [
            sid for sid, s in self._sessions.items()
            if now - s["last_active"] > self.ttl
        ]
        for sid in expired:
            del self._sessions[sid]

sessions = SessionManager(ttl_minutes=30)
```

## Multi-Turn Endpoint with History

Now wire the session manager into your endpoint so each request carries forward the conversation.

```python
from agents.items import TResponseInputItem

class MultiTurnRequest(BaseModel):
    message: str
    session_id: str | None = None

class MultiTurnResponse(BaseModel):
    reply: str
    session_id: str
    turn_count: int

@app.post("/chat/session", response_model=MultiTurnResponse)
async def chat_session(request: MultiTurnRequest):
    session_id = request.session_id or str(uuid.uuid4())
    session = sessions.get_or_create(session_id)

    # Build input from history plus new message
    input_items: list[TResponseInputItem] = list(session["history"])
    input_items.append({"role": "user", "content": request.message})

    result = await Runner.run(support_agent, input=input_items)

    # Persist the new turn in session history
    session["history"] = result.to_input_list()

    return MultiTurnResponse(
        reply=result.final_output,
        session_id=session_id,
        turn_count=len([
            item for item in session["history"]
            if isinstance(item, dict) and item.get("role") == "user"
        ]),
    )
```
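One thing the endpoint above does not guard against is two overlapping requests for the same `session_id`: both would read the same history, and whichever finished last would overwrite the other's turn. A minimal sketch of one way to serialize turns per session follows. `SessionLocks`, `run_turn`, and the echo reply are hypothetical names for illustration, and `asyncio.sleep` stands in for `await Runner.run(...)`.

```python
import asyncio


class SessionLocks:
    """Hypothetical helper: one asyncio.Lock per session ID, created on first use."""

    def __init__(self) -> None:
        self._locks: dict[str, asyncio.Lock] = {}

    def get(self, session_id: str) -> asyncio.Lock:
        if session_id not in self._locks:
            self._locks[session_id] = asyncio.Lock()
        return self._locks[session_id]


async def run_turn(locks: SessionLocks, session: dict, session_id: str, message: str) -> None:
    """Read history, run one turn, and write history back while holding the lock."""
    async with locks.get(session_id):
        history = list(session["history"])
        history.append({"role": "user", "content": message})
        await asyncio.sleep(0.01)  # stand-in for the awaited agent run
        history.append({"role": "assistant", "content": f"echo: {message}"})
        session["history"] = history


async def demo() -> list[dict]:
    locks = SessionLocks()
    session = {"history": []}
    # Two overlapping requests for the same session
    await asyncio.gather(
        run_turn(locks, session, "s1", "first"),
        run_turn(locks, session, "s1", "second"),
    )
    return session["history"]


if __name__ == "__main__":
    for item in asyncio.run(demo()):
        print(item)
```

With the lock in place both turns survive in the history. Without it, the second write would replace the first because each request started from the same empty list. Like the session dictionary itself, these locks only coordinate requests within one process.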

## Streaming Responses with Server-Sent Events

For long agent responses, streaming gives users immediate feedback.

```python
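import json

from fastapi.responses import StreamingResponse
from openai.types.responses import ResponseTextDeltaEvent
from agents import Runner

@app.post("/chat/stream")
async def chat_stream(request: ChatRequest):
    async def event_generator():
        result = Runner.run_streamed(support_agent, input=request.message)

        async for event in result.stream_events():
            # Forward only text deltas; tool calls and agent updates are skipped
            if event.type == "raw_response_event" and isinstance(
                event.data, ResponseTextDeltaEvent
            ):
                # JSON-encode so newlines in a delta cannot break SSE framing
                payload = json.dumps({"delta": event.data.delta})
                yield f"data: {payload}\n\n"

        yield "data: [DONE]\n\n"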

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
        },
    )
```

## Handling Concurrent Users

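FastAPI handles concurrency naturally with async, but you need to ensure agent state is isolated per request. Never share mutable agent state across requests. It also helps to cap how many agent runs a single user can have in flight at once, which the example below does with one semaphore per user.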

```python
import asyncio

# Concurrency limiting per user: caps in-flight agent runs, not requests per second.
# This dict lives in one process and grows with the number of distinct users.
user_semaphores: dict[str, asyncio.Semaphore] = {}

def get_user_semaphore(user_id: str, max_concurrent: int = 3) -> asyncio.Semaphore:
    if user_id not in user_semaphores:
        user_semaphores[user_id] = asyncio.Semaphore(max_concurrent)
    return user_semaphores[user_id]

@app.post("/chat/limited")
async def chat_with_limit(request: ChatRequest):
    semaphore = get_user_semaphore(request.user_id)

    # locked() is True when no slot is free; reject instead of queueing.
    # This uses the public API rather than the private _value attribute.
    if semaphore.locked():
        raise HTTPException(
            status_code=429,
            detail="Too many concurrent requests. Please wait.",
        )

    async with semaphore:
        result = await Runner.run(support_agent, input=request.message)
        return {"reply": result.final_output}
```

## Startup and Shutdown Lifecycle

Use FastAPI's lifespan events to manage resources.

```python
from contextlib import asynccontextmanager

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup: validate agent configuration.
    # This makes a real model call, so it needs credentials and network access.
    print("Agent API starting, validating agents...")
    test_result = await Runner.run(support_agent, input="ping")
    print(f"Agent validated: {test_result.last_agent.name}")
    yield
    # Shutdown: cleanup
    sessions.cleanup_expired()
    print("Agent API shutdown complete")

# This replaces the earlier `app = FastAPI(title="Agent API")`. Create the app
# once, with the lifespan, before registering any routes on it.
app = FastAPI(title="Agent API", lifespan=lifespan)
```
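Calling `cleanup_expired()` only at shutdown means expired sessions stay in memory for the life of the process. One option is to run the cleanup on a timer for as long as the app is up. The sketch below assumes nothing about FastAPI itself: `cleanup_loop` and `periodic_cleanup` are hypothetical helpers, and `cleanup` is any zero-argument callable such as `sessions.cleanup_expired`.

```python
import asyncio
import logging
from contextlib import asynccontextmanager, suppress
from typing import Callable

logger = logging.getLogger(__name__)


async def cleanup_loop(cleanup: Callable[[], None], interval_seconds: float) -> None:
    """Call `cleanup()` every `interval_seconds` until the task is cancelled."""
    while True:
        await asyncio.sleep(interval_seconds)
        try:
            cleanup()
        except Exception:
            # Keep the loop alive if one cleanup pass fails
            logger.exception("Session cleanup failed")


@asynccontextmanager
async def periodic_cleanup(cleanup: Callable[[], None], interval_seconds: float = 60.0):
    """Run the cleanup loop in the background for the duration of the block."""
    task = asyncio.create_task(cleanup_loop(cleanup, interval_seconds))
    try:
        yield
    finally:
        # Stop the background task and wait for the cancellation to finish
        task.cancel()
        with suppress(asyncio.CancelledError):
            await task
```

Inside a lifespan function, the `yield` could be wrapped as `async with periodic_cleanup(sessions.cleanup_expired): yield`, so the loop starts at startup and is cancelled at shutdown.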

## FAQ

### How do I handle agent timeouts in a web server context?

Wrap your `Runner.run` call with `asyncio.wait_for(Runner.run(...), timeout=30.0)`. This cancels the run and raises `asyncio.TimeoutError` after 30 seconds, which you catch and return as a 504 Gateway Timeout. Set the timeout below your load balancer's idle timeout so the client receives your 504 rather than a dropped connection.
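As a rough sketch of that pattern, the snippet below maps a timeout to a status code. `run_with_timeout` and `slow_agent` are hypothetical names, and `slow_agent` merely sleeps in place of a real `Runner.run(...)` call.

```python
import asyncio
from typing import Awaitable


async def run_with_timeout(run: Awaitable[str], timeout_seconds: float = 30.0) -> tuple[int, str]:
    """Return (status_code, body): 200 with the result, or 504 on timeout."""
    try:
        result = await asyncio.wait_for(run, timeout=timeout_seconds)
        return 200, result
    except asyncio.TimeoutError:
        # wait_for has already cancelled the inner awaitable at this point
        return 504, "Agent timed out"


async def slow_agent(delay: float) -> str:
    # Hypothetical stand-in for `Runner.run(...)`
    await asyncio.sleep(delay)
    return "done"


if __name__ == "__main__":
    print(asyncio.run(run_with_timeout(slow_agent(0.01), timeout_seconds=1.0)))
    print(asyncio.run(run_with_timeout(slow_agent(1.0), timeout_seconds=0.05)))
```

In a FastAPI route, the `except` branch would typically raise `HTTPException(status_code=504, ...)` instead of returning a tuple.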

### Should I create a new Agent instance per request?

No. Agent instances are lightweight configuration objects — they hold instructions, tool definitions, and handoff lists. They do not store conversation state. Create agents once at module level and reuse them across requests. The Runner manages per-request state internally.

### How do I scale this beyond a single server?

Move session storage from in-memory dictionaries to a shared backend such as Redis, so any server instance can resume any conversation. Then deploy multiple FastAPI instances behind a load balancer. The agents are stateless, so horizontal scaling is straightforward once session state lives outside the process.

---


Source: https://callsphere.ai/blog/openai-agents-sdk-fastapi-production-web-server-integration
