
Rate Limiting AI Agents: Preventing Abuse and Controlling API Costs

Implement per-user rate limiting, token budgets, sliding window algorithms, and graceful degradation strategies to protect your AI agent system from abuse while controlling LLM API costs.

Why AI Agents Need Rate Limiting

A single LLM API call can cost anywhere from $0.01 to $0.50 depending on the model and token count. An AI agent that makes multiple LLM calls per user request — reasoning, tool execution, re-planning — can burn through $2-5 per interaction. Without rate limiting, a single abusive user or a runaway loop can generate thousands of dollars in API costs within minutes.
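To make that concrete, a quick back-of-envelope calculation; the call count and per-call cost below are illustrative assumptions, not measured figures:

```python
# Rough worst-case cost estimate for an unthrottled agent loop.
# All figures are illustrative assumptions, not real pricing.
CALLS_PER_INTERACTION = 8        # reasoning + tool calls + re-planning
COST_PER_CALL_USD = 0.40         # near the high end of the $0.01-$0.50 range

cost_per_interaction = CALLS_PER_INTERACTION * COST_PER_CALL_USD
abusive_interactions_per_minute = 30
cost_per_minute = cost_per_interaction * abusive_interactions_per_minute

print(f"${cost_per_interaction:.2f} per interaction")    # $3.20 per interaction
print(f"${cost_per_minute:.2f} per minute, unthrottled")  # $96.00 per minute, unthrottled
```

At that rate a single runaway client costs thousands of dollars within the hour, which is the failure mode the rest of this post guards against.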

Rate limiting for AI agents goes beyond traditional API rate limiting because you need to track not just request counts but also token consumption, tool call frequency, and per-user spending budgets. This post builds a comprehensive rate limiting system using Redis and Python.

The Sliding Window Algorithm

Sliding-window rate limiting is fairer than fixed windows because it prevents bursts at window boundaries: with a fixed window, a user can fire a full quota at the end of one window and another full quota at the start of the next. The implementation below stores each request as a timestamped member of a Redis sorted set:

import time
import redis
from dataclasses import dataclass
from typing import Optional

@dataclass
class RateLimitConfig:
    requests_per_minute: int = 20
    requests_per_hour: int = 200
    tokens_per_minute: int = 50_000
    tokens_per_hour: int = 500_000
    max_daily_cost_usd: float = 10.0

@dataclass
class RateLimitResult:
    allowed: bool
    retry_after_seconds: Optional[int] = None
    remaining_requests: int = 0
    remaining_tokens: int = 0
    reason: Optional[str] = None

class SlidingWindowRateLimiter:
    def __init__(self, redis_client: redis.Redis, config: RateLimitConfig):
        self.redis = redis_client
        self.config = config

    def check_request(self, user_id: str) -> RateLimitResult:
        """Check if a request is allowed under all rate limits."""
        now = time.time()

        # Check requests per minute
        minute_key = f"rl:{user_id}:rpm"
        minute_count = self._count_in_window(minute_key, now, 60)
        if minute_count >= self.config.requests_per_minute:
            return RateLimitResult(
                allowed=False,
                retry_after_seconds=self._time_until_slot(minute_key, now, 60),
                reason="Per-minute request limit exceeded",
            )

        # Check requests per hour
        hour_key = f"rl:{user_id}:rph"
        hour_count = self._count_in_window(hour_key, now, 3600)
        if hour_count >= self.config.requests_per_hour:
            return RateLimitResult(
                allowed=False,
                retry_after_seconds=self._time_until_slot(hour_key, now, 3600),
                reason="Per-hour request limit exceeded",
            )

        # Record this request. Member strings must be unique, so append a
        # nanosecond counter in case two requests share the same `now`.
        # Note: the check above and this write are not atomic; under heavy
        # concurrency a Lua script would close that gap.
        member = f"{now}:{time.time_ns()}"
        pipe = self.redis.pipeline()
        pipe.zadd(minute_key, {member: now})
        pipe.expire(minute_key, 120)
        pipe.zadd(hour_key, {member: now})
        pipe.expire(hour_key, 7200)
        pipe.execute()

        return RateLimitResult(
            allowed=True,
            remaining_requests=self.config.requests_per_minute - minute_count - 1,
            # Token accounting lives in TokenBudgetTracker, so report the
            # configured per-minute budget here rather than a live count.
            remaining_tokens=self.config.tokens_per_minute,
        )

    def _count_in_window(self, key: str, now: float, window_seconds: int) -> int:
        window_start = now - window_seconds
        return self.redis.zcount(key, window_start, now)

    def _time_until_slot(self, key: str, now: float, window_seconds: int) -> int:
        """Seconds until the oldest request in the window expires."""
        window_start = now - window_seconds
        oldest = self.redis.zrangebyscore(
            key, window_start, now, start=0, num=1, withscores=True
        )
        if oldest:
            _, oldest_time = oldest[0]
            return max(1, int(oldest_time + window_seconds - now) + 1)
        return 1
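To see the algorithm itself without standing up Redis, here is a minimal in-memory sketch of the same sliding-window logic; the deque-based class is an illustration of the eviction behavior, not a replacement for the Redis-backed limiter above:

```python
import time
from collections import deque
from typing import Optional

class InMemorySlidingWindow:
    """Toy sliding window: allows `limit` events per `window` seconds."""

    def __init__(self, limit: int, window: float):
        self.limit = limit
        self.window = window
        self.events = deque()  # timestamps, oldest first

    def allow(self, now: Optional[float] = None) -> bool:
        now = time.time() if now is None else now
        # Evict timestamps that have slid out of the window.
        while self.events and self.events[0] <= now - self.window:
            self.events.popleft()
        if len(self.events) >= self.limit:
            return False
        self.events.append(now)
        return True

limiter = InMemorySlidingWindow(limit=3, window=60.0)
decisions = [limiter.allow(now=t) for t in (0, 10, 20, 30, 70)]
print(decisions)  # [True, True, True, False, True] -- the t=0 event expired by t=70
```

The Redis sorted set plays the role of the deque here: ZADD appends, ZCOUNT over the window counts, and key expiry handles eviction across processes.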

Token Budget Tracking

Token budgets prevent users from consuming excessive LLM tokens even within request limits:


class TokenBudgetTracker:
    """Track token consumption per user with rolling budgets."""

    # Approximate costs per 1K tokens (input + output blended)
    MODEL_COSTS = {
        "gpt-4o": 0.0075,
        "gpt-4o-mini": 0.0003,
        "claude-sonnet-4-20250514": 0.009,
    }

    def __init__(self, redis_client: redis.Redis, config: RateLimitConfig):
        self.redis = redis_client
        self.config = config

    def check_token_budget(self, user_id: str) -> RateLimitResult:
        """Check if user has remaining token budget."""
        now = time.time()

        minute_key = f"tokens:{user_id}:tpm"
        minute_tokens = self._sum_in_window(minute_key, now, 60)

        if minute_tokens >= self.config.tokens_per_minute:
            return RateLimitResult(
                allowed=False,
                reason="Per-minute token budget exhausted",
                remaining_tokens=0,
            )

        return RateLimitResult(
            allowed=True,
            remaining_tokens=self.config.tokens_per_minute - int(minute_tokens),
        )

    def record_usage(self, user_id: str, tokens_used: int, model: str) -> None:
        """Record token usage after a successful LLM call."""
        now = time.time()
        cost_per_1k = self.MODEL_COSTS.get(model, 0.01)
        cost = (tokens_used / 1000) * cost_per_1k

        pipe = self.redis.pipeline()

        # Track tokens; append a nanosecond counter so two calls with the
        # same (timestamp, tokens) pair never collide as set members.
        minute_key = f"tokens:{user_id}:tpm"
        pipe.zadd(minute_key, {f"{now}:{tokens_used}:{time.time_ns()}": now})
        pipe.expire(minute_key, 120)

        # Track daily cost
        daily_key = f"cost:{user_id}:daily:{time.strftime('%Y-%m-%d')}"
        pipe.incrbyfloat(daily_key, cost)
        pipe.expire(daily_key, 86400 * 2)

        pipe.execute()

    def check_daily_budget(self, user_id: str) -> RateLimitResult:
        daily_key = f"cost:{user_id}:daily:{time.strftime('%Y-%m-%d')}"
        current_cost = float(self.redis.get(daily_key) or 0)

        if current_cost >= self.config.max_daily_cost_usd:
            return RateLimitResult(
                allowed=False,
                reason=f"Daily budget of ${self.config.max_daily_cost_usd} exhausted",
            )

        return RateLimitResult(allowed=True)

    def _sum_in_window(self, key: str, now: float, window_seconds: int) -> float:
        window_start = now - window_seconds
        entries = self.redis.zrangebyscore(key, window_start, now)
        return sum(int(e.decode().split(":")[1]) for e in entries)
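The cost arithmetic in record_usage is worth checking by hand. A standalone sketch using the same illustrative per-1K-token rates as MODEL_COSTS above (these are blended estimates, not a pricing table):

```python
# Standalone version of record_usage's cost math, with the same
# illustrative per-1K-token rates as MODEL_COSTS above.
MODEL_COSTS = {
    "gpt-4o": 0.0075,
    "gpt-4o-mini": 0.0003,
}

def estimate_cost(tokens_used: int, model: str, default_rate: float = 0.01) -> float:
    # Unknown models fall back to a conservative default rate.
    cost_per_1k = MODEL_COSTS.get(model, default_rate)
    return (tokens_used / 1000) * cost_per_1k

print(estimate_cost(4000, "gpt-4o"))       # 0.03
print(estimate_cost(4000, "gpt-4o-mini"))  # 0.0012
print(estimate_cost(4000, "unknown"))      # 0.04 -- default rate applied
```

The conservative fallback matters: if a new model is added to the system before MODEL_COSTS is updated, its usage is over-counted rather than silently free.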

Graceful Degradation

Instead of hard-blocking users when limits are reached, degrade service quality:

class GracefulDegradation:
    """Downgrade service instead of blocking when approaching limits."""

    def select_model(self, user_id: str, budget_tracker: TokenBudgetTracker) -> str:
        daily_key = f"cost:{user_id}:daily:{time.strftime('%Y-%m-%d')}"
        current_cost = float(budget_tracker.redis.get(daily_key) or 0)
        max_cost = budget_tracker.config.max_daily_cost_usd

        usage_ratio = current_cost / max_cost if max_cost > 0 else 0

        if usage_ratio < 0.5:
            return "gpt-4o"        # Full quality
        elif usage_ratio < 0.8:
            return "gpt-4o-mini"   # Reduced quality
        else:
            return "gpt-4o-mini"   # Same cheap model; pair with get_max_tokens to shorten outputs

    def get_max_tokens(self, usage_ratio: float) -> int:
        if usage_ratio < 0.5:
            return 4096
        elif usage_ratio < 0.8:
            return 2048
        else:
            return 1024
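A compact way to sanity-check the policy is to combine both methods into one table-driven function. The same 0.5 and 0.8 thresholds as above; the degrade name is just for illustration:

```python
def degrade(usage_ratio: float) -> tuple:
    """Map a daily-budget usage ratio to (model, max_tokens),
    mirroring the 0.5 / 0.8 thresholds used above."""
    if usage_ratio < 0.5:
        return ("gpt-4o", 4096)       # full quality
    elif usage_ratio < 0.8:
        return ("gpt-4o-mini", 2048)  # reduced quality
    return ("gpt-4o-mini", 1024)      # minimal, shorter outputs

for ratio in (0.1, 0.6, 0.95):
    model, max_tokens = degrade(ratio)
    print(f"{ratio:.0%} of budget -> {model}, max_tokens={max_tokens}")
# 10% of budget -> gpt-4o, max_tokens=4096
# 60% of budget -> gpt-4o-mini, max_tokens=2048
# 95% of budget -> gpt-4o-mini, max_tokens=1024
```

Users near their budget keep getting answers, just shorter and from a cheaper model, which is usually a better experience than a hard 429.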

FastAPI Integration

from fastapi import FastAPI, Depends, HTTPException, Request
from fastapi.responses import JSONResponse

app = FastAPI()
redis_client = redis.Redis(host="localhost", port=6379, db=0)
config = RateLimitConfig()
limiter = SlidingWindowRateLimiter(redis_client, config)
budget = TokenBudgetTracker(redis_client, config)

async def rate_limit_dependency(request: Request):
    user_id = request.headers.get("X-User-ID", "anonymous")

    result = limiter.check_request(user_id)
    if not result.allowed:
        raise HTTPException(
            status_code=429,
            detail=result.reason,
            headers={"Retry-After": str(result.retry_after_seconds)},
        )

    budget_result = budget.check_daily_budget(user_id)
    if not budget_result.allowed:
        raise HTTPException(status_code=429, detail=budget_result.reason)

    return user_id

@app.post("/agent/chat")
async def chat(request: Request, user_id: str = Depends(rate_limit_dependency)):
    body = await request.json()
    # Run the agent here, then record actual consumption so budgets stay
    # accurate, e.g. budget.record_usage(user_id, tokens_used, model)
    return {"response": "Agent response here"}

FAQ

How do I set appropriate rate limits for different user tiers?

Start by measuring actual usage patterns from real users for at least a week before setting limits. Create tiers based on your business model — free users might get 20 requests per hour and a $0.50 daily budget, while paid users get 200 requests per hour and a $20 daily budget. Store tier configurations in your database and load them dynamically rather than hardcoding values.
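A tier table along those lines might look like this; the tier names and numbers below are illustrative, and in production they would be loaded from your database as the answer suggests:

```python
from dataclasses import dataclass

@dataclass
class TierLimits:
    requests_per_hour: int
    max_daily_cost_usd: float

# Illustrative numbers matching the answer above; load from your DB in practice.
TIERS = {
    "free": TierLimits(requests_per_hour=20, max_daily_cost_usd=0.50),
    "paid": TierLimits(requests_per_hour=200, max_daily_cost_usd=20.0),
}

def limits_for(user_tier: str) -> TierLimits:
    # Unknown or missing tiers fall back to the most restrictive limits.
    return TIERS.get(user_tier, TIERS["free"])

print(limits_for("paid").requests_per_hour)    # 200
print(limits_for("trial").max_daily_cost_usd)  # 0.5
```

Falling back to the free tier for unrecognized values means a misconfigured account is over-restricted rather than unlimited.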

Should I rate limit per user, per API key, or per IP address?

Use a combination. Per-user limits are the primary control for authenticated users. Per-API-key limits protect against compromised keys. Per-IP limits catch unauthenticated abuse and credential stuffing. Apply the most restrictive matching limit. For agents specifically, also consider per-session limits to prevent a single long-running conversation from consuming too many resources.
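"Apply the most restrictive matching limit" reduces to combining the individual check results, where the first denial wins. A sketch, with a simplified stand-in for the RateLimitResult dataclass defined earlier:

```python
from dataclasses import dataclass
from typing import Optional

@dataclass
class LimitResult:
    """Simplified stand-in for RateLimitResult above."""
    allowed: bool
    reason: Optional[str] = None

def most_restrictive(*results: LimitResult) -> LimitResult:
    """A request passes only if every applicable limit allows it."""
    for r in results:
        if not r.allowed:
            return r  # first denial wins; its reason goes back to the client
    return LimitResult(allowed=True)

per_user = LimitResult(True)
per_key = LimitResult(True)
per_ip = LimitResult(False, reason="IP burst limit exceeded")
print(most_restrictive(per_user, per_key, per_ip).reason)  # IP burst limit exceeded
```

Evaluating cheap checks (per-IP) before expensive ones (per-user budget lookups) also keeps the deny path fast under attack.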

How do I handle rate limiting in multi-agent workflows where one user request triggers many LLM calls?

Track the entire workflow as a single "request" for the user-facing rate limit, but track individual LLM calls for the token budget. This way, a user sees consistent request limits regardless of how many internal agent calls their request triggers, but the token budget still provides cost control. Pre-estimate the maximum tokens a workflow might consume and check the budget before starting the workflow, not after each call.
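Pre-estimating a workflow's worst case before it starts can be sketched like this; the per-step max_tokens figures are assumptions you would calibrate from real traces:

```python
def estimate_workflow_tokens(steps: list) -> int:
    """Worst-case token estimate for a multi-step agent workflow.
    Each step dict carries an illustrative `max_tokens` estimate."""
    return sum(step["max_tokens"] for step in steps)

def can_start_workflow(remaining_budget_tokens: int, steps: list) -> bool:
    # Check the whole workflow's worst case up front, not call-by-call.
    return estimate_workflow_tokens(steps) <= remaining_budget_tokens

workflow = [
    {"name": "plan", "max_tokens": 2_000},
    {"name": "tool_calls", "max_tokens": 6_000},
    {"name": "synthesize", "max_tokens": 3_000},
]
print(estimate_workflow_tokens(workflow))    # 11000
print(can_start_workflow(10_000, workflow))  # False -- rejected before spending anything
```

Rejecting up front avoids the worst outcome: burning most of a user's budget on a workflow that is then killed mid-flight.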


#RateLimiting #APICosts #AISafety #Redis #Python #AgenticAI #LearnAI #AIEngineering

Written by

CallSphere Team

Expert insights on AI voice agents and customer communication automation.
