Skip to content
Claude Streaming: Real-Time Token Output for Responsive Agent Applications
Learn Agentic AI12 min read9 views

Claude Streaming: Real-Time Token Output for Responsive Agent Applications

Implement real-time streaming with Claude for responsive agent UIs. Learn how to handle stream events, process content_block_delta tokens, manage partial tool calls, and build streaming agent loops.

Why Streaming Matters for Agents

Without streaming, users see nothing until Claude finishes generating its entire response — which can take 10-30 seconds for complex agent tasks. Streaming delivers tokens as they are generated, giving users immediate feedback and making the application feel responsive even during long-running operations.

For agent systems specifically, streaming provides real-time visibility into what the agent is doing: you can show partial text as it forms, display tool call decisions as they happen, and even cancel a generation mid-stream if the agent is heading in the wrong direction.

Basic Streaming

Enable streaming by using the stream method instead of create:

sequenceDiagram
    autonumber
    participant Client
    participant Edge as Edge Worker
    participant LLM as LLM Provider
    participant DB as Logs and Trace
    Client->>Edge: POST /chat (stream=true)
    Edge->>LLM: messages.create(stream=true)
    loop Each token
        LLM-->>Edge: SSE chunk delta
        Edge-->>Client: SSE chunk delta
        Edge->>DB: append token to span
    end
    LLM-->>Edge: stop_reason=end_turn
    Edge-->>Client: event: done
    Edge->>DB: finalize trace
import anthropic

client = anthropic.Anthropic()

with client.messages.stream(
    model="claude-sonnet-4-20250514",
    max_tokens=1024,
    messages=[
        {"role": "user", "content": "Explain microservices architecture in detail."}
    ]
) as stream:
    for text in stream.text_stream:
        print(text, end="", flush=True)

print()  # Newline at the end

The stream.text_stream iterator yields text chunks as they arrive. The flush=True ensures each chunk is printed immediately rather than buffered. This is the simplest way to get streaming working.

Event-Based Streaming

For more control, process individual stream events:

Hear it before you finish reading

Talk to a live CallSphere AI voice agent in your browser — 60 seconds, no signup.

Try Live Demo →
import anthropic

client = anthropic.Anthropic()

with client.messages.stream(
    model="claude-sonnet-4-20250514",
    max_tokens=1024,
    messages=[
        {"role": "user", "content": "Write a Python quicksort implementation."}
    ]
) as stream:
    for event in stream:
        if event.type == "content_block_start":
            print(f"[Block started: {event.content_block.type}]")
        elif event.type == "content_block_delta":
            if event.delta.type == "text_delta":
                print(event.delta.text, end="", flush=True)
        elif event.type == "content_block_stop":
            print(f"\n[Block ended]")
        elif event.type == "message_stop":
            print("\n[Message complete]")

# Access the final message after streaming completes
final_message = stream.get_final_message()
print(f"Total tokens: {final_message.usage.input_tokens + final_message.usage.output_tokens}")

Event-based streaming gives you lifecycle hooks for when content blocks start, receive deltas, and complete. This is essential for building UIs that show different states (thinking, writing, calling tools).

Streaming with Tool Use

When Claude calls tools during streaming, you receive tool-related events:

import anthropic
import json

client = anthropic.Anthropic()

tools = [
    {
        "name": "calculate",
        "description": "Perform a mathematical calculation.",
        "input_schema": {
            "type": "object",
            "properties": {
                "expression": {"type": "string", "description": "Math expression to evaluate"}
            },
            "required": ["expression"]
        }
    }
]

def execute_tool(name: str, args: dict) -> str:
    if name == "calculate":
        try:
            result = eval(args["expression"])
            return json.dumps({"result": result})
        except Exception as e:
            return json.dumps({"error": str(e)})
    return json.dumps({"error": "Unknown tool"})

messages = [{"role": "user", "content": "What is 247 * 389 + 1024?"}]

# First streaming call - may result in tool use
with client.messages.stream(
    model="claude-sonnet-4-20250514",
    max_tokens=1024,
    tools=tools,
    messages=messages
) as stream:
    for event in stream:
        if event.type == "content_block_delta":
            if event.delta.type == "text_delta":
                print(event.delta.text, end="", flush=True)
            elif event.delta.type == "input_json_delta":
                print(f"[Tool input: {event.delta.partial_json}]", end="")

    response = stream.get_final_message()

