---
title: "Error Handling in Async Agent Code: Timeouts, Cancellation, and Graceful Shutdown"
description: "Master error handling in async Python for AI agents. Learn asyncio.timeout, task cancellation, cleanup patterns, and exception groups for robust agent systems."
canonical: https://callsphere.ai/blog/error-handling-async-agent-code-timeouts-cancellation-graceful-shutdown
category: "Learn Agentic AI"
tags: ["Python", "Error Handling", "asyncio", "Timeouts", "AI Agents"]
author: "CallSphere Team"
published: 2026-03-17T00:00:00.000Z
updated: 2026-05-31T01:46:25.569Z
---

# Error Handling in Async Agent Code: Timeouts, Cancellation, and Graceful Shutdown

> Master error handling in async Python for AI agents. Learn asyncio.timeout, task cancellation, cleanup patterns, and exception groups for robust agent systems.

## Why Async Error Handling Is Different

Synchronous error handling is straightforward: exceptions propagate up the call stack, and a single try/except catches them. Async code introduces new failure modes. A coroutine can be cancelled externally. Multiple concurrent tasks can fail simultaneously. An event loop shutdown must clean up dozens of in-flight operations. LLM API calls can hang indefinitely without proper timeouts.

Getting error handling right in async agent code is the difference between an agent that recovers gracefully and one that silently drops user requests.

## Timeouts: The First Line of Defense

LLM APIs can hang — network partitions, overloaded servers, malformed requests that never complete. Always enforce timeouts.

```mermaid
flowchart LR
    INPUT(["User intent"])
    PARSE["Parse plus
classify"]
    PLAN["Plan and tool
selection"]
    AGENT["Agent loop
LLM plus tools"]
    GUARD{"Guardrails
and policy"}
    EXEC["Execute and
verify result"]
    OBS[("Trace and metrics")]
    OUT(["Outcome plus
next action"])
    INPUT --> PARSE --> PLAN --> AGENT --> GUARD
    GUARD -->|Pass| EXEC --> OUT
    GUARD -->|Fail| AGENT
    AGENT --> OBS
    style AGENT fill:#4f46e5,stroke:#4338ca,color:#fff
    style GUARD fill:#f59e0b,stroke:#d97706,color:#1f2937
    style OBS fill:#ede9fe,stroke:#7c3aed,color:#1e1b4b
    style OUT fill:#059669,stroke:#047857,color:#fff
```

```python
import asyncio
import httpx

async def call_llm_with_timeout(
    client: httpx.AsyncClient,
    prompt: str,
    timeout_seconds: float = 30.0,
) -> str:
    """Call LLM with a strict timeout."""
    try:
        async with asyncio.timeout(timeout_seconds):
            response = await client.post(
                "https://api.openai.com/v1/chat/completions",
                json={
                    "model": "gpt-4o",
                    "messages": [{"role": "user", "content": prompt}],
                },
            )
            response.raise_for_status()
            return response.json()["choices"][0]["message"]["content"]
    except TimeoutError:
        print(f"LLM call timed out after {timeout_seconds}s")
        raise
    except httpx.HTTPStatusError as e:
        print(f"HTTP error {e.response.status_code}: {e.response.text}")
        raise

async def agent_step_with_fallback(
    client: httpx.AsyncClient,
    prompt: str,
) -> str:
    """Agent step with timeout and fallback."""
    try:
        return await call_llm_with_timeout(client, prompt, timeout_seconds=15.0)
    except (TimeoutError, httpx.HTTPStatusError):
        # Fallback to a faster, simpler model
        return await call_llm_with_timeout(
            client,
            prompt,
            timeout_seconds=10.0,
        )
```

`asyncio.timeout()` (Python 3.11+) creates a context manager that raises `TimeoutError` if the block does not complete within the specified duration. It is the recommended replacement for the older `asyncio.wait_for()`.

## Task Cancellation

Tasks can be cancelled externally — for example, when a user disconnects or a parent operation times out. Handle cancellation explicitly.

```python
async def cancellable_agent_workflow(session_id: str) -> str:
    """Agent workflow that handles cancellation cleanly."""
    resources = []
    try:
        # Acquire resources
        db_conn = await get_db_connection()
        resources.append(db_conn)

        # Long-running LLM work
        context = await retrieve_context(session_id)
        response = await generate_response(context)
        await save_response(db_conn, session_id, response)
        return response

    except asyncio.CancelledError:
        # Clean up any partial state
        print(f"Workflow cancelled for session {session_id}")
        await mark_session_cancelled(session_id)
        raise  # Always re-raise CancelledError

    finally:
        # Release resources regardless of outcome
        for resource in resources:
            await resource.close()
```

The critical rule: **always re-raise `CancelledError`**. Swallowing it prevents the event loop from properly shutting down the task.

## Exception Groups (Python 3.11+)

When `asyncio.gather()` runs with `return_exceptions=False` (the default), only the first exception propagates. Python 3.11 introduced `TaskGroup` with exception groups to capture all failures.

