Build a Claude batch processing pipeline step by step
Hands-on Claude Message Batches walkthrough: shape requests, submit, poll to ended, reconcile counts, and reassemble results by custom_id in Python.
Documentation tells you the Message Batches API exists. It does not tell you what the third hour of running a real 80,000-request job feels like — the moment you realize your result file has 79,400 lines and you have no idea which 600 rows vanished. This walkthrough is the job I wish someone had handed me: every step from a raw list of inputs to a clean, reconciled output, written so you can lift it into your own pipeline today. We will classify a backlog of customer messages, but the shape applies to any large batch task.
Key takeaways
- Encode a stable join key (a database ID, not an array index) into every custom_id so you can reassemble results no matter what order they come back in.
- Keep max_tokens tight for the task (a one-word classifier needs about a dozen tokens, not 1,024) so a loosely worded prompt cannot run up output cost.
- Poll on a 30–60 second loop until processing_status == "ended"; aggressive polling just spends rate limit.
- Always reconcile request_counts against the number you submitted, and branch error handling on the error type: invalid-request errors (fix and resubmit) versus server-side errors such as api_error (retry as-is).
- Stream results lazily and join on custom_id; never zip positionally against your input list.
Step 1: shape the input into a join-friendly request array
The single most consequential decision happens before any API call: how you name each request. Results come back unordered, so the custom_id is the only thread connecting an output to its source row. Make it carry a key you actually have on the other side — a ticket ID, a row primary key, a content hash. An opaque counter that you cannot map back later turns a finished batch into a useless pile of text.
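A minimal sketch of that discipline as small helpers: one builds the custom_id, one recovers the key from it, and one rejects duplicates before anything is submitted. The "ticket-" prefix and the helper names are illustrative assumptions, and the 1 to 64 character [A-Za-z0-9_-] format is our reading of the API reference; verify it against current docs.

```python
import re
from collections import Counter
from typing import Union

# Assumed format (check the current API reference): 1 to 64 characters drawn
# from letters, digits, "_" and "-".
CUSTOM_ID_RE = re.compile(r"[A-Za-z0-9_-]{1,64}")
PREFIX = "ticket-"  # illustrative namespace for this job


def make_custom_id(ticket_id: Union[int, str]) -> str:
    """Encode the database key into the custom_id so results can be joined back."""
    cid = f"{PREFIX}{ticket_id}"
    if not CUSTOM_ID_RE.fullmatch(cid):
        raise ValueError(f"custom_id {cid!r} does not fit the assumed format")
    return cid


def parse_custom_id(cid: str) -> str:
    """Recover the original key (as a string) from a custom_id."""
    if not cid.startswith(PREFIX):
        raise ValueError(f"unexpected custom_id {cid!r}")
    return cid[len(PREFIX):]


def check_unique(ids: list) -> None:
    """Fail fast on duplicate custom_ids before anything is submitted."""
    dupes = sorted(i for i, n in Counter(ids).items() if n > 1)
    if dupes:
        raise ValueError(f"duplicate custom_ids: {dupes}")
```

Note that parse_custom_id hands back a string; convert it if your keys are integers.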
```python
import anthropic
from anthropic.types.message_create_params import MessageCreateParamsNonStreaming
from anthropic.types.messages.batch_create_params import Request

client = anthropic.Anthropic()  # reads ANTHROPIC_API_KEY from the environment

# tickets: list of (ticket_id, body) pulled from your DB
requests = [
    Request(
        custom_id=f"ticket-{ticket_id}",  # the join key lives here
        params=MessageCreateParamsNonStreaming(
            model="claude-haiku-4-5",
            max_tokens=12,  # one word, kept tight
            system="Classify the message as one of: billing, bug, "
                   "feature_request, praise, other. Reply with only the label.",
            messages=[{"role": "user", "content": body}],
        ),
    )
    for ticket_id, body in tickets
]
```

Note the model choice. Classification is a Haiku task: fast, cheap, and entirely sufficient for picking one of five labels. Reserve Opus for the requests in your batch that genuinely need reasoning. Because each request carries its own model, you can mix them in a single batch without ceremony.
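To make the mixing concrete, here is a sketch that builds dict-shaped requests (the SDK's Request and MessageCreateParamsNonStreaming are TypedDicts, so plain dicts of the same shape work) and routes two task kinds to different models. The "analyze" task, its prompt, and the Opus model ID are placeholders; substitute your own.

```python
LIGHT_MODEL = "claude-haiku-4-5"
# Placeholder: swap in whichever Opus model ID your account uses.
HEAVY_MODEL = "claude-opus-4-1"

LABEL_PROMPT = ("Classify the message as one of: billing, bug, "
                "feature_request, praise, other. Reply with only the label.")
# Hypothetical second task that plausibly needs more reasoning.
ANALYZE_PROMPT = "Explain the most likely root cause described in this bug report."


def build_request(kind: str, ticket_id: str, body: str) -> dict:
    """Return one dict-shaped batch request; each entry names its own model."""
    if kind == "label":
        model, max_tokens, system = LIGHT_MODEL, 12, LABEL_PROMPT
    elif kind == "analyze":
        model, max_tokens, system = HEAVY_MODEL, 1024, ANALYZE_PROMPT
    else:
        raise ValueError(f"unknown request kind {kind!r}")
    return {
        # The task kind is part of the key, so one ticket can appear under
        # two tasks in the same batch without colliding.
        "custom_id": f"{kind}-{ticket_id}",
        "params": {
            "model": model,
            "max_tokens": max_tokens,
            "system": system,
            "messages": [{"role": "user", "content": body}],
        },
    }


# Both kinds can go into a single batches.create(requests=...) call.
mixed = [build_request("label", "7", "Refund please"),
         build_request("analyze", "7", "App crashes on save")]
```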
Step 2: submit and capture the batch ID durably
The Message Batches API is an asynchronous endpoint: you submit an array of independent requests, receive a batch ID immediately, and retrieve results later. The submit call is cheap and synchronous, but the ID it returns is precious — it is the only handle to your in-flight work. Write it somewhere durable before you do anything else.
```python
batch = client.messages.batches.create(requests=requests)
print(batch.id, batch.processing_status)  # msgbatch_..., in_progress

# Persist immediately: if your process dies, you can still recover.
with open("batch_id.txt", "w") as f:
    f.write(batch.id)
```

If your process crashes between submit and result retrieval, the batch keeps running on Anthropic's side regardless. As long as you saved the ID, you reattach with a fresh retrieve() call. Lose the ID and you are reduced to paging through batches.list() to work out which recent batch was yours, while the job you are paying for keeps running.
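A plain open(..., "w") can, in the worst case, leave an empty or partial file if the process dies mid-write. A sketch of a sturdier version writes to a temporary file in the same directory, fsyncs it, and renames it into place with os.replace; the helper names are ours, not the SDK's.

```python
import os
import tempfile


def save_batch_id(batch_id: str, path: str = "batch_id.txt") -> None:
    """Persist the batch ID so a crash mid-write cannot leave a partial file."""
    directory = os.path.dirname(os.path.abspath(path))
    fd, tmp_path = tempfile.mkstemp(dir=directory, prefix=".batch_id.")
    try:
        with os.fdopen(fd, "w") as f:
            f.write(batch_id)
            f.flush()
            os.fsync(f.fileno())  # make sure the bytes hit disk before the rename
        os.replace(tmp_path, path)  # atomic swap when both paths share a filesystem
    except BaseException:
        if os.path.exists(tmp_path):
            os.unlink(tmp_path)
        raise


def load_batch_id(path: str = "batch_id.txt") -> str:
    """Read the saved ID back, e.g. to reattach with batches.retrieve()."""
    with open(path) as f:
        return f.read().strip()
```

To reattach after a crash, pass load_batch_id() to client.messages.batches.retrieve().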
There is a subtler reason to persist eagerly. The 256 MB and 100,000-request ceilings mean a very large corpus may not fit in one batch — you will be submitting several, and each comes back with its own ID. A simple manifest that records each batch ID alongside the slice of input it covers turns a multi-batch job into a tractable bookkeeping exercise instead of a memory-of-the-running-process gamble. Append the ID and its input range to that manifest the moment create() returns, before you start polling.
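A sketch of that bookkeeping: split the request list into slices that respect both ceilings, then append one JSON Lines record per submitted batch. The size check approximates each request by its JSON encoding, so the real payload may differ slightly; leave headroom. The manifest format and function names are our own invention.

```python
import json
from typing import Iterator

MAX_REQUESTS = 100_000         # per-batch request ceiling
MAX_BYTES = 256 * 1000 * 1000  # conservative (decimal) reading of the 256 MB ceiling


def chunk_requests(requests: list, max_requests: int = MAX_REQUESTS,
                   max_bytes: int = MAX_BYTES) -> Iterator[tuple]:
    """Yield (start, end) index pairs whose slices stay under both ceilings."""
    start, size = 0, 0
    for i, req in enumerate(requests):
        req_size = len(json.dumps(req).encode("utf-8"))
        full = i - start >= max_requests or size + req_size > max_bytes
        if i > start and full:
            yield start, i
            start, size = i, 0
        size += req_size
    if start < len(requests):
        yield start, len(requests)


def record_batch(manifest_path: str, batch_id: str, start: int, end: int) -> None:
    """Append one manifest line; call it the moment create() returns."""
    with open(manifest_path, "a", encoding="utf-8") as f:
        f.write(json.dumps({"batch_id": batch_id, "start": start, "end": end}) + "\n")


# Usage sketch against the real client:
# for start, end in chunk_requests(requests):
#     batch = client.messages.batches.create(requests=requests[start:end])
#     record_batch("manifest.jsonl", batch.id, start, end)
```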
Step 3: poll the lifecycle to a terminal state
The batch moves through a small, well-defined lifecycle. The diagram below is the state machine your polling loop is observing.
```mermaid
flowchart TD
    A["batches.create()"] --> B["in_progress"]
    B --> C{"poll: retrieve(id)"}
    C -->|status != ended| D["sleep 30-60s"]
    D --> C
    C -->|status == ended| E["reconcile request_counts"]
    E --> F{"errored > 0?"}
    F -->|yes| G["split: invalid_request vs api_error"]
    F -->|no| H["stream results()"]
    G --> H
    H --> I["join each result on custom_id"]
    I --> J["write to durable store"]
```

The loop itself is deliberately boring. Boring is correct here: there is no event stream to subscribe to, just a status you check on an interval until it reads ended.
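If you want that loop in a testable form, with a deadline so a wedged job cannot pin a worker forever, one possible shape is this. The function name, the injectable sleep, and the 25-hour default timeout (a little past the 24-hour processing window) are our assumptions; the timeout only stops your waiting and does not cancel the batch.

```python
import time
from typing import Callable


def wait_until_ended(client, batch_id: str, interval: float = 45.0,
                     timeout: float = 25 * 3600,
                     sleep: Callable[[float], None] = time.sleep):
    """Poll retrieve() until processing_status == "ended", or raise TimeoutError."""
    deadline = time.monotonic() + timeout
    while True:
        batch = client.messages.batches.retrieve(batch_id)
        if batch.processing_status == "ended":
            return batch
        if time.monotonic() >= deadline:
            # Our own safety net; the batch keeps running server-side.
            raise TimeoutError(f"batch {batch_id} still {batch.processing_status}")
        sleep(interval)  # injectable so tests do not actually wait
```

In production you would call wait_until_ended(client, load_batch_id()) and keep the default sleep.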
```python
import time

while True:
    batch = client.messages.batches.retrieve(batch.id)
    if batch.processing_status == "ended":
        break
    counts = batch.request_counts
    print(f"processing={counts.processing} succeeded={counts.succeeded}")
    time.sleep(45)  # within the 30-60 s window; polling faster does not finish sooner
```

Step 4: reconcile counts before you trust a single result
This is the step most tutorials skip and most production incidents trace back to. When the batch ends, the request_counts object tells you exactly how the work landed. Sum the terminal buckets and compare to what you submitted. If they do not match, stop and investigate before any downstream system consumes the output.
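The counters are the batch's own summary; the results stream is the ground truth. A sketch that tallies result types from the stream and reports any bucket that disagrees with request_counts. It consumes the iterator, so in practice either fold the tally into your main results loop or fetch results a second time. The function name and return shape are illustrative.

```python
from collections import Counter

BUCKETS = ("succeeded", "errored", "canceled", "expired")


def cross_check(results, request_counts) -> dict:
    """Compare result types seen in the stream with batch.request_counts.

    Returns {bucket: (from_counts, from_stream)} for each bucket that disagrees;
    an empty dict means the two views agree.
    """
    tally = Counter(r.result.type for r in results)  # consumes the iterator
    mismatches = {}
    for bucket in BUCKETS:
        expected = getattr(request_counts, bucket)
        if tally[bucket] != expected:
            mismatches[bucket] = (expected, tally[bucket])
    return mismatches


# e.g. problems = cross_check(client.messages.batches.results(batch.id), batch.request_counts)
```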
```python
c = batch.request_counts
total = c.succeeded + c.errored + c.canceled + c.expired
# An explicit raise, unlike assert, still fires when Python runs with -O.
if total != len(requests):
    raise RuntimeError(f"count mismatch: {total} vs {len(requests)}")
if c.errored:
    print(f"WARNING: {c.errored} requests errored - inspect before proceeding")
if c.expired:
    print(f"WARNING: {c.expired} requests expired - resubmit these")
```

Step 5: stream results and branch on error type
Now pull the results. The stream yields one entry per request, and each has a result.type you must switch on. Within errored results, two paths demand opposite responses: an invalid-request error means your payload was wrong, so fix it and resubmit; any other error is server-side and is safe to retry unchanged. Expired requests never ran at all, so they go back in as they were.
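```python
labels = {}
to_fix, to_retry = [], []

for r in client.messages.batches.results(batch.id):  # lazy iterator, not a list
    cid = r.custom_id
    if r.result.type == "succeeded":
        msg = r.result.message
        labels[cid] = next((b.text for b in msg.content if b.type == "text"), "")
    elif r.result.type == "errored":
        # The error arrives wrapped in an envelope whose own type is "error";
        # the specific kind sits one level down. Fall back to the outer object
        # in case your SDK version exposes it flat.
        err = getattr(r.result.error, "error", r.result.error)
        if err.type in ("invalid_request_error", "invalid_request"):
            to_fix.append(cid)  # malformed: repair the payload
        else:
            to_retry.append(cid)  # server-side: resubmit as-is
    elif r.result.type == "expired":
        to_retry.append(cid)  # ran out of time: resubmit
```

Step 6: reassemble against your source data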
With a labels dict keyed by custom_id, the join back to your database is a clean lookup. Because the key encodes the ticket ID, there is no guesswork and no reliance on ordering. This is the payoff for the discipline in step 1.
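The same stable key answers the question from the opening, which rows vanished, with two set differences. A minimal sketch, assuming labels is the dict built in step 5 and you still have the custom_ids you submitted; the function name is illustrative.

```python
def join_report(submitted_ids, labels: dict) -> tuple:
    """Return (missing, unexpected) custom_ids, both sorted for stable output."""
    submitted = set(submitted_ids)
    returned = set(labels)
    # Submitted but no label: should account for to_fix, to_retry and any canceled rows.
    missing = sorted(submitted - returned)
    # Labeled but never submitted: points to a custom_id construction bug.
    unexpected = sorted(returned - submitted)
    return missing, unexpected


# e.g. missing, unexpected = join_report((r["custom_id"] for r in requests), labels)
```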
Two operational habits make this final step robust. First, write the labels to your own durable store immediately — the batch result store keeps results for only 29 days, after which they vanish, so treat the batch as transient compute and pull everything out promptly. Second, do the database writes idempotently: if you re-run the reassembly after a partial failure, an unconditional update keyed on the ticket ID is naturally safe to repeat, whereas an insert would duplicate rows. Designing the write so a replay is a no-op means a hiccup in reassembly never corrupts your downstream data.
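A sketch of the idempotent write, using Python's built-in sqlite3 as a stand-in for your real database. The tickets table with id and category columns is a hypothetical schema; the point is that a keyed UPDATE run twice leaves exactly the same rows as running it once.

```python
import sqlite3


def write_labels(conn: sqlite3.Connection, labels: dict, prefix: str = "ticket-") -> int:
    """Apply labels with a keyed UPDATE; replaying it leaves the same rows."""
    rows = [
        (label.strip(), cid[len(prefix):])
        for cid, label in labels.items()
        if cid.startswith(prefix)
    ]
    with conn:  # single transaction: commit on success, roll back on error
        conn.executemany("UPDATE tickets SET category = ? WHERE id = ?", rows)
    return len(rows)  # rows attempted, not rows changed
```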
```python
# db.update_ticket is your own data-access helper (hypothetical here).
for ticket_id, _ in tickets:
    label = labels.get(f"ticket-{ticket_id}")
    if label is None:
        continue  # landed in to_fix or to_retry
    db.update_ticket(ticket_id, category=label.strip())
```

Common pitfalls
- Array-index custom_ids. If custom_id is just str(i) and you later reorder or filter the input list, the join is broken. Encode a stable key.
- Forgetting to persist the batch ID. The job survives a crash; your in-memory ID does not. Write it to disk on submit.
- Skipping reconciliation. Consuming the succeeded results without checking request_counts means silently dropping every errored and expired row.
- Loading the whole results stream into memory. For large batches, iterate the generator and write as you go rather than building one giant list.
- Oversized max_tokens on cheap tasks. A one-word classifier with max_tokens=1024 costs nothing extra while the model stays terse, but if your prompt is loose it lets the model ramble and you pay for every extra output token. Keep it tight.
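For the memory pitfall, a sketch that writes each result to a JSON Lines file as it arrives, so memory use stays flat regardless of batch size. The record layout is our own choice; pass it client.messages.batches.results(batch_id) or any iterable of objects with the same attributes.

```python
import json
from typing import Iterable


def spill_results(results: Iterable, path: str) -> int:
    """Write one JSON line per result as it arrives; return how many were written."""
    written = 0
    with open(path, "w", encoding="utf-8") as f:
        for r in results:  # nothing accumulates beyond the current item
            record = {"custom_id": r.custom_id, "type": r.result.type}
            if r.result.type == "succeeded":
                record["text"] = "".join(
                    b.text for b in r.result.message.content if b.type == "text"
                )
            f.write(json.dumps(record) + "\n")
            written += 1
    return written
```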
Local test vs. full production run
| Stage | Sample size | What you are verifying |
|---|---|---|
| Smoke test | 5–10 requests | Prompt shape, label vocabulary, custom_id join |
| Dry run | ~500 requests | Error rate, cost-per-item, reconciliation logic |
| Production | Full backlog | Throughput, expired-rate, end-to-end reassembly |
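For the first two stages, a fixed-seed random sample is often more representative than the first N rows, which may share a source or date range, and the fixed seed means a rerun hits the same rows. A small sketch; the function name is illustrative.

```python
import random


def stage_sample(tickets: list, size: int, seed: int = 0) -> list:
    """Pick a reproducible subset of (ticket_id, body) rows for a test stage."""
    if size >= len(tickets):
        return list(tickets)
    return random.Random(seed).sample(tickets, size)  # same seed, same rows


# smoke = stage_sample(tickets, 10); dry_run = stage_sample(tickets, 500)
```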
Frequently asked questions
How often should I poll a running batch?
Every 30 to 60 seconds. The batch completes on its own schedule regardless of polling frequency, so tighter loops only consume your request rate limit without speeding anything up.
My process died mid-job. Did I lose the batch?
No, provided you saved the batch ID. The work runs on Anthropic's infrastructure independently of your client. Reattach with batches.retrieve(saved_id) and continue from polling.
What is the difference between an errored and an expired result?
An errored result failed during processing: either a malformed payload (an invalid-request error, which you must fix) or a server-side issue (retry as-is). An expired result simply ran out of the 24-hour window before a worker reached it; resubmit it unchanged.
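Resubmitting unchanged is a lookup if you kept the original request list. A sketch, assuming requests are dict-shaped (as the SDK's TypedDicts are) and that custom_ids only need to be unique within a single batch, so the retry batch can reuse them and the same join still works.

```python
def build_retry_batch(original: list, retry_ids: list) -> list:
    """Return the exact original requests for the given custom_ids, in order."""
    by_id = {req["custom_id"]: req for req in original}
    missing = [cid for cid in retry_ids if cid not in by_id]
    if missing:
        raise KeyError(f"no original request for {missing}")
    return [by_id[cid] for cid in retry_ids]


# e.g. client.messages.batches.create(requests=build_retry_batch(requests, to_retry))
```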
Can I cancel a batch I submitted by mistake?
Yes. Call batches.cancel(id). The batch moves to a canceling state before it reaches ended; requests that had not yet been processed are marked canceled, and any that already succeeded keep their results.
Bringing agentic AI to your phone lines
The same submit-poll-reconcile discipline that makes a batch job reliable also underpins reliable real-time agents. CallSphere applies these Claude patterns to voice and chat — assistants that answer every call and message, call tools mid-conversation, and book work 24/7. See it live at callsphere.ai.
Source & attribution: This is an independent, original explainer inspired by Anthropic's coverage on the Claude blog. Claude, Claude Code, Claude Cowork, Claude Opus, and the Model Context Protocol are products and trademarks of Anthropic. CallSphere is not affiliated with or endorsed by Anthropic.