Concurrent LLM Calls with asyncio.gather: Processing Multiple Prompts in Parallel

Learn how to make parallel LLM API calls using asyncio.gather with proper error handling, rate limiting, and result ordering for production AI agent systems.

The Case for Parallel LLM Calls

Most AI agent workflows involve multiple LLM calls: extracting entities, summarizing documents, classifying intent, generating responses. When these calls are independent, running them sequentially adds their latencies together. A typical LLM API call takes 500ms to 3 seconds, so five sequential calls cost 2.5 to 15 seconds of wall-clock time. Running them concurrently brings that down to roughly the duration of the single slowest call.

asyncio.gather() is the primary tool for this pattern. It takes multiple coroutines, schedules them concurrently, and returns their results in the original order.
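
To make the timing claim concrete, here is a minimal sketch that compares sequential awaiting with asyncio.gather(). It makes no network requests: fake_llm_call is a hypothetical stand-in that sleeps for a given delay, and the delays are made-up values chosen only for illustration.

```python
import asyncio
import time


async def fake_llm_call(prompt: str, delay: float) -> str:
    """Hypothetical stand-in for an API call: sleep, then echo the prompt."""
    await asyncio.sleep(delay)
    return f"response to {prompt!r}"


async def run_sequential(jobs: list[tuple[str, float]]) -> tuple[list[str], float]:
    """Await each call in turn; total time is about the sum of the delays."""
    start = time.monotonic()
    results = [await fake_llm_call(prompt, delay) for prompt, delay in jobs]
    return results, time.monotonic() - start


async def run_concurrent(jobs: list[tuple[str, float]]) -> tuple[list[str], float]:
    """Schedule all calls at once; total time is about the longest delay."""
    start = time.monotonic()
    results = await asyncio.gather(
        *(fake_llm_call(prompt, delay) for prompt, delay in jobs)
    )
    return results, time.monotonic() - start


async def main() -> None:
    jobs = [("a", 0.3), ("b", 0.1), ("c", 0.2)]
    seq_results, seq_time = await run_sequential(jobs)
    con_results, con_time = await run_concurrent(jobs)
    print(f"sequential: {seq_time:.2f}s, concurrent: {con_time:.2f}s")
    print("same results, same order:", seq_results == con_results)


if __name__ == "__main__":
    asyncio.run(main())
```

Both functions return the results in input order, even though call "b" finishes first in the concurrent version.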

Basic Parallel LLM Calls

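import asyncio
import os
import time

import httpx

API_URL = "https://api.openai.com/v1/chat/completions"
# Read the key from the environment rather than hard-coding it.
API_KEY = os.environ["OPENAI_API_KEY"]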

async def call_openai(
    client: httpx.AsyncClient,
    prompt: str,
    model: str = "gpt-4o",
) -> str:
    """Make a single LLM API call."""
    response = await client.post(
        API_URL,
        json={
            "model": model,
            "messages": [{"role": "user", "content": prompt}],
            "max_tokens": 256,
        },
    )
    response.raise_for_status()
    return response.json()["choices"][0]["message"]["content"]

async def parallel_llm_calls(prompts: list[str]) -> list[str]:
    """Process multiple prompts concurrently."""
    async with httpx.AsyncClient(
        headers={"Authorization": f"Bearer {API_KEY}"},
        timeout=30.0,
    ) as client:
        results = await asyncio.gather(
            *[call_openai(client, prompt) for prompt in prompts]
        )
    return results

async def main():
    prompts = [
        "Summarize the key benefits of microservices architecture.",
        "List 5 common Python antipatterns.",
        "Explain the CAP theorem in 3 sentences.",
        "What is the difference between OLTP and OLAP?",
    ]
    start = time.monotonic()
    results = await parallel_llm_calls(prompts)
    elapsed = time.monotonic() - start

    for prompt, result in zip(prompts, results):
        print(f"Q: {prompt[:50]}...")
        print(f"A: {result[:100]}...\n")
    print(f"Total time: {elapsed:.2f}s for {len(prompts)} calls")

asyncio.run(main())

Notice we share a single httpx.AsyncClient across all calls. This reuses the underlying TCP connection pool, avoiding the overhead of establishing new connections for each request.

flowchart LR
    INPUT(["User intent"])
    PARSE["Parse plus<br/>classify"]
    PLAN["Plan and tool<br/>selection"]
    AGENT["Agent loop<br/>LLM plus tools"]
    GUARD{"Guardrails<br/>and policy"}
    EXEC["Execute and<br/>verify result"]
    OBS[("Trace and metrics")]
    OUT(["Outcome plus<br/>next action"])
    INPUT --> PARSE --> PLAN --> AGENT --> GUARD
    GUARD -->|Pass| EXEC --> OUT
    GUARD -->|Fail| AGENT
    AGENT --> OBS
    style AGENT fill:#4f46e5,stroke:#4338ca,color:#fff
    style GUARD fill:#f59e0b,stroke:#d97706,color:#1f2937
    style OBS fill:#ede9fe,stroke:#7c3aed,color:#1e1b4b
    style OUT fill:#059669,stroke:#047857,color:#fff

Error Handling with return_exceptions

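By default, asyncio.gather() propagates the first exception to the caller as soon as it is raised. The other awaitables are not cancelled and keep running in the background, but the caller never receives their results. Use return_exceptions=True to collect errors alongside successes.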

async def safe_parallel_calls(
    client: httpx.AsyncClient,
    prompts: list[str],
) -> list[str]:
    """Process prompts in parallel, capturing errors per-prompt."""
    results = await asyncio.gather(
        *[call_openai(client, p) for p in prompts],
        return_exceptions=True,
    )

    processed = []
    for i, result in enumerate(results):
        # BaseException also covers asyncio.CancelledError, which is not
        # a subclass of Exception.
        if isinstance(result, BaseException):
            print(f"Prompt {i} failed: {result}")
            # Replace the error with a placeholder so every entry is a string
            processed.append(f"[ERROR] {type(result).__name__}")
        else:
            processed.append(result)
    return processed

This pattern is critical for production agents. You do not want one failed API call to cost you the successful results of every other call in the batch.

Chunked Processing with Rate Limiting

LLM APIs enforce rate limits. Sending 100 requests simultaneously can exceed them and trigger HTTP 429 responses. Process prompts in chunks to stay within limits.

async def chunked_parallel_calls(
    prompts: list[str],
    chunk_size: int = 5,
    delay_between_chunks: float = 1.0,
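) -> list[str | BaseException]:
    """Process prompts in rate-limited chunks."""
    # With return_exceptions=True, failed calls appear as exception objects
    all_results: list[str | BaseException] = []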

    async with httpx.AsyncClient(
        headers={"Authorization": f"Bearer {API_KEY}"},
        timeout=30.0,
    ) as client:
        for i in range(0, len(prompts), chunk_size):
            chunk = prompts[i : i + chunk_size]
            print(f"Processing chunk {i // chunk_size + 1} "
                  f"({len(chunk)} prompts)")

            results = await asyncio.gather(
                *[call_openai(client, p) for p in chunk],
                return_exceptions=True,
            )
            all_results.extend(results)

            # Rate limit: wait between chunks
            if i + chunk_size < len(prompts):
                await asyncio.sleep(delay_between_chunks)

    return all_results
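
Chunking has one drawback: each chunk waits for its slowest call before the next chunk starts. A common alternative, not used in the code above, is to cap the number of in-flight calls with asyncio.Semaphore. The sketch below assumes a generic worker callable; gather_bounded and ConcurrencyProbe are hypothetical names, and the probe exists only to show that the cap holds.

```python
import asyncio
from collections.abc import Awaitable, Callable
from typing import Any


async def gather_bounded(
    items: list[Any],
    worker: Callable[[Any], Awaitable[Any]],
    limit: int,
) -> list[Any]:
    """Run worker(item) for every item with at most `limit` running at once."""
    semaphore = asyncio.Semaphore(limit)

    async def guarded(item: Any) -> Any:
        # Each call waits here until one of the `limit` slots is free.
        async with semaphore:
            return await worker(item)

    # Results keep input order; failures are returned as exception objects.
    return await asyncio.gather(
        *(guarded(item) for item in items),
        return_exceptions=True,
    )


class ConcurrencyProbe:
    """Hypothetical worker that records the peak number of concurrent calls."""

    def __init__(self) -> None:
        self.active = 0
        self.peak = 0

    async def __call__(self, prompt: str) -> str:
        self.active += 1
        self.peak = max(self.peak, self.active)
        try:
            await asyncio.sleep(0.02)  # Simulated API latency
            return prompt.upper()
        finally:
            self.active -= 1


if __name__ == "__main__":
    probe = ConcurrencyProbe()
    prompts = [f"p{i}" for i in range(10)]
    print(asyncio.run(gather_bounded(prompts, probe, limit=3)))
    print("peak concurrency:", probe.peak)
```

A semaphore limits concurrency, not requests per minute, so it complements rather than replaces a delay or token-bucket limiter when the provider enforces a rate.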

Retry Logic for Failed Calls

Individual calls may fail due to transient errors. Wrap each call with retry logic.

async def call_with_retry(
    client: httpx.AsyncClient,
    prompt: str,
    max_retries: int = 3,
    base_delay: float = 1.0,
) -> str:
    """Call LLM with exponential backoff retry."""
    for attempt in range(max_retries):
        # Exponential backoff: base_delay, 2x, 4x, ...
        wait = base_delay * (2 ** attempt)
        try:
            return await call_openai(client, prompt)
        except httpx.HTTPStatusError as e:
            status = e.response.status_code
            if status == 429:
                # Honor Retry-After when it is given in seconds. It can also
                # be an HTTP date; in that case keep the backoff value.
                try:
                    wait = max(wait, float(e.response.headers["retry-after"]))
                except (KeyError, ValueError):
                    pass
            elif status < 500:
                raise  # Not retryable: re-raise immediately
        except httpx.TimeoutException:
            pass  # Transient; retry after the backoff delay

        if attempt == max_retries - 1:
            break  # No attempts left, so skip the pointless sleep
        print(f"Attempt {attempt + 1} failed. Retrying in {wait:.1f}s...")
        await asyncio.sleep(wait)

    raise RuntimeError(f"Failed after {max_retries} attempts: {prompt[:50]}")

Result Ordering Guarantee

A key property of asyncio.gather() is that results are returned in the same order as the input coroutines, regardless of completion order. This means you can safely zip results back to their original prompts without any additional tracking.

async def analyze_documents(
    client: httpx.AsyncClient,
    docs: list[str],
) -> list[dict]:
    """Analyze multiple documents with ordered results."""
    tasks = [
        call_openai(client, f"Analyze this document: {doc}")
        for doc in docs
    ]
    analyses = await asyncio.gather(*tasks)

    # Results are guaranteed to match input order
    return [
        {"document": doc, "analysis": analysis}
        for doc, analysis in zip(docs, analyses)
    ]

FAQ

What happens if one call in asyncio.gather takes much longer than the others?

All results are returned only when every coroutine completes. If one call takes 10 seconds while others take 1 second, you wait the full 10 seconds. To avoid this, wrap slow calls with asyncio.wait_for(coroutine, timeout=5.0) to enforce per-call timeouts, or use asyncio.as_completed() to process results as they arrive.
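
Both options can be sketched without a network, using a hypothetical fake_llm_call that sleeps for a chosen delay. The timeout value and the "[TIMEOUT]" placeholder are illustrative assumptions.

```python
import asyncio


async def fake_llm_call(prompt: str, delay: float) -> str:
    """Hypothetical stand-in for an API call with a known latency."""
    await asyncio.sleep(delay)
    return f"answer:{prompt}"


async def call_with_timeout(prompt: str, delay: float, timeout: float) -> str:
    """Enforce a per-call deadline; a slow call becomes a placeholder string."""
    try:
        return await asyncio.wait_for(fake_llm_call(prompt, delay), timeout=timeout)
    except asyncio.TimeoutError:
        return f"[TIMEOUT] {prompt}"


async def gather_with_timeouts(jobs: list[tuple[str, float]], timeout: float) -> list[str]:
    """gather() still returns input order, but no call outlives `timeout`."""
    return await asyncio.gather(
        *(call_with_timeout(prompt, delay, timeout) for prompt, delay in jobs)
    )


async def in_completion_order(jobs: list[tuple[str, float]]) -> list[str]:
    """as_completed() yields results as they finish, not in input order."""
    arrived = []
    for next_done in asyncio.as_completed(
        [fake_llm_call(prompt, delay) for prompt, delay in jobs]
    ):
        arrived.append(await next_done)
    return arrived


if __name__ == "__main__":
    jobs = [("slow", 0.3), ("fast", 0.05)]
    print(asyncio.run(gather_with_timeouts(jobs, timeout=0.15)))
    print(asyncio.run(in_completion_order(jobs)))
```

Note the trade-off: asyncio.as_completed() gives up the ordering guarantee, so each result must carry its own identifier if you need to match it back to a prompt.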

Should I create a new httpx.AsyncClient per call or share one?

Always share a single client across calls. httpx.AsyncClient maintains a connection pool internally, so reusing it avoids TCP handshake overhead and reduces latency. Create one client at the start of your batch and pass it to all coroutines.

How do I handle different models or parameters for each parallel call?

Pass different parameters to each coroutine in the gather call. Since each coroutine is independent, you can mix models, temperatures, and token limits freely: asyncio.gather(call_openai(client, p1, model="gpt-4o"), call_openai(client, p2, model="gpt-4o-mini")).
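
For more than a couple of calls, it can be tidier to describe each request as data and build the coroutines from that. The following is a minimal sketch under that assumption: CallSpec and fake_call are hypothetical, and fake_call only echoes its parameters instead of contacting an API.

```python
import asyncio
from dataclasses import dataclass


@dataclass(frozen=True)
class CallSpec:
    """Hypothetical per-call settings; field names are illustrative."""
    prompt: str
    model: str = "gpt-4o"
    max_tokens: int = 256


async def fake_call(spec: CallSpec) -> str:
    """Stand-in for a real request: echoes the parameters it was given."""
    await asyncio.sleep(0.01)
    return f"{spec.model}:{spec.max_tokens}:{spec.prompt}"


async def run_specs(specs: list[CallSpec]) -> list[str]:
    """One coroutine per spec; results line up with the specs list."""
    return await asyncio.gather(*(fake_call(spec) for spec in specs))


if __name__ == "__main__":
    specs = [
        CallSpec("Classify this ticket", model="gpt-4o-mini", max_tokens=16),
        CallSpec("Draft a detailed reply"),
    ]
    for spec, result in zip(specs, asyncio.run(run_specs(specs))):
        print(spec.model, "->", result)
```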