```python
async def robust_parallel_calls(prompts: list[str]) -> list[str]:
    """Process prompts with proper multi-exception handling."""
    results = [None] * len(prompts)

    async with httpx.AsyncClient(
        headers={"Authorization": f"Bearer {API_KEY}"},
        timeout=30.0,
    ) as client:
        try:
            async with asyncio.TaskGroup() as tg:
                tasks = [
                    tg.create_task(
                        call_llm_with_timeout(client, prompt),
                        name=f"prompt_{i}",
                    )
                    for i, prompt in enumerate(prompts)
                ]
        except* httpx.HTTPStatusError as eg:
            print(f"{len(eg.exceptions)} HTTP errors occurred:")
            for exc in eg.exceptions:
                print(f"  - {exc.response.status_code}")
        except* TimeoutError as eg:
            print(f"{len(eg.exceptions)} timeouts occurred")
        else:
            results = [task.result() for task in tasks]

    return results
```

The `except*` syntax matches specific exception types within an `ExceptionGroup`, letting you handle different failure classes separately.

## Graceful Shutdown

When your agent service receives a shutdown signal, it must finish in-flight requests, clean up resources, and exit cleanly.

```python
import signal

class AgentService:
    def __init__(self):
        self._shutdown_event = asyncio.Event()
        self._active_tasks: set[asyncio.Task] = set()

    async def handle_request(self, request: dict) -> dict:
        """Process a single agent request."""
        task = asyncio.current_task()
        self._active_tasks.add(task)
        try:
            result = await self._run_agent_workflow(request)
            return {"status": "success", "result": result}
        except asyncio.CancelledError:
            return {"status": "cancelled"}
        finally:
            self._active_tasks.discard(task)

    async def shutdown(self, grace_period: float = 30.0):
        """Gracefully shut down the service."""
        print(f"Shutting down. {len(self._active_tasks)} tasks in flight.")
        self._shutdown_event.set()

        if self._active_tasks:
            # Wait for active tasks to complete
            print(f"Waiting up to {grace_period}s for tasks...")
            try:
                async with asyncio.timeout(grace_period):
                    await asyncio.gather(
                        *self._active_tasks,
                        return_exceptions=True,
                    )
            except TimeoutError:
                # Force cancel remaining tasks
                print("Grace period expired. Cancelling tasks.")
                for task in self._active_tasks:
                    task.cancel()
                await asyncio.gather(
                    *self._active_tasks,
                    return_exceptions=True,
                )
        print("Shutdown complete.")

    async def run(self):
        """Main service loop."""
        loop = asyncio.get_running_loop()
        loop.add_signal_handler(
            signal.SIGTERM,
            lambda: asyncio.create_task(self.shutdown()),
        )
        loop.add_signal_handler(
            signal.SIGINT,
            lambda: asyncio.create_task(self.shutdown()),
        )

        # Service loop
        while not self._shutdown_event.is_set():
            await asyncio.sleep(0.1)
```

## Structured Error Context

Wrap errors with context to make debugging async agent failures tractable.

```python
class AgentStepError(Exception):
    """Error with agent step context for debugging."""

    def __init__(self, step: str, session_id: str, cause: Exception):
        self.step = step
        self.session_id = session_id
        self.cause = cause
        super().__init__(
            f"Step '{step}' failed for session {session_id}: {cause}"
        )

async def run_step_with_context(
    step_name: str,
    session_id: str,
    coro,
):
    """Run a step with structured error wrapping."""
    try:
        return await coro
    except asyncio.CancelledError:
        raise  # Never wrap cancellation
    except Exception as e:
        raise AgentStepError(step_name, session_id, e) from e
```

## FAQ

### Should I use asyncio.timeout or httpx's built-in timeout?

Use both. httpx's timeout handles connection-level failures (connect timeout, read timeout). asyncio.timeout wraps the entire operation including retries, parsing, and any processing you do with the response. They serve different purposes: httpx catches slow networks, asyncio.timeout catches slow business logic.

### How do I debug tasks that silently disappear?

Tasks that raise unhandled exceptions outside of an await are logged as warnings but easily missed. Always store task references and check their results: `task = asyncio.create_task(coro()); task.add_done_callback(handle_task_result)`. In the callback, check `task.exception()` and log it explicitly. TaskGroup in Python 3.11+ makes this easier by propagating all exceptions.

### When should I catch CancelledError vs let it propagate?

Catch it only to perform cleanup (closing connections, saving state, rolling back transactions), then always re-raise it. The only exception is top-level request handlers where you want to return a "cancelled" response to the client. Never silently swallow CancelledError — it breaks asyncio's task management.

---

#Python #ErrorHandling #Asyncio #Timeouts #AIAgents #AgenticAI #LearnAI #AIEngineering

---

Source: https://callsphere.ai/blog/error-handling-async-agent-code-timeouts-cancellation-graceful-shutdown
