Deploying AI Agents to Production: Complete Infrastructure Guide
A comprehensive guide to deploying OpenAI Agents SDK applications to production using Docker, Kubernetes, environment variable management, health checks, autoscaling, and load balancing.
From Prototype to Production
Building an AI agent that works on your laptop is the easy part. Making it survive real traffic, stay up during model provider outages, scale under load, and remain debuggable when things go wrong — that is the engineering challenge. This guide walks through the full production deployment pipeline for an OpenAI Agents SDK application.
Project Structure
A production agent project needs clear separation between agent definitions, API layer, and infrastructure:
agent-service/
  app/
    agents/
      __init__.py
      triage.py
      specialist.py
      tools.py
    api/
      __init__.py
      routes.py
      middleware.py
      dependencies.py
    core/
      config.py
      logging.py
    main.py
  tests/
  Dockerfile
  docker-compose.yml
  k8s/
    deployment.yaml
    service.yaml
    hpa.yaml
    configmap.yaml
    secrets.yaml
  pyproject.toml
Configuration Management
Never hardcode API keys, model names, or operational parameters. Use a configuration class that reads from environment variables:
# app/core/config.py
from functools import lru_cache

from pydantic_settings import BaseSettings


class Settings(BaseSettings):
    # OpenAI
    openai_api_key: str
    openai_model: str = "gpt-4.1"
    openai_timeout: float = 30.0

    # Application
    app_name: str = "agent-service"
    app_env: str = "production"
    log_level: str = "INFO"
    max_concurrent_runs: int = 50

    # Rate limiting
    rate_limit_rpm: int = 100
    rate_limit_burst: int = 20

    # Health check
    health_check_timeout: float = 5.0

    class Config:
        env_file = ".env"
        env_file_encoding = "utf-8"


@lru_cache()
def get_settings() -> Settings:
    return Settings()
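pydantic-settings maps each field to an upper-cased environment variable of the same name, so a local `.env` for development might look like the following (all values are placeholders; keep this file out of version control):

```
OPENAI_API_KEY=sk-your-key-here
OPENAI_MODEL=gpt-4.1
APP_ENV=development
LOG_LEVEL=DEBUG
MAX_CONCURRENT_RUNS=10
```

In production, the same variables come from the Kubernetes ConfigMap and Secret shown later rather than from a file.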
The FastAPI Application Layer
Wrap your agents in a FastAPI application with proper request handling, timeouts, and error management:
# app/main.py
import logging
from contextlib import asynccontextmanager

import httpx
from fastapi import Depends, FastAPI, HTTPException

from app.api.routes import router
from app.core.config import Settings, get_settings

logger = logging.getLogger(__name__)


@asynccontextmanager
async def lifespan(app: FastAPI):
    settings = get_settings()
    logger.info(f"Starting {settings.app_name} in {settings.app_env} mode")
    # Initialize connection pools, caches, etc.
    yield
    # Cleanup on shutdown
    logger.info("Shutting down agent service")


app = FastAPI(
    title="Agent Service",
    lifespan=lifespan,
)
app.include_router(router, prefix="/api/v1")


@app.get("/healthz")
async def health_check():
    return {"status": "healthy"}


@app.get("/readyz")
async def readiness_check(settings: Settings = Depends(get_settings)):
    # Verify we can reach OpenAI within the health-check timeout
    try:
        async with httpx.AsyncClient(timeout=settings.health_check_timeout) as client:
            resp = await client.get(
                "https://api.openai.com/v1/models",
                headers={"Authorization": f"Bearer {settings.openai_api_key}"},
            )
        if resp.status_code == 200:
            return {"status": "ready"}
        return {"status": "degraded", "reason": "OpenAI API returned non-200"}
    except Exception as e:
        raise HTTPException(status_code=503, detail=f"Not ready: {str(e)}")
API Routes with Concurrency Control
The routes layer handles request validation and enforces concurrency limits to prevent overwhelming the OpenAI API:
# app/api/routes.py
import asyncio

from fastapi import APIRouter, HTTPException
from pydantic import BaseModel

from agents import Runner
from app.agents.triage import triage_agent
from app.core.config import get_settings

router = APIRouter()

# Semaphore limits concurrent agent runs
_semaphore: asyncio.Semaphore | None = None


def get_semaphore() -> asyncio.Semaphore:
    global _semaphore
    if _semaphore is None:
        settings = get_settings()
        _semaphore = asyncio.Semaphore(settings.max_concurrent_runs)
    return _semaphore


class AgentRequest(BaseModel):
    message: str
    session_id: str | None = None
    metadata: dict | None = None


class AgentResponse(BaseModel):
    response: str
    session_id: str | None
    tokens_used: int


@router.post("/run", response_model=AgentResponse)
async def run_agent(request: AgentRequest):
    sem = get_semaphore()
    settings = get_settings()
    # locked() is True when all permits are taken; fail fast with 429
    # instead of queueing requests behind the semaphore
    if sem.locked():
        raise HTTPException(
            status_code=429,
            detail="Too many concurrent requests. Please retry.",
        )
    try:
        async with asyncio.timeout(settings.openai_timeout):
            async with sem:
                result = await Runner.run(
                    triage_agent,
                    input=request.message,
                )
                total_tokens = sum(
                    r.usage.total_tokens
                    for r in result.raw_responses
                    if r.usage
                )
                return AgentResponse(
                    response=result.final_output,
                    session_id=request.session_id,
                    tokens_used=total_tokens,
                )
    except asyncio.TimeoutError:
        raise HTTPException(
            status_code=504,
            detail="Agent run timed out.",
        )
    except Exception as e:
        raise HTTPException(
            status_code=500,
            detail=f"Agent error: {type(e).__name__}",
        )
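Because the service deliberately returns 429 when saturated and 504 on timeout, callers should treat both as retryable. A minimal stdlib client sketch, assuming the `/api/v1/run` endpoint and payload shape above (the backoff parameters and function names are illustrative, not part of the service):

```python
import json
import time
import urllib.error
import urllib.request

RETRYABLE = {429, 504}  # over-capacity and agent-timeout responses


def backoff_delays(attempts: int, base: float = 0.5, cap: float = 8.0) -> list[float]:
    """Exponential backoff schedule: base * 2^n seconds, capped at `cap`."""
    return [min(base * (2 ** n), cap) for n in range(attempts)]


def run_agent(base_url: str, message: str, attempts: int = 4) -> dict:
    """Call the agent service, retrying retryable HTTP statuses with backoff."""
    body = json.dumps({"message": message}).encode()
    req = urllib.request.Request(
        f"{base_url}/api/v1/run",
        data=body,
        headers={"Content-Type": "application/json"},
    )
    last_error: Exception | None = None
    for delay in backoff_delays(attempts):
        try:
            with urllib.request.urlopen(req, timeout=35) as resp:
                return json.loads(resp.read())
        except urllib.error.HTTPError as e:
            if e.code not in RETRYABLE:
                raise  # 4xx/5xx that retrying will not fix
            last_error = e
            time.sleep(delay)
    raise RuntimeError(f"gave up after {attempts} attempts") from last_error
```

Keeping the client timeout slightly above the server's `openai_timeout` avoids the client giving up on requests the server would still have completed.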
Dockerfile
A production Dockerfile should use multi-stage builds, run as a non-root user, and minimize the image size:
# Build stage
FROM python:3.12-slim AS builder

WORKDIR /build
# Copy the project metadata and source; pip needs both to install the package
COPY pyproject.toml .
COPY app/ ./app/
RUN pip install --no-cache-dir --prefix=/install .

# Production stage
FROM python:3.12-slim

# Security: run as non-root
RUN groupadd -r agent && useradd -r -g agent agent

WORKDIR /app
COPY --from=builder /install /usr/local
COPY app/ ./app/

# Set ownership
RUN chown -R agent:agent /app
USER agent

EXPOSE 8000

HEALTHCHECK --interval=30s --timeout=5s --retries=3 \
    CMD python -c "import httpx; httpx.get('http://localhost:8000/healthz').raise_for_status()"

CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "4"]
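The project tree earlier lists a `docker-compose.yml`; a minimal sketch for running the image locally might look like this (the service name and environment overrides are assumptions for local development, not production settings):

```yaml
# docker-compose.yml -- local development only
services:
  agent-service:
    build: .
    ports:
      - "8000:8000"
    env_file: .env
    environment:
      APP_ENV: development
      LOG_LEVEL: DEBUG
```

This keeps the API key in the untracked `.env` file while letting you iterate on the image with `docker compose up --build`.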
Kubernetes Deployment
The Kubernetes manifests handle scaling, secrets, and health checking:
# k8s/deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: agent-service
  labels:
    app: agent-service
spec:
  replicas: 3
  selector:
    matchLabels:
      app: agent-service
  template:
    metadata:
      labels:
        app: agent-service
    spec:
      containers:
        - name: agent-service
          image: your-registry/agent-service:latest
          ports:
            - containerPort: 8000
          envFrom:
            - configMapRef:
                name: agent-config
            - secretRef:
                name: agent-secrets
          resources:
            requests:
              memory: "256Mi"
              cpu: "250m"
            limits:
              memory: "512Mi"
              cpu: "1000m"
          livenessProbe:
            httpGet:
              path: /healthz
              port: 8000
            initialDelaySeconds: 10
            periodSeconds: 30
          readinessProbe:
            httpGet:
              path: /readyz
              port: 8000
            initialDelaySeconds: 5
            periodSeconds: 10
          startupProbe:
            httpGet:
              path: /healthz
              port: 8000
            failureThreshold: 30
            periodSeconds: 2
Horizontal Pod Autoscaler
Scale based on CPU utilization or custom metrics:
# k8s/hpa.yaml
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: agent-service-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: agent-service
  minReplicas: 2
  maxReplicas: 20
  metrics:
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 70
  behavior:
    scaleUp:
      stabilizationWindowSeconds: 60
      policies:
        - type: Pods
          value: 4
          periodSeconds: 60
    scaleDown:
      stabilizationWindowSeconds: 300
      policies:
        - type: Pods
          value: 2
          periodSeconds: 120
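To build intuition for what this manifest does, the HPA's core algorithm (per the Kubernetes documentation) is `desiredReplicas = ceil(currentReplicas * currentMetric / targetMetric)`, clamped to the configured bounds. A small sketch, using the min/max values from the manifest above:

```python
import math


def desired_replicas(current_replicas: int, current_util: float, target_util: float,
                     min_replicas: int = 2, max_replicas: int = 20) -> int:
    """Kubernetes HPA scaling formula: ceil(current * currentMetric / targetMetric),
    clamped to [min_replicas, max_replicas]."""
    desired = math.ceil(current_replicas * current_util / target_util)
    return max(min_replicas, min(max_replicas, desired))
```

For example, 3 replicas averaging 90% CPU against a 70% target gives `ceil(3 * 90 / 70) = 4` replicas. Note the `behavior.scaleUp` policy above further limits each step to at most 4 new pods per 60 seconds, so the actual trajectory can be slower than the raw formula suggests.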
ConfigMap and Secrets
# k8s/configmap.yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: agent-config
data:
  APP_ENV: "production"
  LOG_LEVEL: "INFO"
  OPENAI_MODEL: "gpt-4.1"
  MAX_CONCURRENT_RUNS: "50"
  RATE_LIMIT_RPM: "100"
---
# k8s/secrets.yaml
apiVersion: v1
kind: Secret
metadata:
  name: agent-secrets
type: Opaque
stringData:
  OPENAI_API_KEY: "sk-your-key-here"
Graceful Shutdown
Agents may be mid-execution when Kubernetes sends a SIGTERM. Handle graceful shutdown:
import asyncio
import signal

shutdown_event = asyncio.Event()


def handle_sigterm(*args):
    shutdown_event.set()


signal.signal(signal.SIGTERM, handle_sigterm)


# In your route handler, check before starting new work:
@router.post("/run")
async def run_agent(request: AgentRequest):
    if shutdown_event.is_set():
        raise HTTPException(status_code=503, detail="Service shutting down")
    # ... proceed with agent run
The full deployment pipeline — configuration management, containerization, health checks, autoscaling, and graceful shutdown — transforms a prototype agent into a production system. Start with a single replica and add scaling once you understand your traffic patterns. Monitor token usage and latency from day one, because cost surprises are the most common production agent issue.
Written by
CallSphere Team
Expert insights on AI voice agents and customer communication automation.