
Claude API Batching: Processing Thousands of Requests Cost-Effectively

Master the Claude Message Batches API for high-volume, cost-effective processing. Learn how to submit batch jobs, poll for results, handle errors, and save 50% on Claude API costs for non-real-time workloads.

What Is the Message Batches API?

The Claude Message Batches API allows you to submit up to 10,000 requests in a single batch and receive results asynchronously. Each request in the batch gets a 50% discount on both input and output tokens compared to the standard Messages API.

The tradeoff: batches can take up to 24 hours to complete (though most finish within 1-2 hours). This makes the Batch API ideal for workloads that do not require real-time responses.

Ideal Use Cases

  • Document classification across thousands of files
  • Bulk content moderation
  • Dataset annotation and labeling
  • Nightly report generation
  • Mass email personalization
  • Code analysis across a large codebase
  • Evaluation and testing of prompts at scale

Submitting a Batch

from anthropic import Anthropic

client = Anthropic()

# Each request in the batch follows the standard Messages API format
requests = []
for i, document in enumerate(documents):
    requests.append({
        "custom_id": f"doc-{i}",  # Your identifier for tracking
        "params": {
            "model": "claude-sonnet-4-5-20250514",
            "max_tokens": 1024,
            "messages": [{
                "role": "user",
                "content": f"Classify this document into one of: [legal, financial, technical, marketing].\n\nDocument:\n{document}"
            }]
        }
    })

# Submit the batch
batch = client.messages.batches.create(requests=requests)

print(f"Batch ID: {batch.id}")
print(f"Status: {batch.processing_status}")
print(f"Total requests: {batch.request_counts.total}")

Polling for Results

import time

def wait_for_batch(batch_id: str, poll_interval: int = 30):
    """Poll until the batch completes and return the final batch object."""
    while True:
        batch = client.messages.batches.retrieve(batch_id)

        print(f"Status: {batch.processing_status}")
        print(f"  Succeeded: {batch.request_counts.succeeded}")
        print(f"  Errored: {batch.request_counts.errored}")
        print(f"  Processing: {batch.request_counts.processing}")

        if batch.processing_status == "ended":
            return batch

        time.sleep(poll_interval)

batch_result = wait_for_batch(batch.id)
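
For long-running jobs it can help to bound the wait rather than loop forever. A minimal variant with a deadline (the timeout_s parameter and the TimeoutError choice are illustrative, not part of the SDK):

def wait_for_batch_with_timeout(
    batch_id: str,
    poll_interval: int = 30,
    timeout_s: int = 24 * 3600,  # batches expire after 24 hours anyway
):
    """Poll until the batch ends or the deadline passes."""
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        batch = client.messages.batches.retrieve(batch_id)
        if batch.processing_status == "ended":
            return batch
        time.sleep(poll_interval)
    raise TimeoutError(f"Batch {batch_id} did not finish within {timeout_s}s")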

Retrieving Results

def get_batch_results(batch_id: str) -> dict[str, dict]:
    """Retrieve all results from a completed batch."""
    results = {}

    for result in client.messages.batches.results(batch_id):
        custom_id = result.custom_id

        if result.result.type == "succeeded":
            message = result.result.message
            text = message.content[0].text
            results[custom_id] = {
                "status": "success",
                "text": text,
                "input_tokens": message.usage.input_tokens,
                "output_tokens": message.usage.output_tokens,
            }
        elif result.result.type == "errored":
            results[custom_id] = {
                "status": "error",
                "error": str(result.result.error),
            }
        elif result.result.type == "expired":
            results[custom_id] = {
                "status": "expired",
            }

    return results

results = get_batch_results(batch.id)
for custom_id, result in results.items():
    if result["status"] == "success":
        print(f"{custom_id}: {result['text'][:100]}...")

Production Batch Pipeline

Here is a complete pipeline for batch-processing a dataset:

import json
import time
from pathlib import Path
from datetime import datetime, timezone

class BatchPipeline:
    def __init__(self, client: Anthropic, output_dir: str = "./batch_results"):
        self.client = client
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(exist_ok=True)

    def prepare_requests(
        self,
        items: list[dict],
        system_prompt: str,
        user_template: str,
        model: str = "claude-sonnet-4-5-20250514",
        max_tokens: int = 1024,
    ) -> list[dict]:
        """Convert items into batch request format."""
        requests = []
        for item in items:
            user_content = user_template.format(**item)
            requests.append({
                "custom_id": str(item.get("id", len(requests))),
                "params": {
                    "model": model,
                    "max_tokens": max_tokens,
                    "system": system_prompt,
                    "messages": [{"role": "user", "content": user_content}],
                }
            })
        return requests

    def submit(self, requests: list[dict]) -> str:
        """Submit batch and return batch ID."""
        # Batch API supports up to 10,000 requests
        if len(requests) > 10_000:
            raise ValueError(f"Too many requests: {len(requests)} (max 10,000)")

        batch = self.client.messages.batches.create(requests=requests)

        # Save metadata
        metadata = {
            "batch_id": batch.id,
            "submitted_at": datetime.utcnow().isoformat(),
            "total_requests": len(requests),
        }
        with open(self.output_dir / f"{batch.id}_metadata.json", "w") as f:
            json.dump(metadata, f)

        return batch.id

    def collect_results(self, batch_id: str) -> list[dict]:
        """Wait for completion and collect all results."""
        batch = self._wait(batch_id)
        results = []

        for result in self.client.messages.batches.results(batch_id):
            entry = {"custom_id": result.custom_id}
            if result.result.type == "succeeded":
                msg = result.result.message
                entry["output"] = msg.content[0].text
                entry["usage"] = {
                    "input": msg.usage.input_tokens,
                    "output": msg.usage.output_tokens,
                }
            else:
                entry["error"] = result.result.type

            results.append(entry)

        # Save results
        with open(self.output_dir / f"{batch_id}_results.json", "w") as f:
            json.dump(results, f, indent=2)

        return results

    def _wait(self, batch_id: str):
        while True:
            batch = self.client.messages.batches.retrieve(batch_id)
            if batch.processing_status == "ended":
                return batch
            time.sleep(30)

Usage Example

pipeline = BatchPipeline(client)

# Prepare 5,000 classification requests
items = [{"id": f"doc-{i}", "text": doc} for i, doc in enumerate(documents)]

requests = pipeline.prepare_requests(
    items=items,
    system_prompt="Classify documents into categories. Return JSON with 'category' and 'confidence'.",
    user_template="Classify this document:\n\n{text}",
    model="claude-haiku-4-5-20250514",  # Use Haiku for simple classification
    max_tokens=256,
)

batch_id = pipeline.submit(requests)
results = pipeline.collect_results(batch_id)

# Analyze results
succeeded = [r for r in results if "output" in r]
failed = [r for r in results if "error" in r]
print(f"Success: {len(succeeded)}, Failed: {len(failed)}")

Cost Comparison

Processing 10,000 documents with an average of 500 input tokens and 100 output tokens each:

| Method                | Input Cost | Output Cost | Total  | Time                    |
|-----------------------|------------|-------------|--------|-------------------------|
| Standard API (Sonnet) | $15.00     | $15.00      | $30.00 | ~2 hours (rate limited) |
| Batch API (Sonnet)    | $7.50      | $7.50       | $15.00 | 1-2 hours               |
| Standard API (Haiku)  | $5.00      | $5.00       | $10.00 | ~1 hour                 |
| Batch API (Haiku)     | $2.50      | $2.50       | $5.00  | 1-2 hours               |

The Batch API saves 50% on cost with comparable or better throughput for large workloads.
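
These figures follow from multiplying token volume by per-million rates and halving for the batch tier. A quick sanity check (the rates below assume Sonnet list pricing of $3/$15 per million input/output tokens; substitute current pricing for your model):

def estimate_cost(n_requests, in_tok, out_tok, in_rate, out_rate, batch=False):
    """Estimated spend in dollars; rates are $ per million tokens."""
    discount = 0.5 if batch else 1.0
    input_cost = n_requests * in_tok / 1e6 * in_rate * discount
    output_cost = n_requests * out_tok / 1e6 * out_rate * discount
    return input_cost + output_cost

# 10,000 docs at 500 input / 100 output tokens each, Sonnet rates
print(estimate_cost(10_000, 500, 100, 3.0, 15.0))              # 30.0
print(estimate_cost(10_000, 500, 100, 3.0, 15.0, batch=True))  # 15.0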

Error Handling and Retries

Batches can have partial failures. Always handle errors per-request:

def handle_batch_errors(batch_id: str) -> list[dict]:
    """Collect failed requests for retry."""
    failed = []
    for result in client.messages.batches.results(batch_id):
        if result.result.type == "errored":
            failed.append({
                "custom_id": result.custom_id,
                "error": str(result.result.error),
            })
        elif result.result.type == "expired":
            failed.append({
                "custom_id": result.custom_id,
                "error": "expired",
            })
    return failed

# Retry failed requests in a new batch.
# Build a lookup from custom_id to the original request payload first.
original_requests = {req["custom_id"]: req for req in requests}

failed = handle_batch_errors(batch_id)
if failed:
    retry_requests = [
        original_requests[r["custom_id"]]
        for r in failed
        if r["custom_id"] in original_requests
    ]
    if retry_requests:
        retry_batch = client.messages.batches.create(requests=retry_requests)

Canceling a Batch

If you need to stop a batch that is in progress:

# Cancel a running batch
client.messages.batches.cancel(batch_id)

# Results for already-completed requests are still available
# Only pending requests are canceled
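
If you have lost track of a batch ID, the SDK can also enumerate recent batches. A sketch using the list endpoint to find and cancel anything still running (the limit value here is arbitrary):

# Find and cancel any batches still in progress
for b in client.messages.batches.list(limit=20):
    if b.processing_status == "in_progress":
        print(f"Canceling {b.id}")
        client.messages.batches.cancel(b.id)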

Best Practices

  1. Use meaningful custom_ids that map back to your data source for easy result matching
  2. Save batch IDs immediately after submission -- you need them to retrieve results
  3. Monitor batch progress with periodic polling, especially for time-sensitive workflows
  4. Implement idempotency -- design your pipeline so resubmitting the same batch is safe
  5. Chunk large datasets into multiple batches of 10,000 if needed -- see the helper sketch after this list
  6. Use the cheapest model that meets your quality requirements -- Haiku with Batch API is extremely cost-effective for classification and extraction tasks
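
For the chunking point above, a minimal helper that splits an oversized request list into submittable batches (the 10,000 cap mirrors the limit used throughout this post):

def submit_in_chunks(requests: list[dict], chunk_size: int = 10_000) -> list[str]:
    """Split requests into batch-sized chunks and submit each one."""
    batch_ids = []
    for start in range(0, len(requests), chunk_size):
        chunk = requests[start:start + chunk_size]
        batch = client.messages.batches.create(requests=chunk)
        batch_ids.append(batch.id)  # persist immediately (best practice #2)
    return batch_ids

Persisting each batch ID as it is returned also keeps the pipeline resumable if the process dies partway through submission.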