Rate Limiting and Abuse Prevention for AI Agents
Learn how to implement token budgets, max_turns safety limits, IP and user-level throttling, and abuse detection for production AI agent systems using the OpenAI Agents SDK.
Why AI Agents Need Their Own Rate Limiting
Traditional API rate limiting counts requests. AI agent rate limiting must count something fundamentally different: tokens consumed, turns executed, tools invoked, and cost accumulated. A single agent request can trigger dozens of LLM calls, each consuming thousands of tokens. Without agent-aware rate limiting, one abusive user can burn through your entire monthly API budget in hours.
The threat model is also different. Traditional abuse means DDoS or credential stuffing. Agent abuse includes prompt injection loops that cause infinite tool calls, users who manipulate agents into generating massive outputs, and automated scripts that use your agent as a free LLM proxy.
Token Budget Architecture
The foundation of agent rate limiting is a token budget system. Every user gets a token allocation that decrements with each LLM call. The budget tracks input tokens, output tokens, and total cost separately because their pricing differs.
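Because input and output tokens are priced differently, the cost side of the budget needs a small conversion step. A minimal sketch of per-call cost accounting — the per-million-token prices below are illustrative placeholders, not current OpenAI pricing:

```python
# Illustrative per-million-token prices; check your provider's current rates.
PRICE_PER_MILLION = {"gpt-4o": {"input": 2.50, "output": 10.00}}

def estimate_cost_usd(model: str, input_tokens: int, output_tokens: int) -> float:
    """Convert a single call's token usage into dollars for budget accounting."""
    prices = PRICE_PER_MILLION[model]
    cost = (input_tokens / 1_000_000) * prices["input"] + (
        output_tokens / 1_000_000
    ) * prices["output"]
    return round(cost, 6)
```

Feeding this value into the budget alongside the raw token counts keeps the three limits independent: a user can exhaust cost before tokens, or vice versa.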
from dataclasses import dataclass, field
from datetime import datetime, timedelta, timezone

def _utcnow() -> datetime:
    # datetime.utcnow() is deprecated; use an aware UTC timestamp instead.
    return datetime.now(timezone.utc)

@dataclass
class TokenBudget:
    user_id: str
    max_input_tokens: int = 500_000
    max_output_tokens: int = 100_000
    max_cost_usd: float = 5.00
    window_hours: int = 24
    used_input_tokens: int = 0
    used_output_tokens: int = 0
    used_cost_usd: float = 0.0
    window_start: datetime = field(default_factory=_utcnow)

    def is_within_budget(self) -> bool:
        # Roll the window forward before checking any limit.
        if _utcnow() - self.window_start > timedelta(hours=self.window_hours):
            self._reset()
        return (
            self.used_input_tokens < self.max_input_tokens
            and self.used_output_tokens < self.max_output_tokens
            and self.used_cost_usd < self.max_cost_usd
        )

    def consume(self, input_tokens: int, output_tokens: int, cost_usd: float) -> None:
        self.used_input_tokens += input_tokens
        self.used_output_tokens += output_tokens
        self.used_cost_usd += cost_usd

    def remaining(self) -> dict:
        return {
            "input_tokens": self.max_input_tokens - self.used_input_tokens,
            "output_tokens": self.max_output_tokens - self.used_output_tokens,
            "cost_usd": round(self.max_cost_usd - self.used_cost_usd, 4),
            "resets_at": (
                self.window_start + timedelta(hours=self.window_hours)
            ).isoformat(),
        }

    def _reset(self) -> None:
        self.used_input_tokens = 0
        self.used_output_tokens = 0
        self.used_cost_usd = 0.0
        self.window_start = _utcnow()
The max_turns Safety Net
The OpenAI Agents SDK's Runner.run() accepts a max_turns parameter that caps the number of agent turns in a single invocation. This is your most important safety valve. Without it, a malfunctioning agent can loop indefinitely — calling tools, processing results, and calling more tools until your budget is gone.
from agents import Agent, Runner, RunConfig
from agents.exceptions import MaxTurnsExceeded

agent = Agent(
    name="SafeAgent",
    instructions="You are a helpful assistant. Complete tasks efficiently.",
    model="gpt-4o",
)

async def safe_run(user_input: str) -> str:
    try:
        result = await Runner.run(
            agent,
            user_input,
            max_turns=10,
            run_config=RunConfig(tracing_disabled=False),
        )
        return result.final_output
    except MaxTurnsExceeded:
        # Catch the SDK's typed exception rather than string-matching on
        # the error message, which is brittle across SDK versions.
        return (
            "I could not complete this task within the allowed steps. "
            "Please simplify your request."
        )
Setting max_turns=10 means the agent gets at most 10 cycles of reasoning and tool use. For most tasks, 5-10 turns is sufficient. Complex multi-step workflows might need 15-25. Anything beyond 30 is almost certainly a loop.
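These sizing rules can be encoded as a simple lookup so every code path picks a ceiling deliberately instead of hardcoding one. The task categories below are hypothetical examples, not SDK concepts:

```python
# Hypothetical task categories; tune the ceilings for your own workloads.
MAX_TURNS_BY_TASK = {
    "qa": 5,         # single lookup plus an answer
    "research": 15,  # several searches and a synthesis step
    "workflow": 25,  # multi-step tool orchestration
}

def max_turns_for(task_type: str) -> int:
    """Pick a max_turns ceiling; unknown task types get the tightest cap."""
    return MAX_TURNS_BY_TASK.get(task_type, 5)
```

Defaulting unknown task types to the tightest cap means a new endpoint that forgets to classify its tasks fails safe rather than open.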
Tiered Rate Limiting by User Level
Not all users should get the same limits. Free tier users get tight constraints. Paying customers get more room. Enterprise accounts get custom allocations. A tiered rate limiter manages this.
from enum import Enum

class UserTier(str, Enum):
    FREE = "free"
    PRO = "pro"
    ENTERPRISE = "enterprise"

TIER_LIMITS = {
    UserTier.FREE: {
        "requests_per_minute": 5,
        "requests_per_hour": 50,
        "max_turns_per_request": 5,
        "daily_token_budget": 100_000,
        "daily_cost_budget_usd": 0.50,
    },
    UserTier.PRO: {
        "requests_per_minute": 20,
        "requests_per_hour": 500,
        "max_turns_per_request": 15,
        "daily_token_budget": 1_000_000,
        "daily_cost_budget_usd": 10.00,
    },
    UserTier.ENTERPRISE: {
        "requests_per_minute": 100,
        "requests_per_hour": 5000,
        "max_turns_per_request": 30,
        "daily_token_budget": 10_000_000,
        "daily_cost_budget_usd": 200.00,
    },
}
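To enforce the per-request turn ceiling, clamp whatever turn count the caller asks for to the tier's limit before invoking the agent. The helper below is our own convention, not part of the SDK:

```python
def effective_max_turns(tier_limits: dict, requested_turns: int) -> int:
    """Clamp a caller-requested turn count to the tier's per-request ceiling."""
    ceiling = tier_limits["max_turns_per_request"]
    # Never go below 1: a zero-turn run can never produce output.
    return max(1, min(requested_turns, ceiling))
```

The result is what you pass as `max_turns` to `Runner.run()`, so a free-tier user asking for 50 turns silently gets 5.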
Redis-Based Sliding Window Rate Limiter
A production rate limiter needs to work across multiple server instances. Redis is the standard choice — atomic operations, built-in TTLs, and sub-millisecond latency.
import time
import uuid
from typing import Optional

import redis.asyncio as redis

class AgentRateLimiter:
    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client

    async def check_and_consume(
        self, user_id: str, tier: UserTier
    ) -> tuple[bool, Optional[dict]]:
        limits = TIER_LIMITS[tier]
        now = time.time()

        # Sliding window for requests per minute
        minute_key = f"rate:rpm:{user_id}"
        minute_count = await self._sliding_window_count(
            minute_key, now, window_seconds=60
        )
        if minute_count >= limits["requests_per_minute"]:
            return False, {
                "error": "rate_limit_exceeded",
                "limit": "requests_per_minute",
                "retry_after_seconds": 60,
            }

        # Sliding window for requests per hour
        hour_key = f"rate:rph:{user_id}"
        hour_count = await self._sliding_window_count(
            hour_key, now, window_seconds=3600
        )
        if hour_count >= limits["requests_per_hour"]:
            return False, {
                "error": "rate_limit_exceeded",
                "limit": "requests_per_hour",
                "retry_after_seconds": 3600,
            }

        # Record this request. The sorted-set member must be unique per
        # request; a bare timestamp would collapse two requests arriving
        # at the same instant into a single entry and undercount.
        member = f"{now}:{uuid.uuid4().hex}"
        pipe = self.redis.pipeline()
        pipe.zadd(minute_key, {member: now})
        pipe.expire(minute_key, 120)
        pipe.zadd(hour_key, {member: now})
        pipe.expire(hour_key, 7200)
        await pipe.execute()
        return True, None

    async def _sliding_window_count(
        self, key: str, now: float, window_seconds: int
    ) -> int:
        # Evict entries older than the window, then count what remains.
        cutoff = now - window_seconds
        await self.redis.zremrangebyscore(key, 0, cutoff)
        return await self.redis.zcard(key)
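The same algorithm works in memory for single-process development or unit tests. A minimal sketch without Redis, using a deque in place of the sorted set:

```python
import time
from collections import deque
from typing import Optional

class InMemorySlidingWindow:
    """Single-process stand-in for the Redis sorted-set sliding window."""

    def __init__(self, limit: int, window_seconds: float):
        self.limit = limit
        self.window_seconds = window_seconds
        self.timestamps = deque()  # monotonically increasing request times

    def allow(self, now: Optional[float] = None) -> bool:
        now = time.time() if now is None else now
        cutoff = now - self.window_seconds
        # Drop entries that have aged out of the window.
        while self.timestamps and self.timestamps[0] <= cutoff:
            self.timestamps.popleft()
        if len(self.timestamps) >= self.limit:
            return False
        self.timestamps.append(now)
        return True
```

Injecting `now` as a parameter makes the window deterministic to test, which is harder to do against a live Redis clock.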
IP-Level Throttling for Unauthenticated Access
Before a user authenticates, their IP address is the only identifier. IP throttling prevents brute force attacks and resource exhaustion from automated scripts.
class IPThrottler:
    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client
        self.max_per_minute = 10
        self.max_per_hour = 100
        self.ban_threshold = 500
        self.ban_duration_hours = 24

    async def check_ip(self, ip_address: str) -> tuple[bool, Optional[str]]:
        ban_key = f"ban:ip:{ip_address}"
        if await self.redis.exists(ban_key):
            return False, "IP address is temporarily banned due to excessive requests."

        hour_key = f"throttle:ip:hour:{ip_address}"
        hour_count = await self.redis.incr(hour_key)
        if hour_count == 1:
            # First request in this window: start the TTL clock.
            await self.redis.expire(hour_key, 3600)
        if hour_count > self.ban_threshold:
            await self.redis.setex(
                ban_key, self.ban_duration_hours * 3600, "banned"
            )
            return False, "IP address banned for excessive requests."
        if hour_count > self.max_per_hour:
            return False, "Hourly request limit exceeded."

        minute_key = f"throttle:ip:min:{ip_address}"
        minute_count = await self.redis.incr(minute_key)
        if minute_count == 1:
            await self.redis.expire(minute_key, 60)
        if minute_count > self.max_per_minute:
            return False, "Per-minute request limit exceeded. Please slow down."

        return True, None
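One practical wrinkle: behind a load balancer, the TCP peer address is the proxy's, not the caller's, so every request appears to come from one IP. A sketch of resolving the real client from `X-Forwarded-For` — only trust this header when a proxy you control sets it, because clients can forge it otherwise (header keys are assumed lowercased, as ASGI servers normalize them):

```python
def resolve_client_ip(headers: dict, peer_ip: str, trust_proxy: bool) -> str:
    """Pick the address to throttle on, preferring X-Forwarded-For when trusted."""
    if trust_proxy:
        forwarded = headers.get("x-forwarded-for", "")
        if forwarded:
            # The left-most entry is the original client in a well-behaved chain.
            return forwarded.split(",")[0].strip()
    return peer_ip
```

With `trust_proxy=False` (e.g. when the service is directly exposed), the spoofable header is ignored entirely.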
Abuse Detection Heuristics
Rate limiting alone is not enough. Sophisticated abusers stay just under the rate limit while extracting maximum value. Behavioral heuristics detect these patterns.
import hashlib
import json

class AbuseDetector:
    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client

    async def analyze_request(
        self, user_id: str, input_text: str, context: dict
    ) -> tuple[bool, Optional[str]]:
        signals = []

        # Signal: extremely long inputs (likely prompt injection or data exfiltration)
        if len(input_text) > 10_000:
            signals.append("oversized_input")

        # Signal: repetitive requests (same hash within a short window)
        input_hash = hashlib.md5(input_text.encode()).hexdigest()
        repeat_key = f"abuse:repeat:{user_id}:{input_hash}"
        repeat_count = await self.redis.incr(repeat_key)
        await self.redis.expire(repeat_key, 300)
        if repeat_count > 3:
            signals.append("repetitive_requests")

        # Signal: rapid session creation (many unique session IDs)
        session_key = f"abuse:sessions:{user_id}"
        await self.redis.sadd(session_key, context.get("session_id", ""))
        await self.redis.expire(session_key, 3600)
        session_count = await self.redis.scard(session_key)
        if session_count > 50:
            signals.append("excessive_sessions")

        # Signal: high tool invocation rate (proxy abuse)
        tool_key = f"abuse:tools:{user_id}"
        tool_count = await self.redis.incr(tool_key)
        await self.redis.expire(tool_key, 3600)
        if tool_count > 200:
            signals.append("excessive_tool_calls")

        # One signal may be innocent; two or more together is suspicious.
        if len(signals) >= 2:
            await self._flag_user(user_id, signals)
            return False, f"Suspicious activity detected: {', '.join(signals)}"
        return True, None

    async def _flag_user(self, user_id: str, signals: list[str]) -> None:
        flag_key = f"abuse:flagged:{user_id}"
        await self.redis.setex(flag_key, 86400, json.dumps(signals))
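An exact-hash repeat check misses trivially varied inputs: an abuser can add whitespace or flip case and get a fresh hash every time. Normalizing before hashing catches those variants; a sketch:

```python
import hashlib

def input_fingerprint(text: str) -> str:
    """Hash a whitespace- and case-normalized form of the input text."""
    normalized = " ".join(text.lower().split())
    return hashlib.sha256(normalized.encode("utf-8")).hexdigest()
```

Swapping this in for the raw `md5(input_text)` makes the repetitive-request signal robust to the cheapest evasion tricks, though semantically rephrased inputs still require embedding-based similarity to catch.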
Putting It All Together: Middleware Integration
In a FastAPI application, all these components combine into middleware that runs before every agent invocation.
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse

app = FastAPI()

# Assumes ip_throttler and rate_limiter were instantiated at application
# startup with a shared Redis client.

@app.middleware("http")
async def agent_rate_limit_middleware(request: Request, call_next):
    ip = request.client.host if request.client else "unknown"

    # IP throttling first (cheapest check)
    ip_ok, ip_msg = await ip_throttler.check_ip(ip)
    if not ip_ok:
        # An HTTPException raised inside middleware bypasses FastAPI's
        # exception handlers and surfaces as a 500, so return the 429
        # response directly instead.
        return JSONResponse(status_code=429, content={"detail": ip_msg})

    # Authenticated rate limiting
    user = getattr(request.state, "user", None)
    if user:
        allowed, info = await rate_limiter.check_and_consume(user.id, user.tier)
        if not allowed:
            return JSONResponse(status_code=429, content={"detail": info})

    return await call_next(request)
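One refinement worth adding to those 429 responses: a `Retry-After` header, which standards-compliant clients use to back off instead of hammering the endpoint. The helper below is our own convention for assembling the headers, not a FastAPI API:

```python
def rate_limit_headers(retry_after_seconds: int, remaining: int = 0) -> dict:
    """Headers for a 429 response so well-behaved clients know when to retry."""
    return {
        "Retry-After": str(retry_after_seconds),
        # Non-standard but widely used convention for remaining quota.
        "X-RateLimit-Remaining": str(remaining),
    }
```

Pass the result as the `headers=` argument to `JSONResponse`, pulling `retry_after_seconds` from the dict the rate limiter already returns.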
Key Takeaways
AI agent rate limiting is fundamentally different from API rate limiting. You must track tokens, turns, and cost — not just request counts. The max_turns parameter in Runner.run() is your single most important safety mechanism. Layer IP throttling, user-level rate limits, token budgets, and behavioral abuse detection to build defense in depth. Use Redis for distributed state so your limits work across all server instances. And always build tiered limits — treating free users and enterprise customers identically wastes resources and frustrates paying customers.
Written by
CallSphere Team