Error Handling in Async Agent Code: Timeouts, Cancellation, and Graceful Shutdown

Master error handling in async Python for AI agents. Learn asyncio.timeout, task cancellation, cleanup patterns, and exception groups for robust agent systems.

Why Async Error Handling Is Different

Synchronous error handling is straightforward: exceptions propagate up the call stack, and a single try/except catches them. Async code introduces new failure modes. A coroutine can be cancelled externally. Multiple concurrent tasks can fail simultaneously. An event loop shutdown must clean up dozens of in-flight operations. LLM API calls can hang indefinitely without proper timeouts.

Getting error handling right in async agent code is the difference between an agent that recovers gracefully and one that silently drops user requests.

Timeouts: The First Line of Defense

LLM APIs can hang — network partitions, overloaded servers, malformed requests that never complete. Always enforce timeouts.

[Figure: agent pipeline flowchart. User intent, parse and classify, plan and tool selection, agent loop (LLM plus tools), guardrails and policy, execute and verify result, outcome plus next action. A guardrail failure returns control to the agent loop, which also emits traces and metrics.]

import asyncio
import httpx

async def call_llm_with_timeout(
    client: httpx.AsyncClient,
    prompt: str,
    timeout_seconds: float = 30.0,
    model: str = "gpt-4o",
) -> str:
    """Call the LLM with a strict overall timeout.

    Assumes the client was created with an Authorization header.
    """
    try:
        # Covers the whole request, including reading and parsing the body
        async with asyncio.timeout(timeout_seconds):
            response = await client.post(
                "https://api.openai.com/v1/chat/completions",
                json={
                    "model": model,
                    "messages": [{"role": "user", "content": prompt}],
                },
            )
            response.raise_for_status()
            return response.json()["choices"][0]["message"]["content"]
    except TimeoutError:
        print(f"LLM call timed out after {timeout_seconds}s")
        raise
    except httpx.HTTPStatusError as e:
        print(f"HTTP error {e.response.status_code}: {e.response.text}")
        raise

async def agent_step_with_fallback(
    client: httpx.AsyncClient,
    prompt: str,
) -> str:
    """Agent step with timeout and fallback."""
    try:
        return await call_llm_with_timeout(client, prompt, timeout_seconds=15.0)
    # httpx reports its own timeouts as httpx.TimeoutException, not TimeoutError
    except (TimeoutError, httpx.TimeoutException, httpx.HTTPStatusError):
        # Fall back to a faster, simpler model (the model name is an example)
        return await call_llm_with_timeout(
            client,
            prompt,
            timeout_seconds=10.0,
            model="gpt-4o-mini",
        )

asyncio.timeout() (Python 3.11+) creates a context manager that cancels the block and raises TimeoutError if the block does not complete within the specified duration. It is often a cleaner alternative to asyncio.wait_for(), which remains supported, because a single deadline can cover several awaits in one block.

Task Cancellation

Tasks can be cancelled externally — for example, when a user disconnects or a parent operation times out. Handle cancellation explicitly.

async def cancellable_agent_workflow(session_id: str) -> str:
    """Agent workflow that handles cancellation cleanly."""
    resources = []
    try:
        # Acquire resources
        db_conn = await get_db_connection()
        resources.append(db_conn)

        # Long-running LLM work
        context = await retrieve_context(session_id)
        response = await generate_response(context)
        await save_response(db_conn, session_id, response)
        return response

    except asyncio.CancelledError:
        # Clean up any partial state
        print(f"Workflow cancelled for session {session_id}")
        await mark_session_cancelled(session_id)
        raise  # Always re-raise CancelledError

    finally:
        # Release resources regardless of outcome
        for resource in resources:
            await resource.close()

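The critical rule: always re-raise CancelledError. If you swallow it, the task carries on as if it had never been cancelled, and anything built on cancellation (asyncio.timeout(), TaskGroup, your own shutdown logic) can misbehave or hang waiting for it.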
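
The difference between re-raising and swallowing is visible on the task object itself. This is a small sketch with hypothetical workers (`worker`, `swallowing_worker`) and a helper (`cancel_and_inspect`) that are not part of the article's agent code.

```python
import asyncio

events: list[str] = []


async def worker() -> None:
    """Hypothetical long-running step that cleans up when cancelled."""
    try:
        await asyncio.sleep(10)
    except asyncio.CancelledError:
        events.append("cleanup")
        raise  # re-raise so the task ends in the cancelled state


async def swallowing_worker() -> str:
    """Anti-pattern: the cancellation request is silently discarded."""
    try:
        await asyncio.sleep(10)
    except asyncio.CancelledError:
        pass
    return "kept running"


async def cancel_and_inspect(coro) -> bool:
    """Cancel a task right after it starts and report task.cancelled()."""
    task = asyncio.create_task(coro)
    await asyncio.sleep(0)  # let the task reach its first await
    task.cancel()
    await asyncio.gather(task, return_exceptions=True)
    return task.cancelled()


async def main() -> tuple[bool, bool]:
    good = await cancel_and_inspect(worker())
    bad = await cancel_and_inspect(swallowing_worker())
    return good, bad


if __name__ == "__main__":
    print(asyncio.run(main()))
```

The well-behaved worker ends up with `task.cancelled()` equal to True after running its cleanup. The swallowing worker finishes normally with a return value, so whoever requested the cancellation cannot tell that it was ignored.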

Exception Groups (Python 3.11+)

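When asyncio.gather() runs with return_exceptions=False (the default), only the first exception propagates to the caller; the other awaitables keep running, and any errors they raise later are easy to miss. Python 3.11 introduced asyncio.TaskGroup, which cancels the remaining tasks as soon as one fails and then raises an ExceptionGroup containing every exception its tasks raised.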

import os

# Assumption: the key is supplied via the OPENAI_API_KEY environment variable
API_KEY = os.environ.get("OPENAI_API_KEY", "")

async def robust_parallel_calls(prompts: list[str]) -> list[str | None]:
    """Process prompts with proper multi-exception handling.

    Returns one result per prompt, or all None if any call failed.
    """
    results: list[str | None] = [None] * len(prompts)

    async with httpx.AsyncClient(
        headers={"Authorization": f"Bearer {API_KEY}"},
        timeout=30.0,
    ) as client:
        try:
            # If one task fails, the TaskGroup cancels the others
            async with asyncio.TaskGroup() as tg:
                tasks = [
                    tg.create_task(
                        call_llm_with_timeout(client, prompt),
                        name=f"prompt_{i}",
                    )
                    for i, prompt in enumerate(prompts)
                ]
        except* httpx.HTTPStatusError as eg:
            print(f"{len(eg.exceptions)} HTTP errors occurred:")
            for exc in eg.exceptions:
                print(f"  - {exc.response.status_code}")
        except* TimeoutError as eg:
            print(f"{len(eg.exceptions)} timeouts occurred")
        else:
            # Reached only when every task succeeded
            results = [task.result() for task in tasks]

    return results

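The except* syntax matches specific exception types within an ExceptionGroup, letting you handle different failure classes separately. More than one except* clause can run for the same group, and exceptions that match no clause are re-raised in a new ExceptionGroup.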

Graceful Shutdown

When your agent service receives a shutdown signal, it must finish in-flight requests, clean up resources, and exit cleanly.

import signal

class AgentService:
    def __init__(self):
        self._shutdown_event = asyncio.Event()
        self._active_tasks: set[asyncio.Task] = set()

    async def handle_request(self, request: dict) -> dict:
        """Process a single agent request."""
        task = asyncio.current_task()
        self._active_tasks.add(task)
        try:
            result = await self._run_agent_workflow(request)
            return {"status": "success", "result": result}
        except asyncio.CancelledError:
            return {"status": "cancelled"}
        finally:
            self._active_tasks.discard(task)

    async def shutdown(self, grace_period: float = 30.0):
        """Gracefully shut down the service."""
        print(f"Shutting down. {len(self._active_tasks)} tasks in flight.")
        self._shutdown_event.set()

        if self._active_tasks:
            # Snapshot the set: tasks remove themselves as they finish
            tasks = list(self._active_tasks)
            print(f"Waiting up to {grace_period}s for tasks...")
            # asyncio.wait() leaves unfinished tasks running when it times out
            _, pending = await asyncio.wait(tasks, timeout=grace_period)
            if pending:
                # Force cancel remaining tasks
                print("Grace period expired. Cancelling tasks.")
                for task in pending:
                    task.cancel()
                # Let cancelled tasks run their cleanup before exiting
                await asyncio.gather(*pending, return_exceptions=True)
        print("Shutdown complete.")

    async def run(self):
        """Main service loop."""
        loop = asyncio.get_running_loop()
        # add_signal_handler() is Unix-only. The handler only sets the event,
        # so shutdown() runs inside run() and cannot be abandoned half-way.
        for sig in (signal.SIGTERM, signal.SIGINT):
            loop.add_signal_handler(sig, self._shutdown_event.set)

        # Block until a signal arrives, then drain in-flight work
        await self._shutdown_event.wait()
        await self.shutdown()
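
The drain-then-cancel step at the heart of a graceful shutdown can be exercised in isolation. The sketch below uses asyncio.wait() with a timeout; `job` and `drain` are hypothetical names, and the durations are chosen only so the example finishes quickly.

```python
import asyncio


async def job(duration: float) -> str:
    """Hypothetical in-flight request that takes `duration` seconds."""
    await asyncio.sleep(duration)
    return "done"


async def drain(tasks: set[asyncio.Task], grace_period: float) -> tuple[int, int]:
    """Wait for tasks up to grace_period, then cancel the stragglers.

    Returns (finished_in_time, force_cancelled).
    """
    if not tasks:
        return 0, 0  # asyncio.wait() rejects an empty collection
    done, pending = await asyncio.wait(tasks, timeout=grace_period)
    for task in pending:
        task.cancel()
    # Let cancelled tasks run their cleanup before returning.
    await asyncio.gather(*pending, return_exceptions=True)
    return len(done), len(pending)


async def main() -> tuple[int, int]:
    tasks = {
        asyncio.create_task(job(0.01)),
        asyncio.create_task(job(0.02)),
        asyncio.create_task(job(5.0)),  # will outlive the grace period
    }
    return await drain(tasks, grace_period=0.2)


if __name__ == "__main__":
    print(asyncio.run(main()))
```

asyncio.wait() is used here because it leaves unfinished tasks running when its timeout expires, so the grace period and the forced cancellation stay two distinct phases.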

Structured Error Context

Wrap errors with context to make debugging async agent failures tractable.

class AgentStepError(Exception):
    """Error with agent step context for debugging."""

    def __init__(self, step: str, session_id: str, cause: Exception):
        self.step = step
        self.session_id = session_id
        self.cause = cause
        super().__init__(
            f"Step '{step}' failed for session {session_id}: {cause}"
        )

async def run_step_with_context(
    step_name: str,
    session_id: str,
    coro,
):
    """Run a step with structured error wrapping."""
    try:
        return await coro
    except asyncio.CancelledError:
        raise  # Never wrap cancellation
    except Exception as e:
        raise AgentStepError(step_name, session_id, e) from e
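
A short usage sketch shows what the caller gets back. It repeats a compact copy of the wrapper so that it runs on its own; the failing `retrieve_context` step and the session id are made up for illustration.

```python
import asyncio


class AgentStepError(Exception):
    """Compact copy of the wrapper above so this sketch runs on its own."""

    def __init__(self, step: str, session_id: str, cause: Exception):
        self.step = step
        self.session_id = session_id
        self.cause = cause
        super().__init__(f"Step '{step}' failed for session {session_id}: {cause}")


async def run_step_with_context(step_name: str, session_id: str, coro):
    try:
        return await coro
    except asyncio.CancelledError:
        raise  # never wrap cancellation
    except Exception as e:
        raise AgentStepError(step_name, session_id, e) from e


async def retrieve_context(session_id: str) -> str:
    """Hypothetical step that always fails, to show the wrapped error."""
    raise KeyError(f"no context stored for {session_id}")


async def main() -> str:
    try:
        await run_step_with_context(
            "retrieve_context", "sess-42", retrieve_context("sess-42")
        )
    except AgentStepError as err:
        # The original exception stays reachable for logging or retry logic.
        assert err.__cause__ is err.cause
        return f"{err.step} / {err.session_id} / {type(err.cause).__name__}"
    return "no error"


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Because the wrapper uses `raise ... from e`, the traceback still contains the original KeyError, while the structured fields make it possible to group failures by step or session in logs.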

FAQ

Should I use asyncio.timeout or httpx's built-in timeout?

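Use both. httpx's timeout bounds individual network phases (connect, read, write, and waiting for a pooled connection) and raises httpx.TimeoutException subclasses rather than the built-in TimeoutError, so catch both if you rely on either. asyncio.timeout wraps the entire operation, including retries, parsing, and any processing you do with the response. They serve different purposes: httpx catches a stalled network, while asyncio.timeout caps the total time an agent step may take. Note that httpx applies a 5-second default timeout unless you configure one, so set it explicitly when LLM responses are slow.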

How do I debug tasks that silently disappear?

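A task created with asyncio.create_task() and never awaited reports its exception only as a "Task exception was never retrieved" log message when the task object is garbage-collected, which is easy to miss. The event loop also keeps only weak references to tasks, so an unreferenced task can be collected before it finishes. Always store task references and check their results: task = asyncio.create_task(coro()); task.add_done_callback(handle_task_result). In the callback, check task.exception() and log it explicitly. TaskGroup in Python 3.11+ makes this easier by holding references to its tasks and raising their exceptions when the block exits.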
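
One way to wire up that callback is sketched below. The `spawn` helper, the `background_tasks` set, the `failures` list, and the logger name are assumptions for illustration; only `handle_task_result` is named in the answer above.

```python
import asyncio
import logging

logger = logging.getLogger("agent.tasks")
background_tasks: set[asyncio.Task] = set()
failures: list[str] = []


def handle_task_result(task: asyncio.Task) -> None:
    """Done callback: drop our reference and surface any exception."""
    background_tasks.discard(task)
    if task.cancelled():
        return  # task.exception() would raise CancelledError here
    exc = task.exception()
    if exc is not None:
        failures.append(f"{task.get_name()}: {exc!r}")
        logger.error("Task %s failed", task.get_name(), exc_info=exc)


def spawn(coro, name: str) -> asyncio.Task:
    """Create a background task and keep a strong reference to it."""
    task = asyncio.create_task(coro, name=name)
    background_tasks.add(task)
    task.add_done_callback(handle_task_result)
    return task


async def flaky_step() -> None:
    """Hypothetical step that fails without anyone awaiting it."""
    raise RuntimeError("tool call failed")


async def main() -> list[str]:
    spawn(flaky_step(), name="tool-call")
    await asyncio.sleep(0.01)  # give the task and its callback time to run
    return failures


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Calling `task.exception()` in the callback also marks the exception as retrieved, so the failure is logged once, deliberately, instead of surfacing later as a garbage-collection warning.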

When should I catch CancelledError vs let it propagate?

Catch it only to perform cleanup (closing connections, saving state, rolling back transactions), then always re-raise it. The only exception is top-level request handlers where you want to return a "cancelled" response to the client. Never silently swallow CancelledError — it breaks asyncio's task management.