# Handle tool use if needed
if response.stop_reason == "tool_use":
    messages.append({"role": "assistant", "content": response.content})
    tool_results = []
    for block in response.content:
        if block.type == "tool_use":
            result = execute_tool(block.name, block.input)
            tool_results.append({
                "type": "tool_result",
                "tool_use_id": block.id,
                "content": result
            })
    messages.append({"role": "user", "content": tool_results})

    # Stream the final response
    with client.messages.stream(
        model="claude-sonnet-4-20250514",
        max_tokens=1024,
        tools=tools,
        messages=messages
    ) as stream:
        for text in stream.text_stream:
            print(text, end="", flush=True)

The input_json_delta events let you see tool arguments as they form, which is useful for showing progress indicators like "Calling calculate with 247 * 389 + 1024..."

Async Streaming

For web applications, use async streaming:

import anthropic
import asyncio

async def stream_response(user_input: str):
    client = anthropic.AsyncAnthropic()

    async with client.messages.stream(
        model="claude-sonnet-4-20250514",
        max_tokens=1024,
        messages=[{"role": "user", "content": user_input}]
    ) as stream:
        async for text in stream.text_stream:
            yield text

async def main():
    async for chunk in stream_response("Explain event-driven architecture"):
        print(chunk, end="", flush=True)
    print()

asyncio.run(main())

The async streaming generator pattern integrates directly with web frameworks. In FastAPI, you can return it as a StreamingResponse for server-sent events (SSE):

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import anthropic

app = FastAPI()

@app.get("/chat")
async def chat(q: str):
    client = anthropic.AsyncAnthropic()

    async def generate():
        async with client.messages.stream(
            model="claude-sonnet-4-20250514",
            max_tokens=2048,
            messages=[{"role": "user", "content": q}]
        ) as stream:
            async for text in stream.text_stream:
                yield f"data: {text}\n\n"
        yield "data: [DONE]\n\n"

    return StreamingResponse(generate(), media_type="text/event-stream")

This gives you a production-ready SSE endpoint that streams Claude's response directly to a frontend client.

Still reading? Stop comparing — try CallSphere live.

CallSphere ships complete AI voice agents per industry — 14 tools for healthcare, 10 agents for real estate, 4 specialists for salons. See how it actually handles a call before you book a demo.

Streaming with Extended Thinking

When streaming with extended thinking enabled, you receive thinking deltas before text deltas:

import anthropic

client = anthropic.Anthropic()

with client.messages.stream(
    model="claude-sonnet-4-20250514",
    max_tokens=16000,
    thinking={"type": "enabled", "budget_tokens": 8000},
    messages=[{"role": "user", "content": "Design a caching strategy for a high-traffic API."}]
) as stream:
    current_block = None
    for event in stream:
        if event.type == "content_block_start":
            current_block = event.content_block.type
            if current_block == "thinking":
                print("[Thinking...]", flush=True)
            elif current_block == "text":
                print("\n[Response:]", flush=True)
        elif event.type == "content_block_delta":
            if event.delta.type == "thinking_delta":
                pass  # Optionally show thinking progress
            elif event.delta.type == "text_delta":
                print(event.delta.text, end="", flush=True)

This lets you show a "thinking" indicator while Claude reasons, then stream the actual response text once it starts, creating a polished user experience.

FAQ

Does streaming change the output quality or content?

No. Streaming produces identical output to non-streaming calls. The only difference is delivery timing — tokens arrive incrementally instead of all at once. The final assembled message is exactly the same.

Can I cancel a stream mid-generation?

Yes. Simply break out of the stream iterator or close the stream context manager. The API stops generating tokens when the connection closes. This is useful for implementing "stop generating" buttons or for agents that detect they are going off track.

Does streaming cost more than non-streaming?

No. Token pricing is identical. You pay the same per-token rate regardless of whether you use streaming. The only overhead is a slightly higher number of HTTP frames, which has negligible impact on network costs.


#Anthropic #Claude #Streaming #RealTime #AgentUX #AgenticAI #LearnAI #AIEngineering

Share

Try CallSphere AI Voice Agents

See how AI voice agents work for your industry. Live demo available -- no signup required.