Parallel Tool Execution: Running Multiple Tools Simultaneously in Agents

Learn how to execute multiple tool calls in parallel to dramatically speed up AI agent workflows. Covers async execution with asyncio.gather, handling partial failures, result aggregation, and timeout management.

The Serial Execution Bottleneck

When an LLM returns multiple tool calls in a single response, the naive approach executes them one at a time. If the agent calls three APIs that each take 2 seconds, the total wait is 6 seconds. With parallel execution, all three run simultaneously and the total wait is roughly 2 seconds. For agents that frequently call multiple tools per turn, this is a significant performance improvement.
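The arithmetic above can be checked with a small timing sketch. The `fake_tool` coroutine is a hypothetical stand-in that sleeps instead of calling a real API, and the delay is shortened so the comparison runs quickly:

```python
import asyncio
import time

async def fake_tool(name: str, delay: float) -> str:
    # Hypothetical stand-in for a real API call; sleeps instead of doing I/O
    await asyncio.sleep(delay)
    return f"{name} done"

async def run_serial(delay: float) -> float:
    # Await each call before starting the next one
    start = time.perf_counter()
    for name in ("a", "b", "c"):
        await fake_tool(name, delay)
    return time.perf_counter() - start

async def run_parallel(delay: float) -> float:
    # Start all three calls together and wait for all of them
    start = time.perf_counter()
    await asyncio.gather(*(fake_tool(n, delay) for n in ("a", "b", "c")))
    return time.perf_counter() - start

async def compare(delay: float = 0.2) -> tuple[float, float]:
    serial = await run_serial(delay)
    parallel = await run_parallel(delay)
    return serial, parallel

if __name__ == "__main__":
    serial, parallel = asyncio.run(compare())
    print(f"serial: {serial:.2f}s, parallel: {parallel:.2f}s")
```

The serial run should take roughly three times the delay and the parallel run roughly one, mirroring the 6-second versus 2-second example.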

Modern LLMs like GPT-4o and Claude commonly generate multiple tool calls in a single response. Your agent loop needs to handle this efficiently.

Detecting Parallel Tool Calls

The OpenAI API returns multiple tool calls in the tool_calls array of a single message. Each call has its own id, function.name, and function.arguments. The diagram shows where tool execution sits in the agent loop:

flowchart TD
    USER(["User message"])
    LLM["LLM call<br/>with tools schema"]
    DECIDE{"Model wants<br/>to call a tool?"}
    EXEC["Execute tool<br/>sandboxed runtime"]
    RESULT["Append tool_result<br/>to messages"]
    GUARD{"Output passes<br/>guardrails?"}
    DONE(["Final reply"])
    BLOCK(["Refuse and log"])
    USER --> LLM --> DECIDE
    DECIDE -->|Yes| EXEC --> RESULT --> LLM
    DECIDE -->|No| GUARD
    GUARD -->|Yes| DONE
    GUARD -->|No| BLOCK
    style LLM fill:#4f46e5,stroke:#4338ca,color:#fff
    style EXEC fill:#ede9fe,stroke:#7c3aed,color:#1e1b4b
    style GUARD fill:#f59e0b,stroke:#d97706,color:#1f2937
    style DONE fill:#059669,stroke:#047857,color:#fff
    style BLOCK fill:#dc2626,stroke:#b91c1c,color:#fff

Inspecting the tool calls on a response looks like this:

from openai import OpenAI

client = OpenAI()  # reads OPENAI_API_KEY from the environment

# messages and tool_schemas are assumed to be defined earlier in your agent
response = client.chat.completions.create(
    model="gpt-4o",
    messages=messages,
    tools=tool_schemas,
)

message = response.choices[0].message

if message.tool_calls:
    print(f"LLM requested {len(message.tool_calls)} tool calls")
    for tc in message.tool_calls:
        # function.arguments is a JSON-encoded string, not a dict
        print(f"  - {tc.function.name}({tc.function.arguments})")

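When there are multiple entries in tool_calls, the model produced them all without seeing any of their results, so no call's arguments can depend on another call's output. In most cases that makes them safe to run concurrently. The exception is tools with side effects whose order matters, which you may still want to run one at a time.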

Basic Parallel Execution with asyncio.gather

The core pattern uses asyncio.gather to run all tool calls simultaneously:

import asyncio
import json

async def execute_tool(name: str, arguments: str) -> str:
    args = json.loads(arguments)

    if name == "search_web":
        return await search_web(**args)
    elif name == "query_database":
        return await query_database(**args)
    elif name == "fetch_weather":
        return await fetch_weather(**args)
    else:
        return f"Error: Unknown tool {name}"

async def execute_tools_parallel(tool_calls) -> list[dict]:
    tasks = [
        execute_tool(tc.function.name, tc.function.arguments)
        for tc in tool_calls
    ]

    results = await asyncio.gather(*tasks, return_exceptions=True)

    tool_results = []
    for tc, result in zip(tool_calls, results):
        if isinstance(result, Exception):
            content = f"Error: Tool execution failed - {str(result)}"
        else:
            content = result

        tool_results.append({
            "role": "tool",
            "tool_call_id": tc.id,
            "content": content,
        })

    return tool_results

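The return_exceptions=True parameter is critical. Without it, the first exception raised by any tool call propagates immediately out of the await, and you lose the results of the other calls even though they keep running in the background (gather does not cancel them). With it, exceptions are returned as values in the results list, in the same positions as their tool calls, so successful results are preserved and each failure can be reported individually.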
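To see how return_exceptions=True behaves in isolation, here is a minimal sketch. The `ok_tool` and `failing_tool` coroutines are hypothetical stand-ins, not part of the agent code above:

```python
import asyncio

async def ok_tool(delay: float) -> str:
    # Hypothetical tool that succeeds after a short wait
    await asyncio.sleep(delay)
    return "ok"

async def failing_tool() -> str:
    # Hypothetical tool that always fails
    raise ValueError("upstream returned 500")

async def demo() -> list:
    # Exceptions come back as values, in the same order as the inputs
    return await asyncio.gather(
        ok_tool(0.05),
        failing_tool(),
        ok_tool(0.01),
        return_exceptions=True,
    )

if __name__ == "__main__":
    for r in asyncio.run(demo()):
        print(type(r).__name__, r)
```

The middle entry of the result list is the ValueError instance itself, which is why the loop in execute_tools_parallel checks isinstance(result, Exception) before building each tool message.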

The Complete Parallel Agent Loop

Here is a full agent loop that handles parallel execution:

from openai import AsyncOpenAI

client = AsyncOpenAI()

async def run_agent(user_message: str, tools: list, system_prompt: str) -> str:
    messages = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": user_message},
    ]

    max_iterations = 10

    for _ in range(max_iterations):
        response = await client.chat.completions.create(
            model="gpt-4o",
            messages=messages,
            tools=tools,
        )

        message = response.choices[0].message
        messages.append(message)

        if not message.tool_calls:
            return message.content

        # Execute all tool calls in parallel
        tool_results = await execute_tools_parallel(message.tool_calls)
        messages.extend(tool_results)

    return "Error: Agent exceeded maximum iterations"

Each iteration either returns a final text response or executes all tool calls in parallel and feeds the results back to the LLM.

Handling Partial Failures

In production, some tool calls succeed while others fail. The LLM needs to know which succeeded and which failed so it can decide whether to retry, use partial results, or ask the user for help:

async def execute_tools_with_status(tool_calls, timeout: float = 30.0) -> list[dict]:
    async def run_one(tc) -> dict:
        # Wrap each call in its own timeout so the clock starts with the call,
        # not when we get around to awaiting it
        try:
            content = await asyncio.wait_for(
                execute_tool(tc.function.name, tc.function.arguments),
                timeout=timeout,
            )
        except asyncio.TimeoutError:
            content = f"Error: {tc.function.name} timed out after {timeout:g} seconds"
        except Exception as e:
            content = f"Error: {tc.function.name} failed - {e}"

        return {"role": "tool", "tool_call_id": tc.id, "content": content}

    # run_one handles its own errors, so gather returns one message per call,
    # in the same order as tool_calls
    return await asyncio.gather(*(run_one(tc) for tc in tool_calls))

Per-task timeouts ensure one slow tool does not hold up the entire batch. The individual error messages let the LLM reason about what data it has and what is missing.
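One detail worth checking is where each timeout clock starts. In the minimal sketch below, every call is wrapped in its own asyncio.wait_for before gathering, so each clock starts together with its call. The `slow_tool`, `fast_tool`, and `call_with_timeout` names are hypothetical, and the timeout is shortened for the demonstration:

```python
import asyncio

async def slow_tool() -> str:
    # Hypothetical tool that takes far longer than we are willing to wait
    await asyncio.sleep(5)
    return "slow result"

async def fast_tool() -> str:
    # Hypothetical tool that finishes well inside the timeout
    await asyncio.sleep(0.01)
    return "fast result"

async def call_with_timeout(name: str, coro, timeout: float) -> str:
    # wait_for cancels the wrapped coroutine when the timeout expires
    try:
        return await asyncio.wait_for(coro, timeout=timeout)
    except asyncio.TimeoutError:
        return f"Error: {name} timed out after {timeout:g} seconds"

async def demo(timeout: float = 0.1) -> list[str]:
    return await asyncio.gather(
        call_with_timeout("slow_tool", slow_tool(), timeout),
        call_with_timeout("fast_tool", fast_tool(), timeout),
    )

if __name__ == "__main__":
    for line in asyncio.run(demo()):
        print(line)
```

The fast call returns its result and the slow call is replaced by an error string, so the batch finishes in about one timeout period rather than waiting for the slowest tool.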

Semaphore-Based Concurrency Limits

Unlimited parallelism can overwhelm external services. Use a semaphore to cap concurrent executions:

class ParallelExecutor:
    def __init__(self, max_concurrent: int = 5):
        self.semaphore = asyncio.Semaphore(max_concurrent)

    async def execute_tool_limited(self, name: str, arguments: str) -> str:
        async with self.semaphore:
            return await execute_tool(name, arguments)

    async def execute_batch(self, tool_calls) -> list[dict]:
        tasks = [
            self.execute_tool_limited(tc.function.name, tc.function.arguments)
            for tc in tool_calls
        ]
        results = await asyncio.gather(*tasks, return_exceptions=True)

        return [
            {
                "role": "tool",
                "tool_call_id": tc.id,
                # Prefix failures so the LLM (and later code) can recognize them
                "content": f"Error: {r}" if isinstance(r, Exception) else r,
            }
            for tc, r in zip(tool_calls, results)
        ]

executor = ParallelExecutor(max_concurrent=5)

A semaphore of 5 means at most 5 tools run simultaneously. If the LLM requests 10 tool calls, the first 5 start immediately and the remaining 5 start as slots become available.
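The cap can be verified with a small counter. This sketch assumes a hypothetical `fake_tool` that sleeps briefly, and it records the largest number of calls that were inside the semaphore at the same moment:

```python
import asyncio

async def run_batch(n_calls: int, max_concurrent: int) -> int:
    """Run n_calls fake tools and return the peak number running at once."""
    semaphore = asyncio.Semaphore(max_concurrent)
    running = 0
    peak = 0

    async def fake_tool() -> None:
        nonlocal running, peak
        async with semaphore:
            running += 1
            peak = max(peak, running)
            await asyncio.sleep(0.01)  # stand-in for real I/O
            running -= 1

    await asyncio.gather(*(fake_tool() for _ in range(n_calls)))
    return peak

if __name__ == "__main__":
    print(asyncio.run(run_batch(n_calls=10, max_concurrent=5)))
```

With 10 calls and a limit of 5, the peak never exceeds 5. With fewer calls than the limit, every call starts immediately.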

Result Aggregation Patterns

When tools return related data, you may want to aggregate results before passing them back:

async def aggregate_search_results(tool_calls, results) -> list[dict]:
    aggregated = []
    for tc, result in zip(tool_calls, results):
        if isinstance(result, str) and result.startswith("Error"):
            aggregated.append({"role": "tool", "tool_call_id": tc.id, "content": result})
            continue

        summary = f"Results from {tc.function.name}:\n"
        try:
            data = json.loads(result)
            if isinstance(data, list):
                summary += f"Found {len(data)} items.\n"
                summary += json.dumps(data[:10], indent=2)
            else:
                summary += json.dumps(data, indent=2)
        except json.JSONDecodeError:
            summary += result[:2000]

        aggregated.append({"role": "tool", "tool_call_id": tc.id, "content": summary})

    return aggregated

FAQ

Do all LLMs support parallel tool calls?

Most frontier models do. GPT-4o, GPT-4 Turbo, and Claude 3.5/4 all generate multiple tool calls in a single response when they detect the calls are independent. Older models and some open-source models may only generate one tool call at a time. Your agent loop should handle both cases — the parallel execution code works fine with a single tool call too.

What happens if I return tool results in a different order than the tool calls?

The LLM matches results to calls using the tool_call_id field, not by position. You can return results in any order as long as each result has the correct tool_call_id. This is important for parallel execution where faster tools finish before slower ones.
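As an illustration, the sketch below collects results in completion order using asyncio.as_completed. The `fake_tool` coroutine, the call ids, and the delays are hypothetical. Each result carries its own tool_call_id, so it can still be matched to its call:

```python
import asyncio

async def fake_tool(call_id: str, delay: float) -> dict:
    # Hypothetical tool; the delay stands in for network latency
    await asyncio.sleep(delay)
    return {
        "role": "tool",
        "tool_call_id": call_id,
        "content": f"result for {call_id}",
    }

async def collect_in_completion_order() -> list[dict]:
    calls = [("call_a", 0.15), ("call_b", 0.01), ("call_c", 0.08)]
    tasks = [asyncio.create_task(fake_tool(cid, d)) for cid, d in calls]

    results = []
    # as_completed yields awaitables in the order they finish, fastest first
    for finished in asyncio.as_completed(tasks):
        results.append(await finished)
    return results

if __name__ == "__main__":
    for msg in asyncio.run(collect_in_completion_order()):
        print(msg["tool_call_id"], "->", msg["content"])
```

The messages come back fastest first rather than in request order, which is harmless as long as every tool call id from the assistant message appears exactly once.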

Should I parallelize CPU-bound tools too?

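Use asyncio.gather for I/O-bound tools that are already async (API calls, database queries). For blocking calls such as synchronous file reads or SDKs without async support, wrap them in asyncio.to_thread so they do not block the event loop. For CPU-bound tools (data processing, computation), run them in a concurrent.futures.ProcessPoolExecutor via loop.run_in_executor. Threads keep the event loop responsive, but because of the GIL they do not make pure-Python computation run in parallel. Mixing both types is common, and both approaches produce awaitables that asyncio.gather can combine.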
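A minimal sketch of mixing the two kinds of tool in one batch follows. The `blocking_tool` and `async_tool` functions are hypothetical. Only the thread-based route is shown here; a process pool would be the usual choice for heavy pure-Python computation:

```python
import asyncio
import time

def blocking_tool(n: int) -> int:
    # Hypothetical synchronous tool: blocks the thread it runs on
    time.sleep(0.05)
    return n * n

async def async_tool(n: int) -> int:
    # Hypothetical async tool: yields to the event loop while waiting
    await asyncio.sleep(0.05)
    return n + 1

async def run_mixed() -> list[int]:
    return await asyncio.gather(
        asyncio.to_thread(blocking_tool, 4),  # runs in a worker thread
        async_tool(4),                        # runs on the event loop
    )

if __name__ == "__main__":
    print(asyncio.run(run_mixed()))
```

Because asyncio.to_thread returns an awaitable, the blocking call slots into the same gather as the async one, and the results come back in argument order.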

