---
title: "Claude API Batching: Processing Thousands of Requests Cost-Effectively"
description: "Master the Claude Message Batches API for high-volume, cost-effective processing. Learn how to submit batch jobs, poll for results, handle errors, and save 50% on Claude API costs for non-real-time workloads."
canonical: https://callsphere.ai/blog/claude-api-batching-cost-effective
category: "Agentic AI"
tags: ["Claude API", "Batch Processing", "Cost Optimization", "High Volume", "Anthropic"]
author: "CallSphere Team"
published: 2026-01-28T00:00:00.000Z
updated: 2026-05-07T04:33:05.918Z
---

# Claude API Batching: Processing Thousands of Requests Cost-Effectively

> Master the Claude Message Batches API for high-volume, cost-effective processing. Learn how to submit batch jobs, poll for results, handle errors, and save 50% on Claude API costs for non-real-time workloads.

## What Is the Message Batches API?

The Claude Message Batches API lets you submit up to 100,000 requests (or 256 MB of request data, whichever limit is reached first) in a single batch and receive the results asynchronously. Each request in the batch is billed at a 50% discount on both input and output tokens compared with the standard Messages API.

The tradeoff: batches can take up to 24 hours to complete (though most finish within 1-2 hours). This makes the Batch API ideal for workloads that do not require real-time responses.

### Ideal Use Cases

- Document classification across thousands of files
- Bulk content moderation
- Dataset annotation and labeling
- Nightly report generation
- Mass email personalization
- Code analysis across a large codebase
- Evaluation and testing of prompts at scale

## Submitting a Batch

```python
from anthropic import Anthropic

client = Anthropic()

# `documents` is assumed to be a list of strings loaded elsewhere.
# Each request in the batch follows the standard Messages API format.
requests = []
for i, document in enumerate(documents):
    requests.append({
        "custom_id": f"doc-{i}",  # Your identifier for tracking; must be unique within the batch
        "params": {
            "model": "claude-sonnet-4-5-20250929",
            "max_tokens": 1024,
            "messages": [{
                "role": "user",
                "content": f"Classify this document into one of: [legal, financial, technical, marketing].\n\nDocument:\n{document}"
            }]
        }
    })

# Submit the batch
batch = client.messages.batches.create(requests=requests)

print(f"Batch ID: {batch.id}")
print(f"Status: {batch.processing_status}")
print(f"Total requests: {batch.request_counts.total}")
```
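Results are matched back to inputs only through `custom_id`, so a duplicate ID makes the output ambiguous. A minimal pre-submission check might look like the sketch below; `find_duplicate_ids` is a hypothetical helper written for this article, not part of the Anthropic SDK.

```python
from collections import Counter


def find_duplicate_ids(requests: list[dict]) -> list[str]:
    """Return every custom_id that appears more than once, in first-seen order."""
    counts = Counter(req["custom_id"] for req in requests)
    return [custom_id for custom_id, n in counts.items() if n > 1]


# Illustrative data only: "doc-0" is deliberately repeated.
sample = [
    {"custom_id": "doc-0", "params": {}},
    {"custom_id": "doc-1", "params": {}},
    {"custom_id": "doc-0", "params": {}},
]
duplicates = find_duplicate_ids(sample)
if duplicates:
    print(f"Duplicate custom_ids: {duplicates}")
```

Running a check like this before `client.messages.batches.create` surfaces the problem locally instead of after a round trip to the API.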

## Polling for Results

```python
import time

def wait_for_batch(batch_id: str, poll_interval: int = 30):
    """Poll until the batch ends, then return the final batch object."""
    while True:
        batch = client.messages.batches.retrieve(batch_id)

        print(f"Status: {batch.processing_status}")
        print(f"  Succeeded: {batch.request_counts.succeeded}")
        print(f"  Errored: {batch.request_counts.errored}")
        print(f"  Processing: {batch.request_counts.processing}")

        if batch.processing_status == "ended":
            return batch

        time.sleep(poll_interval)

batch_result = wait_for_batch(batch.id)
```
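The loop above polls forever at a fixed interval. For unattended jobs, a timeout and a gradually growing interval can be useful. The following is a sketch under stated assumptions: `poll_until_ended` and its parameters are hypothetical names, and the `retrieve` callable is injected so the function can be exercised without calling the API.

```python
import time
from typing import Any, Callable


def poll_until_ended(
    retrieve: Callable[[], Any],
    initial_interval: float = 30.0,
    max_interval: float = 300.0,
    timeout: float = 24 * 60 * 60,
    sleep: Callable[[float], None] = time.sleep,
) -> Any:
    """Call retrieve() until processing_status is "ended" or the timeout elapses."""
    interval = initial_interval
    waited = 0.0
    while True:
        batch = retrieve()
        if batch.processing_status == "ended":
            return batch
        if waited >= timeout:
            raise TimeoutError(
                f"Batch still {batch.processing_status!r} after {waited:.0f}s"
            )
        sleep(interval)
        waited += interval
        # Back off gradually so long-running batches are polled less often.
        interval = min(interval * 1.5, max_interval)
```

With the client from the earlier examples, it could be called as `poll_until_ended(lambda: client.messages.batches.retrieve(batch.id))`. The 24-hour default mirrors the maximum processing time mentioned above.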

## Retrieving Results

```python
def get_batch_results(batch_id: str) -> dict[str, dict]:
    """Retrieve all results from a completed batch."""
    results = {}

    for result in client.messages.batches.results(batch_id):
        custom_id = result.custom_id

        if result.result.type == "succeeded":
            message = result.result.message
            text = message.content[0].text
            results[custom_id] = {
                "status": "success",
                "text": text,
                "input_tokens": message.usage.input_tokens,
                "output_tokens": message.usage.output_tokens,
            }
        elif result.result.type == "errored":
            results[custom_id] = {
                "status": "error",
                "error": str(result.result.error),
            }
        else:
            # "expired" or "canceled": the request never produced a message
            results[custom_id] = {
                "status": result.result.type,
            }

    return results

results = get_batch_results(batch.id)
for custom_id, result in results.items():
    if result["status"] == "success":
        print(f"{custom_id}: {result['text'][:100]}...")
```
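Results are not guaranteed to arrive in submission order, so it is safer to join them to the source records by `custom_id` than by position. A minimal sketch, assuming each source item carries an `id` field equal to the `custom_id` used at submission and that results have the shape returned by `get_batch_results`; `join_results` is a hypothetical helper.

```python
def join_results(items: list[dict], results: dict[str, dict]) -> list[dict]:
    """Attach each batch result to its source item, keyed by the item's "id"."""
    joined = []
    for item in items:
        # Items with no matching result are marked rather than silently dropped.
        outcome = results.get(item["id"], {"status": "missing"})
        joined.append({**item, "result": outcome})
    return joined


# Illustrative data only; note the results are in a different order.
items = [{"id": "doc-0", "text": "Invoice"}, {"id": "doc-1", "text": "NDA"}]
results = {
    "doc-1": {"status": "success", "text": "legal"},
    "doc-0": {"status": "success", "text": "financial"},
}
for row in join_results(items, results):
    print(row["id"], row["result"].get("text"))
```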

## Production Batch Pipeline

Here is a complete pipeline for batch-processing a dataset:

```mermaid
flowchart LR
    ITEMS(["Input items"])
    PREP["prepare_requests"]
    SUBMIT["submit<br/>batches.create"]
    POLL{"processing_status<br/>ended?"}
    WAIT["Sleep 30 s"]
    COLLECT["collect_results<br/>batches.results"]
    SAVE(["Save results JSON"])
    ITEMS --> PREP --> SUBMIT --> POLL
    POLL -->|No| WAIT --> POLL
    POLL -->|Yes| COLLECT --> SAVE
    style SUBMIT fill:#4f46e5,stroke:#4338ca,color:#fff
    style POLL fill:#ede9fe,stroke:#7c3aed,color:#1e1b4b
    style SAVE fill:#059669,stroke:#047857,color:#fff
```

```python
import json
import time
from datetime import datetime
from pathlib import Path

from anthropic import Anthropic

class BatchPipeline:
    def __init__(self, client: Anthropic, output_dir: str = "./batch_results"):
        self.client = client
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(parents=True, exist_ok=True)

    def prepare_requests(
        self,
        items: list[dict],
        system_prompt: str,
        user_template: str,
        model: str = "claude-sonnet-4-5-20250929",
        max_tokens: int = 1024,
    ) -> list[dict]:
        """Convert items into batch request format."""
        requests = []
        for item in items:
            user_content = user_template.format(**item)
            requests.append({
                "custom_id": str(item.get("id", len(requests))),
                "params": {
                    "model": model,
                    "max_tokens": max_tokens,
                    "system": system_prompt,
                    "messages": [{"role": "user", "content": user_content}],
                }
            })
        return requests

    def submit(self, requests: list[dict]) -> str:
        """Submit batch and return batch ID."""
        # Anthropic documents a per-batch limit of 100,000 requests (or 256 MB)
        if len(requests) > 100_000:
            raise ValueError(f"Too many requests: {len(requests)} (max 100,000)")

        batch = self.client.messages.batches.create(requests=requests)

        # Save metadata (timezone-aware timestamp; datetime.utcnow() is deprecated)
        metadata = {
            "batch_id": batch.id,
            "submitted_at": datetime.now().astimezone().isoformat(),
            "total_requests": len(requests),
        }
        with open(self.output_dir / f"{batch.id}_metadata.json", "w") as f:
            json.dump(metadata, f)

        return batch.id

    def collect_results(self, batch_id: str) -> list[dict]:
        """Wait for completion and collect all results."""
        self._wait(batch_id)  # Blocks until processing_status is "ended"
        results = []

        for result in self.client.messages.batches.results(batch_id):
            entry = {"custom_id": result.custom_id}
            if result.result.type == "succeeded":
                msg = result.result.message
                entry["output"] = msg.content[0].text
                entry["usage"] = {
                    "input": msg.usage.input_tokens,
                    "output": msg.usage.output_tokens,
                }
            else:
                entry["error"] = result.result.type

            results.append(entry)

        # Save results
        with open(self.output_dir / f"{batch_id}_results.json", "w") as f:
            json.dump(results, f, indent=2)

        return results

    def _wait(self, batch_id: str):
        while True:
            batch = self.client.messages.batches.retrieve(batch_id)
            if batch.processing_status == "ended":
                return batch
            time.sleep(30)
```

### Usage Example

```python
pipeline = BatchPipeline(client)

# Prepare 5,000 classification requests
items = [{"id": f"doc-{i}", "text": doc} for i, doc in enumerate(documents)]

requests = pipeline.prepare_requests(
    items=items,
    system_prompt="Classify documents into categories. Return JSON with 'category' and 'confidence'.",
    user_template="Classify this document:\n\n{text}",
    model="claude-haiku-4-5-20251001",  # Use Haiku for simple classification
    max_tokens=256,
)

batch_id = pipeline.submit(requests)
results = pipeline.collect_results(batch_id)

# Analyze results
succeeded = [r for r in results if "output" in r]
failed = [r for r in results if "error" in r]
print(f"Success: {len(succeeded)}, Failed: {len(failed)}")
```
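The `submit` method raises when a request list exceeds the per-batch limit, so larger datasets need to be split first. One way to do that is sketched below. `chunk_requests` is a hypothetical helper, and the limit is passed in as a parameter so it can follow whatever the current API documentation states. It checks only the request count, not the payload size.

```python
from typing import Iterator


def chunk_requests(requests: list[dict], max_per_batch: int) -> Iterator[list[dict]]:
    """Yield consecutive slices of at most max_per_batch requests."""
    if max_per_batch < 1:
        raise ValueError("max_per_batch must be at least 1")
    for start in range(0, len(requests), max_per_batch):
        yield requests[start:start + max_per_batch]


# Illustrative data only: five requests split into chunks of two.
demo_requests = [{"custom_id": f"doc-{i}", "params": {}} for i in range(5)]
for n, chunk in enumerate(chunk_requests(demo_requests, max_per_batch=2)):
    print(f"Batch {n}: {[req['custom_id'] for req in chunk]}")
```

With the pipeline above, each chunk could then be submitted in turn, for example `batch_ids = [pipeline.submit(chunk) for chunk in chunk_requests(requests, limit)]`, where `limit` is the per-batch maximum you have chosen.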

## Cost Comparison

Processing 10,000 documents with an average of 500 input tokens and 100 output tokens each:

| Method | Input Cost | Output Cost | Total | Time |
| --- | --- | --- | --- | --- |
| Standard API (Sonnet) | $15.00 | $15.00 | $30.00 | ~2 hours (rate limited) |
| Batch API (Sonnet) | $7.50 | $7.50 | $15.00 | 1-2 hours |
| Standard API (Haiku) | $5.00 | $5.00 | $10.00 | ~1 hour |
| Batch API (Haiku) | $2.50 | $2.50 | $5.00 | 1-2 hours |

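The Batch API halves the cost in every row. For large workloads its turnaround is often comparable to a rate-limited run through the standard API, but the only firm bound on completion time is the 24-hour window.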
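The figures in the table can be reproduced with a few lines of arithmetic. In the sketch below, the per-million-token prices are assumptions inferred from the table itself (for example, $15.00 for 5 million Sonnet input tokens implies $3 per million); check current pricing before relying on them. `estimate_cost` is a hypothetical helper.

```python
# Assumed prices in USD per million tokens, inferred from the table rows.
PRICES_PER_MTOK = {
    "Sonnet": {"input": 3.00, "output": 15.00},
    "Haiku": {"input": 1.00, "output": 5.00},
}
BATCH_DISCOUNT = 0.50  # 50% off both input and output tokens


def estimate_cost(
    model: str,
    n_requests: int,
    input_tokens: int,
    output_tokens: int,
    batch: bool = False,
) -> dict[str, float]:
    """Estimate USD cost for n_requests with the given average token counts."""
    prices = PRICES_PER_MTOK[model]
    multiplier = (1.0 - BATCH_DISCOUNT) if batch else 1.0
    input_cost = n_requests * input_tokens / 1_000_000 * prices["input"] * multiplier
    output_cost = n_requests * output_tokens / 1_000_000 * prices["output"] * multiplier
    return {
        "input": round(input_cost, 2),
        "output": round(output_cost, 2),
        "total": round(input_cost + output_cost, 2),
    }


for model in PRICES_PER_MTOK:
    for batch in (False, True):
        cost = estimate_cost(model, 10_000, 500, 100, batch=batch)
        label = "Batch API" if batch else "Standard API"
        print(f"{label} ({model}): ${cost['total']:.2f}")
```

Swapping in your own request count and average token lengths gives a quick estimate before committing to a large job.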

## Error Handling and Retries

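A batch can end with some requests succeeded and others errored or expired, so check the outcome of every request. Resubmitting helps only with transient failures; a request that failed validation has to be corrected before it is sent again.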

```python
def handle_batch_errors(batch_id: str) -> list[dict]:
    """Collect failed requests for retry."""
    failed = []
    for result in client.messages.batches.results(batch_id):
        if result.result.type == "errored":
            failed.append({
                "custom_id": result.custom_id,
                "error": str(result.result.error),
            })
        elif result.result.type == "expired":
            failed.append({
                "custom_id": result.custom_id,
                "error": "expired",
            })
    return failed

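# Retry failed requests in a new batch.
# Index the originally submitted requests by custom_id so they can be looked up.
original_requests = {req["custom_id"]: req for req in requests}

failed = handle_batch_errors(batch_id)
if failed:
    retry_requests = [
        original_requests[r["custom_id"]]
        for r in failed
        if r["custom_id"] in original_requests
    ]
    if retry_requests:
        retry_batch = client.messages.batches.create(requests=retry_requests)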
```
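A retry step that runs repeatedly can resubmit the same failing request indefinitely. One option is to cap the number of attempts per `custom_id`, as in the sketch below. `select_for_retry`, the `attempts` dictionary, and the default cap of three are all assumptions made for illustration.

```python
def select_for_retry(
    failed: list[dict],
    original_requests: dict[str, dict],
    attempts: dict[str, int],
    max_attempts: int = 3,
) -> list[dict]:
    """Return requests to resubmit, skipping IDs that have used up their attempts."""
    retry = []
    for entry in failed:
        custom_id = entry["custom_id"]
        used = attempts.get(custom_id, 1)  # The first submission counts as attempt 1
        if custom_id in original_requests and used < max_attempts:
            attempts[custom_id] = used + 1
            retry.append(original_requests[custom_id])
    return retry


# Illustrative data only: "doc-7" keeps failing, "doc-x" is unknown.
original = {"doc-7": {"custom_id": "doc-7", "params": {}}}
failures = [
    {"custom_id": "doc-7", "error": "expired"},
    {"custom_id": "doc-x", "error": "expired"},
]
attempt_counts: dict[str, int] = {}
first_retry = select_for_retry(failures, original, attempt_counts)
print(len(first_retry), attempt_counts)
```

Persisting the attempt counts alongside the batch metadata keeps the cap intact across process restarts.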

## Canceling a Batch

If you need to stop a batch that is in progress:

```python
# Cancel a running batch
client.messages.batches.cancel(batch_id)

# Results for already-completed requests are still available
# Only pending requests are canceled
```
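Once a canceled batch has ended, its results can mix completed requests with ones that never ran. A quick tally by result type shows how far the batch got. This sketch counts whatever types appear (typically `succeeded`, `errored`, `canceled`, and `expired`) rather than hard-coding a list; `tally_result_types` is a hypothetical helper.

```python
from collections import Counter
from typing import Iterable


def tally_result_types(results: Iterable) -> Counter:
    """Count batch results by result.type (for example "succeeded" or "canceled")."""
    return Counter(entry.result.type for entry in results)
```

With the client from the earlier examples, it could be called as `tally_result_types(client.messages.batches.results(batch_id))` after the batch reports `ended`.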

## Best Practices

1. **Use meaningful custom_ids** that map back to your data source for easy result matching
2. **Save batch IDs** immediately after submission -- you need them to retrieve results
3. **Monitor batch progress** with periodic polling, especially for time-sensitive workflows
4. **Implement idempotency** -- design your pipeline so resubmitting the same batch is safe
5. **Chunk large datasets** into multiple batches when they exceed the per-batch request limit
6. **Use the cheapest model** that meets your quality requirements -- Haiku with Batch API is extremely cost-effective for classification and extraction tasks
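
The first and fourth practices can be combined by deriving each `custom_id` deterministically from the source record, so a resubmission produces the same IDs and already-completed work can be skipped. The sketch below is one possible approach, not a prescribed one: `stable_custom_id`, `pending_keys`, and the `prompt_version` parameter are hypothetical, and the ID is kept short and limited to letters, digits, and hyphens in case the API restricts length or characters.

```python
import hashlib


def stable_custom_id(source_key: str, prompt_version: str = "v1") -> str:
    """Derive a repeatable custom_id from a record key and a prompt version."""
    payload = f"{prompt_version}:{source_key}".encode("utf-8")
    digest = hashlib.sha256(payload).hexdigest()
    return f"{prompt_version}-{digest[:32]}"


def pending_keys(source_keys: list[str], completed_ids: set[str]) -> list[str]:
    """Return the source keys whose custom_id has no completed result yet."""
    return [key for key in source_keys if stable_custom_id(key) not in completed_ids]


# Illustrative data only: one of two files has already been processed.
completed = {stable_custom_id("reports/2025-q1.pdf")}
todo = pending_keys(["reports/2025-q1.pdf", "reports/2025-q2.pdf"], completed)
print(todo)
```

Including the prompt version in the hash means a changed prompt yields new IDs, so results from the old prompt are not mistaken for current ones.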

---

Source: https://callsphere.ai/blog/claude-api-batching-cost-effective
