Semaphores and Rate Limiting: Controlling Concurrent LLM API Requests

Master asyncio.Semaphore, token bucket, and sliding window rate limiters to control concurrent LLM API requests. Includes retry-after handling and adaptive throttling.

The Rate Limiting Problem in AI Systems

LLM APIs enforce strict rate limits — typically measured in requests per minute (RPM) and tokens per minute (TPM). An agent processing 100 documents concurrently will blow past these limits immediately, triggering 429 errors, wasted retries, and degraded throughput.

Effective rate limiting requires two mechanisms: concurrency control (how many requests are in-flight simultaneously) and rate control (how many requests per time window). asyncio provides the primitives to implement both.

asyncio.Semaphore: Basic Concurrency Control

A semaphore limits the number of coroutines that can execute a critical section simultaneously. It is the simplest and most effective tool for capping concurrent API calls.

flowchart LR
    INPUT(["User intent"])
    PARSE["Parse plus<br/>classify"]
    PLAN["Plan and tool<br/>selection"]
    AGENT["Agent loop<br/>LLM plus tools"]
    GUARD{"Guardrails<br/>and policy"}
    EXEC["Execute and<br/>verify result"]
    OBS[("Trace and metrics")]
    OUT(["Outcome plus<br/>next action"])
    INPUT --> PARSE --> PLAN --> AGENT --> GUARD
    GUARD -->|Pass| EXEC --> OUT
    GUARD -->|Fail| AGENT
    AGENT --> OBS
    style AGENT fill:#4f46e5,stroke:#4338ca,color:#fff
    style GUARD fill:#f59e0b,stroke:#d97706,color:#1f2937
    style OBS fill:#ede9fe,stroke:#7c3aed,color:#1e1b4b
    style OUT fill:#059669,stroke:#047857,color:#fff
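
import asyncio
import os
import time

import httpx

# Assumption: the API key is supplied via the OPENAI_API_KEY environment variable
API_KEY = os.environ["OPENAI_API_KEY"]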

async def call_llm(
    client: httpx.AsyncClient,
    semaphore: asyncio.Semaphore,
    prompt: str,
) -> str:
    """Make an LLM call with concurrency limiting."""
    async with semaphore:  # Blocks if limit reached
        print(f"[{time.monotonic():.1f}] Sending: {prompt[:30]}...")
        response = await client.post(
            "https://api.openai.com/v1/chat/completions",
            json={
                "model": "gpt-4o",
                "messages": [{"role": "user", "content": prompt}],
            },
        )
        response.raise_for_status()
        return response.json()["choices"][0]["message"]["content"]

async def process_batch(prompts: list[str], max_concurrent: int = 5):
    """Process prompts with bounded concurrency."""
    semaphore = asyncio.Semaphore(max_concurrent)

    async with httpx.AsyncClient(
        headers={"Authorization": f"Bearer {API_KEY}"},
        timeout=60.0,
    ) as client:
        tasks = [
            call_llm(client, semaphore, prompt)
            for prompt in prompts
        ]
        return await asyncio.gather(*tasks, return_exceptions=True)

With max_concurrent=5, only five API calls are in-flight at once. The remaining coroutines wait at async with semaphore until a slot opens.
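
To see the cap in action without calling a real API, the following minimal sketch replaces the HTTP request with asyncio.sleep and records the peak number of in-flight calls. The names fake_llm_call and run_demo are hypothetical and exist only for this illustration.

```python
import asyncio


async def fake_llm_call(
    semaphore: asyncio.Semaphore,
    stats: dict,
    delay: float = 0.01,
) -> None:
    """Hypothetical stand-in for an API call: sleeps instead of doing I/O."""
    async with semaphore:
        stats["in_flight"] += 1
        stats["peak"] = max(stats["peak"], stats["in_flight"])
        await asyncio.sleep(delay)  # simulated network latency
        stats["in_flight"] -= 1


async def run_demo(n_calls: int = 20, max_concurrent: int = 5) -> int:
    """Launch n_calls coroutines and return the peak number running at once."""
    semaphore = asyncio.Semaphore(max_concurrent)
    stats = {"in_flight": 0, "peak": 0}
    await asyncio.gather(
        *(fake_llm_call(semaphore, stats) for _ in range(n_calls))
    )
    return stats["peak"]


if __name__ == "__main__":
    print("peak concurrency:", asyncio.run(run_demo()))
```

All 20 coroutines are scheduled at once, but the recorded peak never exceeds max_concurrent.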

Token Bucket Rate Limiter

A semaphore controls concurrency but not rate. For true rate limiting (e.g., 60 requests per minute), implement a token bucket algorithm.

class TokenBucketRateLimiter:
    """Token bucket algorithm for rate-limited API calls."""

    def __init__(self, rate: float, capacity: int):
        """
        Args:
            rate: Tokens added per second (e.g., 1.0 = 60/min)
            capacity: Maximum burst size
        """
        self.rate = rate
        self.capacity = capacity
        self.tokens = capacity
        self.last_refill = time.monotonic()
        self._lock = asyncio.Lock()

    async def acquire(self):
        """Wait until a token is available."""
        while True:
            async with self._lock:
                now = time.monotonic()
                elapsed = now - self.last_refill
                self.tokens = min(
                    self.capacity,
                    self.tokens + elapsed * self.rate,
                )
                self.last_refill = now

                if self.tokens >= 1.0:
                    self.tokens -= 1.0
                    return

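                # Not enough tokens: time until one full token has accrued
                wait = (1.0 - self.tokens) / self.rate

            # Sleep outside the lock so other coroutines can proceed
            await asyncio.sleep(wait)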

# Usage: 60 requests per minute with burst of 10
limiter = TokenBucketRateLimiter(rate=1.0, capacity=10)

async def rate_limited_call(client, semaphore, prompt):
    await limiter.acquire()  # Wait for rate limit token
    # Reuses call_llm from the semaphore example above
    return await call_llm(client, semaphore, prompt)

The token bucket allows short bursts up to capacity, then throttles to the sustained rate. This suits APIs that tolerate brief spikes but enforce an average rate, a pattern many LLM APIs follow.
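
The burst-then-throttle behavior is easier to verify with explicit timestamps than with a real clock. The sketch below replays the same refill arithmetic synchronously. The function simulate_token_bucket is a hypothetical helper, and it reports whether each request would be admitted immediately rather than waiting.

```python
def simulate_token_bucket(
    rate: float,
    capacity: int,
    request_times: list[float],
) -> list[bool]:
    """Replay arrivals (in seconds, ascending) against a token bucket.

    Returns True for each request admitted on arrival, False for each
    request that would have had to wait for a refill.
    """
    tokens = float(capacity)  # the bucket starts full
    last_refill = 0.0
    admitted = []
    for now in request_times:
        # Refill in proportion to elapsed time, capped at capacity
        tokens = min(capacity, tokens + (now - last_refill) * rate)
        last_refill = now
        if tokens >= 1.0:
            tokens -= 1.0
            admitted.append(True)
        else:
            admitted.append(False)
    return admitted


# Four requests at t=0 and two at t=1, with 1 token/second and a burst of 3
print(simulate_token_bucket(1.0, 3, [0.0, 0.0, 0.0, 0.0, 1.0, 1.0]))
```

The first three requests drain the burst capacity, the fourth finds the bucket empty, and after one second exactly one more token is available.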

Sliding Window Rate Limiter

A sliding window provides more precise rate limiting by tracking exact request timestamps.

from collections import deque

class SlidingWindowLimiter:
    """Sliding window rate limiter for precise request counting."""

    def __init__(self, max_requests: int, window_seconds: float):
        self.max_requests = max_requests
        self.window = window_seconds
        self.timestamps: deque[float] = deque()
        self._lock = asyncio.Lock()

    async def acquire(self):
        """Wait until a request slot is available."""
        while True:
            async with self._lock:
                now = time.monotonic()
                # Remove expired timestamps
                while (self.timestamps and
                       self.timestamps[0] <= now - self.window):
                    self.timestamps.popleft()

                if len(self.timestamps) < self.max_requests:
                    self.timestamps.append(now)
                    return

                # Calculate wait time until oldest request expires
                wait = self.timestamps[0] + self.window - now

            await asyncio.sleep(wait)

# Usage: 100 requests per 60-second window
limiter = SlidingWindowLimiter(max_requests=100, window_seconds=60)
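
The wait calculation is the part of this limiter most worth checking by hand. As a rough sketch, the same logic can be written as a synchronous function that takes the current time as an argument. The name sliding_window_wait is hypothetical and not part of the class above.

```python
from collections import deque


def sliding_window_wait(
    timestamps: deque,
    now: float,
    max_requests: int,
    window: float,
) -> float:
    """Return 0.0 and record `now` if a slot is free, else seconds to wait."""
    # Drop timestamps that have fallen out of the window
    while timestamps and timestamps[0] <= now - window:
        timestamps.popleft()

    if len(timestamps) < max_requests:
        timestamps.append(now)
        return 0.0

    # The next slot opens when the oldest recorded request expires
    return timestamps[0] + window - now


history: deque = deque()
for t in (0.0, 1.0, 4.0, 10.0):
    wait = sliding_window_wait(history, t, max_requests=2, window=10.0)
    print(f"t={t}: wait {wait}s")
```

With two requests allowed per 10 seconds, the request at t=4 must wait until the one recorded at t=0 expires, and the request at t=10 is admitted immediately.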

Combining Semaphore and Rate Limiter

Production systems need both concurrency control and rate limiting.

class LLMThrottler:
    """Combined concurrency + rate limiter for LLM APIs."""

    def __init__(
        self,
        max_concurrent: int = 10,
        max_per_minute: int = 60,
    ):
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.rate_limiter = SlidingWindowLimiter(
            max_requests=max_per_minute,
            window_seconds=60,
        )

    async def call(
        self,
        client: httpx.AsyncClient,
        prompt: str,
    ) -> str:
        # First: wait for a concurrency slot
        async with self.semaphore:
            # Then: wait for a rate limit slot, so its timestamp is
            # recorded right before the request is actually sent
            await self.rate_limiter.acquire()
            response = await client.post(
                "https://api.openai.com/v1/chat/completions",
                json={
                    "model": "gpt-4o",
                    "messages": [
                        {"role": "user", "content": prompt}
                    ],
                },
            )
            response.raise_for_status()
            return response.json()["choices"][0]["message"]["content"]

# Usage
throttler = LLMThrottler(max_concurrent=10, max_per_minute=60)

async def process_batch(prompts: list[str]):
    async with httpx.AsyncClient(
        headers={"Authorization": f"Bearer {API_KEY}"},
        timeout=60.0,
    ) as client:
        return await asyncio.gather(
            *[throttler.call(client, p) for p in prompts]
        )
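
Note that asyncio.gather without return_exceptions=True propagates the first exception to the caller, so one failed prompt hides the results of the others. Below is a minimal sketch of collecting every outcome and separating successes from failures. The names split_results and flaky_call are hypothetical, and flaky_call only simulates an API call.

```python
import asyncio


def split_results(results: list) -> tuple[list, list]:
    """Separate gather(..., return_exceptions=True) output into two lists."""
    successes = [r for r in results if not isinstance(r, BaseException)]
    failures = [r for r in results if isinstance(r, BaseException)]
    return successes, failures


async def flaky_call(i: int) -> str:
    """Hypothetical stand-in for an API call: odd inputs fail."""
    await asyncio.sleep(0)
    if i % 2:
        raise ValueError(f"call {i} failed")
    return f"ok-{i}"


async def main() -> tuple[list, list]:
    results = await asyncio.gather(
        *(flaky_call(i) for i in range(4)),
        return_exceptions=True,  # exceptions are returned, not raised
    )
    return split_results(results)


if __name__ == "__main__":
    ok, failed = asyncio.run(main())
    print(f"{len(ok)} succeeded, {len(failed)} failed")
```

Failed prompts can then be logged or retried individually instead of aborting the whole batch.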

Handling Retry-After Headers

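When you do hit a 429, respect the server's Retry-After header, which tells the client how long to wait before the next request. The example below assumes the header carries a number of seconds and falls back to 5 seconds when it is absent.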

async def call_with_retry_after(
    client: httpx.AsyncClient,
    throttler: LLMThrottler,
    prompt: str,
    max_retries: int = 3,
) -> str:
    for attempt in range(max_retries):
        try:
            return await throttler.call(client, prompt)
        except httpx.HTTPStatusError as e:
            if e.response.status_code == 429:
                retry_after = float(
                    e.response.headers.get("retry-after", "5")
                )
                print(f"429 received. Waiting {retry_after}s")
                await asyncio.sleep(retry_after)
            else:
                raise
    raise RuntimeError(f"Exhausted retries for: {prompt[:50]}")
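
HTTP allows Retry-After to be either a number of seconds or an HTTP date, and float() raises on the date form. The following sketch handles both using only the standard library. The function parse_retry_after and its default and max_wait parameters are assumptions made for this example, not part of any provider's SDK.

```python
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime
from typing import Optional


def parse_retry_after(
    value: Optional[str],
    default: float = 5.0,
    max_wait: float = 60.0,
    now: Optional[datetime] = None,
) -> float:
    """Convert a Retry-After header value into seconds to sleep.

    Falls back to `default` when the header is missing or unparseable,
    and never returns less than 0 or more than `max_wait`.
    """
    if value is None:
        return default
    value = value.strip()

    # Form 1: delay in seconds, e.g. "Retry-After: 20"
    try:
        return min(max_wait, max(0.0, float(value)))
    except ValueError:
        pass

    # Form 2: HTTP date, e.g. "Retry-After: Wed, 21 Oct 2015 07:28:00 GMT"
    try:
        when = parsedate_to_datetime(value)
    except (TypeError, ValueError):
        return default
    if when.tzinfo is None:
        when = when.replace(tzinfo=timezone.utc)
    now = now or datetime.now(timezone.utc)
    return min(max_wait, max(0.0, (when - now).total_seconds()))
```

In call_with_retry_after, this could replace the float(...) expression, for example parse_retry_after(e.response.headers.get("retry-after")).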

FAQ

How do I determine the right semaphore limit for my LLM API?

Start with the API's documented rate limits. A good rule of thumb is that the concurrency needed to saturate a limit equals the request rate per second multiplied by the average latency: max_concurrent = (RPM / 60) * average_latency_seconds. If your average call takes 2 seconds and the limit is 60 RPM, max_concurrent = 60/60 * 2 = 2 concurrent calls would fully saturate the limit. At the same latency, a semaphore of 10-15 permits 300-450 requests per minute, so a value that high is only safe when a rate limiter sits in front of it. Monitor 429 error rates in production and adjust.
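
The worked number in this answer (60 RPM at 2 seconds per call gives 2 concurrent calls) comes from multiplying the per-second request rate by the average latency. A small sketch of that arithmetic follows. The helper name saturating_concurrency is hypothetical.

```python
import math


def saturating_concurrency(rpm_limit: float, avg_latency_s: float) -> int:
    """Concurrency at which steady traffic reaches the RPM limit.

    Computed as requests per second times seconds per request, rounded up
    and never below 1. Because of the rounding, the result can slightly
    exceed the limit, so a rate limiter is still needed to enforce it.
    """
    return max(1, math.ceil(rpm_limit / 60.0 * avg_latency_s))


print(saturating_concurrency(60, 2.0))    # the FAQ example
print(saturating_concurrency(600, 1.5))   # a higher limit, faster calls
```

Treat the result as a starting point for max_concurrent and tune it against observed latency and 429 rates.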

What is the difference between a semaphore and a rate limiter?

A semaphore limits how many operations happen simultaneously (concurrency). A rate limiter limits how many operations happen within a time window (throughput). If your LLM calls take 2 seconds each and you have a semaphore of 5, you can make roughly 150 requests per minute — far exceeding a 60 RPM rate limit. You need both.

Should I implement rate limiting per-API-key or per-endpoint?

Per-API-key, because that is how LLM providers enforce limits. If your application uses multiple API keys (e.g., for different tenants), create a separate throttler instance per key. If you call multiple LLM providers, each provider needs its own throttler with provider-specific limits.
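
One way to keep a separate throttler per key is a small registry that creates an instance on first use. This is only a sketch: PerKeyRegistry is a hypothetical class, and the factory argument is whatever builds your throttler.

```python
from typing import Callable, Dict, Generic, TypeVar

T = TypeVar("T")


class PerKeyRegistry(Generic[T]):
    """Lazily creates and caches one throttler-like object per API key."""

    def __init__(self, factory: Callable[[str], T]):
        self._factory = factory
        self._items: Dict[str, T] = {}

    def get(self, api_key: str) -> T:
        # Create the object on first use, then always return the same one
        if api_key not in self._items:
            self._items[api_key] = self._factory(api_key)
        return self._items[api_key]


# Illustration with plain dicts standing in for real throttlers
registry = PerKeyRegistry(lambda key: {"key": key, "max_per_minute": 60})
tenant_a = registry.get("tenant-a")
print(registry.get("tenant-a") is tenant_a)
print(registry.get("tenant-b") is tenant_a)
```

With the LLMThrottler class from this article, the factory could be lambda _key: LLMThrottler(max_concurrent=10, max_per_minute=60), with the limits looked up per tenant or per provider.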

