Build a Claude batch processing pipeline step by step

Hands-on Claude Message Batches walkthrough: shape requests, submit, poll to ended, reconcile counts, and reassemble results by custom_id in Python.

Documentation tells you the Message Batches API exists. It does not tell you what the third hour of running a real 80,000-request job feels like — the moment you realize your result file has 79,400 lines and you have no idea which 600 rows vanished. This walkthrough is the job I wish someone had handed me: every step from a raw list of inputs to a clean, reconciled output, written so you can lift it into your own pipeline today. We will classify a backlog of customer messages, but the shape applies to any large batch task.

Key takeaways

  • Encode a stable join key (a database ID, not an array index) into every custom_id so you can reassemble results no matter what order they come back in.
  • Keep max_tokens tight for the task (a one-word classifier needs about 10 tokens, not 1,024) to cap worst-case cost and stop a loosely prompted model from rambling.
  • Poll on a 30–60 second loop until processing_status == "ended"; aggressive polling just spends rate limit.
  • Always reconcile request_counts against the number you submitted, and branch error handling on invalid_request (fix and resubmit) versus api_error (retry as-is).
  • Stream results lazily and join on custom_id; never zip positionally against your input list.

Step 1: shape the input into a join-friendly request array

The single most consequential decision happens before any API call: how you name each request. Results come back unordered, so the custom_id is the only thread connecting an output to its source row. Make it carry a key you actually have on the other side — a ticket ID, a row primary key, a content hash. An opaque counter that you cannot map back later turns a finished batch into a useless pile of text.

import anthropic
from anthropic.types.message_create_params import MessageCreateParamsNonStreaming
from anthropic.types.messages.batch_create_params import Request

client = anthropic.Anthropic()

# tickets: list of (ticket_id, body) pulled from your DB
requests = [
    Request(
        custom_id=f"ticket-{ticket_id}",   # the join key lives here
        params=MessageCreateParamsNonStreaming(
            model="claude-haiku-4-5",
            max_tokens=12,                  # one word, kept tight
            system="Classify the message as one of: billing, bug, "
                   "feature_request, praise, other. Reply with only the label.",
            messages=[{"role": "user", "content": body}],
        ),
    )
    for ticket_id, body in tickets
]

Note the model choice. Classification is a Haiku task — fast, cheap, and entirely sufficient for picking one of five labels. Reserve Opus for the requests in your batch that genuinely need reasoning. Because each request carries its own model, you can mix them in a single batch without ceremony.
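One way to keep the join key honest is to build and parse every custom_id in a single place. The sketch below is illustrative only: `make_custom_id`, `parse_custom_id`, and `find_duplicates` are hypothetical helpers, and the character and length rule in `CUSTOM_ID_PATTERN` is an assumption about what the API accepts, so check it against the current documentation before relying on it.

```python
import re

# Assumed constraint on custom_id (letters, digits, "_" and "-", max 64 chars).
CUSTOM_ID_PATTERN = re.compile(r"^[a-zA-Z0-9_-]{1,64}$")


def make_custom_id(ticket_id, prefix="ticket"):
    """Build a custom_id that carries the source row's stable key."""
    cid = f"{prefix}-{ticket_id}"
    if not CUSTOM_ID_PATTERN.match(cid):
        raise ValueError(f"custom_id not usable as a join key: {cid!r}")
    return cid


def parse_custom_id(cid, prefix="ticket"):
    """Recover the source key (as a string) from a custom_id."""
    head, sep, key = cid.partition("-")
    if head != prefix or not sep or not key:
        raise ValueError(f"unexpected custom_id: {cid!r}")
    return key


def find_duplicates(custom_ids):
    """Return the sorted custom_ids that occur more than once."""
    seen, dups = set(), set()
    for cid in custom_ids:
        (dups if cid in seen else seen).add(cid)
    return sorted(dups)
```

Running `find_duplicates` over the request list before submitting is a cheap guard: two requests sharing a custom_id would make the join ambiguous.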

Step 2: submit and capture the batch ID durably

The Message Batches API is an asynchronous endpoint: you submit an array of independent requests, receive a batch ID immediately, and retrieve results later. The submit call is cheap and synchronous, but the ID it returns is precious — it is the only handle to your in-flight work. Write it somewhere durable before you do anything else.

batch = client.messages.batches.create(requests=requests)
print(batch.id, batch.processing_status)   # msgbatch_..., in_progress

# Persist immediately — if your process dies, you can still recover.
with open("batch_id.txt", "w") as f:
    f.write(batch.id)

If your process crashes between submit and result retrieval, the batch keeps running on Anthropic's side regardless. As long as you saved the ID, you reattach with a fresh retrieve() call. Lose the ID and you have orphaned a job you are still paying for.
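A minimal sketch of the save-and-reattach idea, assuming a single batch and a local file as the durable store. The helper names are hypothetical. The write goes through a temporary file and a rename so that a crash mid-write cannot leave a half-written ID behind, and `reattach` expects an already constructed `anthropic.Anthropic()` client.

```python
import os
import tempfile
from pathlib import Path


def save_batch_id(path, batch_id):
    """Write the ID atomically: temp file in the same directory, then rename."""
    target = Path(path)
    fd, tmp = tempfile.mkstemp(dir=str(target.parent), prefix=target.name + ".")
    with os.fdopen(fd, "w", encoding="utf-8") as f:
        f.write(batch_id)
        f.flush()
        os.fsync(f.fileno())   # push the bytes to disk before the rename
    os.replace(tmp, target)    # atomic swap on the same filesystem


def load_batch_id(path):
    """Return the saved ID, or None if nothing was persisted."""
    target = Path(path)
    if not target.exists():
        return None
    return target.read_text(encoding="utf-8").strip() or None


def reattach(client, path="batch_id.txt"):
    """Fetch the in-flight batch after a restart; None if no ID was saved."""
    batch_id = load_batch_id(path)
    if batch_id is None:
        return None
    return client.messages.batches.retrieve(batch_id)
```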

There is a subtler reason to persist eagerly. The 256 MB and 100,000-request ceilings mean a very large corpus may not fit in one batch — you will be submitting several, and each comes back with its own ID. A simple manifest that records each batch ID alongside the slice of input it covers turns a multi-batch job into a tractable bookkeeping exercise instead of a memory-of-the-running-process gamble. Append the ID and its input range to that manifest the moment create() returns, before you start polling.
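The splitting and bookkeeping described above might look like the following sketch. `chunk_requests`, `manifest_entry`, and `append_manifest` are hypothetical names, and the byte count is only an estimate: it measures each request's JSON encoding, which approximates the size of the submitted payload rather than matching it exactly, so leave some headroom below the real ceiling.

```python
import json

MAX_REQUESTS = 100_000             # per-batch request ceiling
MAX_BYTES = 256 * 1024 * 1024      # per-batch size ceiling (256 MB)


def chunk_requests(requests, max_requests=MAX_REQUESTS, max_bytes=MAX_BYTES):
    """Yield lists of requests that each stay under both ceilings."""
    chunk, size = [], 0
    for req in requests:
        req_bytes = len(json.dumps(req).encode("utf-8"))
        if chunk and (len(chunk) >= max_requests or size + req_bytes > max_bytes):
            yield chunk
            chunk, size = [], 0
        chunk.append(req)
        size += req_bytes
    if chunk:
        yield chunk


def manifest_entry(batch_id, chunk):
    """Describe which slice of the input a batch covers."""
    return {
        "batch_id": batch_id,
        "count": len(chunk),
        "first_custom_id": chunk[0]["custom_id"],
        "last_custom_id": chunk[-1]["custom_id"],
    }


def append_manifest(path, entry):
    """Append one JSON line per batch; call this right after create() returns."""
    with open(path, "a", encoding="utf-8") as f:
        f.write(json.dumps(entry) + "\n")
```

An append-only JSON Lines file keeps earlier entries intact if the process dies partway through a multi-batch submit.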

Step 3: poll the lifecycle to a terminal state

The batch moves through a small, well-defined lifecycle. The diagram below is the state machine your polling loop is observing.

flowchart TD
  A["batches.create()"] --> B["in_progress"]
  B --> C{"poll: retrieve(id)"}
  C -->|status != ended| D["sleep 30-60s"]
  D --> C
  C -->|status == ended| E["reconcile request_counts"]
  E --> F{"errored > 0?"}
  F -->|yes| G["split: invalid_request vs api_error"]
  F -->|no| H["stream results()"]
  G --> H
  H --> I["join each result on custom_id"]
  I --> J["write to durable store"]

The loop itself is deliberately boring. Boring is correct here — there is no event stream to subscribe to, just a status you check on an interval until it reads ended.

import time

while True:
    batch = client.messages.batches.retrieve(batch.id)
    if batch.processing_status == "ended":
        break
    counts = batch.request_counts
    print(f"processing={counts.processing} succeeded={counts.succeeded}")
    time.sleep(45)
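For unattended jobs, a variant of the loop above with an overall deadline can be useful, so a stuck batch eventually raises instead of polling forever. This is a sketch under assumptions: `wait_until_ended` is a hypothetical helper, the 25-hour default is an arbitrary margin, and `retrieve` stands for `client.messages.batches.retrieve`. The `sleep` and `clock` parameters exist only so the loop can be exercised without real waiting.

```python
import time


def wait_until_ended(retrieve, batch_id, interval=45.0, timeout=25 * 3600,
                     sleep=time.sleep, clock=time.monotonic):
    """Poll until processing_status is "ended"; raise if the deadline passes."""
    deadline = clock() + timeout
    while True:
        batch = retrieve(batch_id)
        if batch.processing_status == "ended":
            return batch
        if clock() >= deadline:
            raise TimeoutError(f"batch {batch_id} not ended after {timeout}s")
        sleep(interval)
```

In the pipeline this would be called as `wait_until_ended(client.messages.batches.retrieve, batch.id)`.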

Step 4: reconcile counts before you trust a single result

This is the step most tutorials skip and most production incidents trace back to. When the batch ends, the request_counts object tells you exactly how the work landed. Sum the terminal buckets and compare to what you submitted. If they do not match, stop and investigate before any downstream system consumes the output.

c = batch.request_counts
total = c.succeeded + c.errored + c.canceled + c.expired
assert total == len(requests), f"count mismatch: {total} vs {len(requests)}"
if c.errored:
    print(f"WARNING: {c.errored} requests errored - inspect before proceeding")
if c.expired:
    print(f"WARNING: {c.expired} requests expired - resubmit these")
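The same check can be packaged as a function that raises rather than asserts, since `assert` statements are skipped when Python runs with `-O`. In this sketch `Counts` is a hypothetical stand-in that mirrors the fields of `batch.request_counts` so the logic can be run offline, and `reconcile` is an illustrative name.

```python
from dataclasses import dataclass


@dataclass
class Counts:
    """Stand-in with the same fields as batch.request_counts."""
    processing: int = 0
    succeeded: int = 0
    errored: int = 0
    canceled: int = 0
    expired: int = 0


def reconcile(counts, submitted):
    """Raise unless every submitted request landed in a terminal bucket."""
    if counts.processing:
        raise RuntimeError(f"{counts.processing} requests still processing")
    terminal = counts.succeeded + counts.errored + counts.canceled + counts.expired
    if terminal != submitted:
        raise RuntimeError(f"count mismatch: {terminal} terminal vs {submitted} submitted")
    return {
        "succeeded": counts.succeeded,
        "needs_attention": counts.errored + counts.canceled + counts.expired,
    }
```

With the real object, the call is `reconcile(batch.request_counts, len(requests))`.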

Step 5: stream results and branch on error type

Now pull the results. The stream yields one entry per request; each has a result.type you must switch on. Two error paths matter and they demand opposite responses: invalid_request means your payload was wrong, so fix it and resubmit; any other error is server-side and is safe to retry unchanged.

labels = {}
to_fix, to_retry, canceled = [], [], []

for r in client.messages.batches.results(batch.id):
    cid = r.custom_id
    if r.result.type == "succeeded":
        msg = r.result.message
        # take the first text block; a classifier reply should have exactly one
        labels[cid] = next((b.text for b in msg.content if b.type == "text"), "")
    elif r.result.type == "errored":
        if r.result.error.type == "invalid_request":
            to_fix.append(cid)      # malformed - repair the payload
        else:
            to_retry.append(cid)    # transient - resubmit as-is
    elif r.result.type == "expired":
        to_retry.append(cid)        # ran out of time - resubmit
    elif r.result.type == "canceled":
        canceled.append(cid)        # never ran - resubmit only if still wanted
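For large batches, one option is to spill each result to disk as it arrives instead of accumulating everything in memory. The sketch below assumes result objects shaped like the ones in the loop above. `spill_results` is a hypothetical helper, and `out` is any open text file or object with a `write` method.

```python
import json


def spill_results(results, out):
    """Write one JSON line per result to `out`; return a tally by result type."""
    tallies = {}
    for r in results:                      # consumes the iterator lazily
        kind = r.result.type
        record = {"custom_id": r.custom_id, "type": kind}
        if kind == "succeeded":
            record["text"] = next(
                (b.text for b in r.result.message.content if b.type == "text"), ""
            )
        elif kind == "errored":
            record["error_type"] = r.result.error.type
        out.write(json.dumps(record) + "\n")
        tallies[kind] = tallies.get(kind, 0) + 1
    return tallies
```

The returned tallies can be compared with request_counts as a second reconciliation check on what was actually read from the stream.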

Step 6: reassemble against your source data

With a labels dict keyed by custom_id, the join back to your database is a clean lookup. Because the key encodes the ticket ID, there is no guesswork and no reliance on ordering. This is the payoff for the discipline in step 1.

Two operational habits make this final step robust. First, write the labels to your own durable store immediately — the batch result store keeps results for only 29 days, after which they vanish, so treat the batch as transient compute and pull everything out promptly. Second, do the database writes idempotently: if you re-run the reassembly after a partial failure, an unconditional update keyed on the ticket ID is naturally safe to repeat, whereas an insert would duplicate rows. Designing the write so a replay is a no-op means a hiccup in reassembly never corrupts your downstream data.

for ticket_id, _ in tickets:
    label = labels.get(f"ticket-{ticket_id}")
    if label is None:
        continue              # landed in to_fix or to_retry
    db.update_ticket(ticket_id, category=label.strip())
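To make the idempotency point concrete, here is a small sketch using SQLite from the standard library. The `tickets` table, its columns, and `apply_labels` are assumptions for illustration, not the schema behind `db.update_ticket`. Because the write is an UPDATE keyed on the ticket ID, replaying it leaves the table unchanged.

```python
import sqlite3


def apply_labels(conn, labels, prefix="ticket-"):
    """Write each label onto its ticket row; safe to run more than once."""
    rows = [
        (label.strip(), cid[len(prefix):])
        for cid, label in labels.items()
        if cid.startswith(prefix)
    ]
    with conn:  # one transaction: commit on success, roll back on error
        conn.executemany("UPDATE tickets SET category = ? WHERE id = ?", rows)
    return len(rows)


if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE tickets (id TEXT PRIMARY KEY, body TEXT, category TEXT)")
    conn.execute("INSERT INTO tickets (id, body) VALUES ('101', 'refund please')")
    apply_labels(conn, {"ticket-101": "billing\n"})
    apply_labels(conn, {"ticket-101": "billing\n"})  # replay changes nothing
    print(conn.execute("SELECT id, category FROM tickets").fetchall())
```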

Common pitfalls

  • Array-index custom_ids. If custom_id is just str(i) and you later reorder or filter the input list, the join is broken. Encode a stable key.
  • Forgetting to persist the batch ID. The job survives a crash; your in-memory ID does not. Write it to disk on submit.
  • Skipping reconciliation. Consuming the succeeded results without checking request_counts means silently dropping every errored and expired row.
  • Loading the whole results stream into memory. For large batches, iterate the generator and write as you go rather than building one giant list.
  • Oversized max_tokens on cheap tasks. A one-word classifier with max_tokens=1024 costs nothing extra when the reply is short, because you pay only for the tokens actually generated, but it lets the model ramble at your expense if your prompt is loose. Keep it tight.
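A tight max_tokens pairs well with validating what comes back. The sketch below is one possible guard, not part of the original pipeline: `normalize_label` is a hypothetical helper, the allowed set is the label list from the system prompt in step 1, and `stop_reason` is the field on the returned message, where the value "max_tokens" indicates the reply hit the cap.

```python
ALLOWED_LABELS = {"billing", "bug", "feature_request", "praise", "other"}


def normalize_label(text, stop_reason=None):
    """Return a clean label, or None if the reply is truncated or off-vocabulary."""
    if stop_reason == "max_tokens":
        return None                      # reply was cut off; do not trust it
    cleaned = text.strip().lower().rstrip(".").strip()
    return cleaned if cleaned in ALLOWED_LABELS else None
```

Rows that come back as None can be routed to the same review list as malformed requests rather than written to the database.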

Local test vs. full production run

| Stage | Sample size | What you are verifying |
| --- | --- | --- |
| Smoke test | 5–10 requests | Prompt shape, label vocabulary, custom_id join |
| Dry run | ~500 requests | Error rate, cost-per-item, reconciliation logic |
| Production | Full backlog | Throughput, expired-rate, end-to-end reassembly |
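For the smoke test and dry run, a reproducible sample makes it easier to compare runs after a prompt change. This is a small sketch with a hypothetical helper name. It assumes `tickets` is the list of (ticket_id, body) pairs from step 1, and the seed value is arbitrary.

```python
import random


def sample_for_stage(tickets, size, seed=0):
    """Pick the same `size` tickets every time for a given seed."""
    tickets = list(tickets)
    if size >= len(tickets):
        return tickets
    return random.Random(seed).sample(tickets, size)
```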

Frequently asked questions

How often should I poll a running batch?

Every 30 to 60 seconds. The batch completes on its own schedule regardless of polling frequency, so tighter loops only consume your request rate limit without speeding anything up.

My process died mid-job. Did I lose the batch?

No, provided you saved the batch ID. The work runs on Anthropic's infrastructure independently of your client. Reattach with batches.retrieve(saved_id) and continue from polling.

What is the difference between an errored and an expired result?

An errored result failed during processing — either a malformed payload (invalid_request, which you must fix) or a server-side issue (retry as-is). An expired result simply ran out of the 24-hour window before a worker reached it; resubmit it unchanged.
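Resubmitting "unchanged" is easiest when the follow-up batch is built from the original request objects. A minimal sketch, assuming each request is a dict with a `custom_id` key (which is what the `Request` typed dict from step 1 is at runtime). `build_resubmission` is a hypothetical helper.

```python
def build_resubmission(requests, retry_ids):
    """Return the original requests whose custom_id is in retry_ids, in input order."""
    wanted = set(retry_ids)
    by_id = {req["custom_id"]: req for req in requests}
    missing = wanted - by_id.keys()
    if missing:
        raise KeyError(f"no original request for: {sorted(missing)}")
    return [req for cid, req in by_id.items() if cid in wanted]
```

The returned list can be passed straight to another `batches.create(requests=...)` call. Requests that failed with invalid_request need their payload repaired first.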

Can I cancel a batch I submitted by mistake?

Yes. Call batches.cancel(id). Requests still in flight move toward a canceled state; any that already succeeded keep their results.

Source & attribution: This is an independent, original explainer inspired by Anthropic's coverage on the Claude blog. Claude, Claude Code, Claude Cowork, Claude Opus, and the Model Context Protocol are products and trademarks of Anthropic. CallSphere is not affiliated with or endorsed by Anthropic.
