---
title: "Claude API Streaming: Real-Time AI Responses in Production"
description: "Complete guide to implementing streaming responses with the Claude API. Covers SSE implementation, token-by-token rendering, error handling during streams, and production patterns for real-time AI applications."
canonical: https://callsphere.ai/blog/claude-api-streaming-production
category: "Agentic AI"
tags: ["Claude API", "Streaming", "SSE", "Real-Time AI", "Production", "Anthropic"]
author: "CallSphere Team"
published: 2026-01-27T00:00:00.000Z
updated: 2026-05-07T11:27:18.835Z
---

# Claude API Streaming: Real-Time AI Responses in Production

> Complete guide to implementing streaming responses with the Claude API. Covers SSE implementation, token-by-token rendering, error handling during streams, and production patterns for real-time AI applications.

## Why Streaming Matters

Without streaming, a Claude API call blocks until the entire response is generated. For a 1,000-token response generated at 50-100 tokens per second, that means roughly 10-20 seconds of silence followed by a wall of text. Users perceive this as slow, unresponsive, and frustrating.

Streaming changes the UX fundamentally. The first token arrives within 500ms-2s (time to first token, or TTFT), and subsequent tokens stream in at 50-100 tokens per second. Users see the response forming in real time, which feels fast even when the total generation time is identical.

For production applications -- chatbots, code assistants, real-time analysis tools -- streaming is not optional. It is a core UX requirement.

## Basic Streaming in Python

```python
from anthropic import Anthropic

client = Anthropic()

# Basic streaming with the messages API
with client.messages.stream(
    model="claude-sonnet-4-5-20250514",
    max_tokens=4096,
    messages=[{"role": "user", "content": "Explain how TCP/IP works."}]
) as stream:
    for text in stream.text_stream:
        print(text, end="", flush=True)
```

The `stream()` method returns a context manager. Inside the `with` block, `stream.text_stream` yields text chunks as they arrive, and the connection is closed when the block exits. Passing `flush=True` ensures each chunk is printed immediately rather than buffered.

```mermaid
flowchart LR
    USER(["User message"])
    LOOP{"messages.create<br/>agent loop"}
    THINK["Extended thinking<br/>optional"]
    TOOL{"stop_reason<br/>tool_use?"}
    EXEC["Execute tool<br/>append tool_result"]
    DONE(["stop_reason<br/>end_turn"])
    USER --> LOOP --> THINK --> TOOL
    TOOL -->|Yes| EXEC --> LOOP
    TOOL -->|No| DONE
    style LOOP fill:#4f46e5,stroke:#4338ca,color:#fff
    style THINK fill:#ede9fe,stroke:#7c3aed,color:#1e1b4b
    style DONE fill:#059669,stroke:#047857,color:#fff
```

## Basic Streaming in TypeScript

```typescript
import Anthropic from "@anthropic-ai/sdk";

const client = new Anthropic();

const stream = await client.messages.stream({
  model: "claude-sonnet-4-5-20250514",
  max_tokens: 4096,
  messages: [{ role: "user", content: "Explain how TCP/IP works." }],
});

for await (const event of stream) {
  if (event.type === "content_block_delta" && event.delta.type === "text_delta") {
    process.stdout.write(event.delta.text);
  }
}

// Get the final message with usage stats
const finalMessage = await stream.finalMessage();
console.log("\nTokens used:", finalMessage.usage);
```

## Server-Sent Events (SSE) Architecture

The Claude API uses Server-Sent Events for streaming. Each event has a type that tells you what is happening:

| Event Type | Description | When It Occurs |
| --- | --- | --- |
| `message_start` | Message metadata, model info | First event |
| `content_block_start` | New content block begins | Before each text/tool block |
| `content_block_delta` | Incremental content update | During generation |
| `content_block_stop` | Content block complete | After each block |
| `message_delta` | Message-level updates (stop reason, usage) | Near end |
| `message_stop` | Stream complete | Last event |
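
On the wire, each event in the table is a small text frame: an `event:` line naming the type, a `data:` line carrying a JSON payload, and a blank line as terminator. The SDKs parse this for you, so the following is only a minimal sketch of that parsing. It assumes `\n` line endings and single-line `data:` fields (the full SSE format also allows comments, multi-line data, and `\r\n`), the sample payloads are abbreviated, and `SSEParser` is a hypothetical name rather than an SDK class.

```python
import json


class SSEParser:
    """Incrementally parse SSE frames from arbitrarily split text chunks."""

    def __init__(self) -> None:
        self._buffer = ""

    def feed(self, chunk: str) -> list[tuple[str, dict]]:
        """Add a chunk and return every complete (event_type, payload) pair."""
        self._buffer += chunk
        events = []
        # A blank line ("\n\n") terminates each frame; a partial frame stays buffered
        while "\n\n" in self._buffer:
            frame, self._buffer = self._buffer.split("\n\n", 1)
            event_type, data = "message", None
            for line in frame.split("\n"):
                if line.startswith("event:"):
                    event_type = line[len("event:"):].strip()
                elif line.startswith("data:"):
                    data = line[len("data:"):].strip()
            if data is not None:
                events.append((event_type, json.loads(data)))
        return events


# Abbreviated sample of the wire format
wire = (
    'event: content_block_delta\n'
    'data: {"type": "content_block_delta", "index": 0, '
    '"delta": {"type": "text_delta", "text": "Hello"}}\n\n'
    'event: content_block_delta\n'
    'data: {"type": "content_block_delta", "index": 0, '
    '"delta": {"type": "text_delta", "text": " world"}}\n\n'
    'event: message_stop\n'
    'data: {"type": "message_stop"}\n\n'
)

parser = SSEParser()
parsed = []
for i in range(0, len(wire), 17):  # feed 17-character chunks to split frames mid-line
    parsed.extend(parser.feed(wire[i:i + 17]))

text = "".join(
    payload["delta"]["text"]
    for event_type, payload in parsed
    if event_type == "content_block_delta"
)
print(text)
```

Feeding the data in small slices shows why the buffer matters: network reads do not align with frame boundaries, so a parser has to hold incomplete frames until the terminating blank line arrives.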

### Handling All Event Types

```python
from anthropic import Anthropic

client = Anthropic()

with client.messages.stream(
    model="claude-sonnet-4-5-20250514",
    max_tokens=4096,
    messages=[{"role": "user", "content": "Write a Python function to sort a list."}]
) as stream:
    for event in stream:
        match event.type:
            case "message_start":
                print(f"Model: {event.message.model}")
            case "content_block_start":
                if event.content_block.type == "text":
                    print("--- Text block started ---")
                elif event.content_block.type == "tool_use":
                    print(f"--- Tool call: {event.content_block.name} ---")
            case "content_block_delta":
                if event.delta.type == "text_delta":
                    print(event.delta.text, end="", flush=True)
                elif event.delta.type == "input_json_delta":
                    print(event.delta.partial_json, end="", flush=True)
            case "message_delta":
                print(f"\nStop reason: {event.delta.stop_reason}")
                print(f"Output tokens: {event.usage.output_tokens}")
            case "message_stop":
                print("\n--- Stream complete ---")
```

## Streaming with Tool Use

Streaming becomes more complex when tools are involved. Claude may stream text, then switch to a tool call, then resume text after seeing the tool result.

```python
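import json

from anthropic import Anthropic

client = Anthropic()

# execute_tool(name, input) is your own dispatcher that runs a tool and returns
# a JSON-serializable result; it is not part of the SDK.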

def stream_with_tools(user_message: str, tools: list):
    messages = [{"role": "user", "content": user_message}]

    while True:
        collected_text = ""
        tool_calls = []
        current_tool_input = ""

        with client.messages.stream(
            model="claude-sonnet-4-5-20250514",
            max_tokens=4096,
            tools=tools,
            messages=messages,
        ) as stream:
            for event in stream:
                if event.type == "content_block_delta":
                    if event.delta.type == "text_delta":
                        print(event.delta.text, end="", flush=True)
                        collected_text += event.delta.text
                    elif event.delta.type == "input_json_delta":
                        current_tool_input += event.delta.partial_json

                elif event.type == "content_block_start":
                    if event.content_block.type == "tool_use":
                        current_tool_input = ""
                        tool_calls.append({
                            "id": event.content_block.id,
                            "name": event.content_block.name,
                        })

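                elif event.type == "content_block_stop":
                    # Finalize the open tool call; a tool with no arguments
                    # may stream no JSON, so fall back to an empty object
                    if tool_calls and "input" not in tool_calls[-1]:
                        tool_calls[-1]["input"] = json.loads(current_tool_input or "{}")
                        current_tool_input = ""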

            final = stream.get_final_message()

        # If no tool calls, we are done
        if final.stop_reason != "tool_use":
            return collected_text

        # Execute tools and continue
        messages.append({"role": "assistant", "content": final.content})
        tool_results = []
        for tc in tool_calls:
            result = execute_tool(tc["name"], tc["input"])
            tool_results.append({
                "type": "tool_result",
                "tool_use_id": tc["id"],
                "content": json.dumps(result),
            })
        messages.append({"role": "user", "content": tool_results})
```
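
When Claude requests several tools in one turn, each `tool_use` block carries its own `index`, and its `input_json_delta` fragments only form valid JSON once concatenated. A minimal sketch of per-block accumulation follows. It uses plain dictionaries shaped like the stream events in place of a live stream, and `collect_tool_calls`, the `get_weather` tool, and the sample events are illustrative assumptions, not SDK functions or real output.

```python
import json


def collect_tool_calls(events: list[dict]) -> list[dict]:
    """Assemble complete tool calls from start/delta/stop events, keyed by block index."""
    pending: dict[int, dict] = {}  # block index -> call under construction
    completed: list[dict] = []

    for event in events:
        kind = event["type"]
        if kind == "content_block_start" and event["content_block"]["type"] == "tool_use":
            block = event["content_block"]
            pending[event["index"]] = {"id": block["id"], "name": block["name"], "json": ""}
        elif kind == "content_block_delta" and event["delta"]["type"] == "input_json_delta":
            pending[event["index"]]["json"] += event["delta"]["partial_json"]
        elif kind == "content_block_stop" and event["index"] in pending:
            call = pending.pop(event["index"])
            # A tool with no arguments may stream no JSON at all, so fall back to {}
            call["input"] = json.loads(call.pop("json") or "{}")
            completed.append(call)
    return completed


# Simulated events: one text block, then a tool call whose JSON arrives in fragments
sample_events = [
    {"type": "content_block_start", "index": 0, "content_block": {"type": "text", "text": ""}},
    {"type": "content_block_delta", "index": 0, "delta": {"type": "text_delta", "text": "Checking."}},
    {"type": "content_block_stop", "index": 0},
    {"type": "content_block_start", "index": 1,
     "content_block": {"type": "tool_use", "id": "toolu_01", "name": "get_weather", "input": {}}},
    {"type": "content_block_delta", "index": 1,
     "delta": {"type": "input_json_delta", "partial_json": '{"city": "Par'}},
    {"type": "content_block_delta", "index": 1,
     "delta": {"type": "input_json_delta", "partial_json": 'is"}'}},
    {"type": "content_block_stop", "index": 1},
]

calls = collect_tool_calls(sample_events)
print(calls)
```

Keying by `index` rather than by "the most recent tool call" keeps the logic correct even if text and tool blocks interleave. Parsing is deferred until `content_block_stop` because no single fragment is guaranteed to be valid JSON.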

## Building a Streaming API Endpoint

For web applications, you need to proxy the Claude stream to your frontend. Here is a FastAPI implementation:

```python
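import json

from anthropic import AsyncAnthropic
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from pydantic import BaseModel

app = FastAPI()
# Use the async client so streaming does not block the event loop
client = AsyncAnthropic()

class ChatRequest(BaseModel):
    # Request shape assumed for this example
    messages: list[dict]
    system_prompt: str = "You are a helpful assistant."

@app.post("/api/chat")
async def chat_endpoint(request: ChatRequest):
    async def generate():
        async with client.messages.stream(
            model="claude-sonnet-4-5-20250514",
            max_tokens=4096,
            system=request.system_prompt,
            messages=request.messages,
        ) as stream:
            async for text in stream.text_stream:
                # Format as SSE
                yield f"data: {json.dumps({'text': text})}\n\n"
        yield "data: [DONE]\n\n"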

    return StreamingResponse(
        generate(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",  # Disable nginx buffering
        }
    )
```
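
The endpoint above relies on two framing details: each event ends with a blank line, and the payload itself must not contain a raw newline. `json.dumps` takes care of the second, because it escapes newlines inside string values. A small sketch of a framing helper follows, where `sse_frame` is a hypothetical name and the `[DONE]` sentinel is this application's own convention rather than part of the SSE format.

```python
import json


def sse_frame(payload: dict) -> str:
    """Serialize one payload as a single-line SSE data frame."""
    # json.dumps escapes newlines in string values, so the frame stays on one line
    return f"data: {json.dumps(payload)}\n\n"


DONE_FRAME = "data: [DONE]\n\n"  # application-level end marker, not an SSE feature

frame = sse_frame({"text": "line one\nline two"})
print(repr(frame))
```

Sending raw text instead of JSON would break as soon as the model emitted a newline, since the client would see the blank line as the end of the event.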

### Frontend Consumer (React)

```typescript
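import { useState } from "react";

type Message = { role: "user" | "assistant"; content: string };

// An async generator (note the `*`) is required in order to `yield` chunks
async function* streamChat(messages: Message[]): AsyncGenerator<string> {
  const response = await fetch("/api/chat", {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({ messages }),
  });
  if (!response.ok || !response.body) {
    throw new Error(`Chat request failed with status ${response.status}`);
  }

  const reader = response.body.getReader();
  const decoder = new TextDecoder();
  let buffer = "";

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;

    // A network chunk can end mid-event, so buffer until a full "\n\n" frame arrives
    buffer += decoder.decode(value, { stream: true });
    const frames = buffer.split("\n\n");
    buffer = frames.pop() ?? "";

    for (const frame of frames) {
      if (frame.startsWith("data: ") && frame !== "data: [DONE]") {
        yield JSON.parse(frame.slice(6)).text;
      }
    }
  }
}

// Usage in a React component
function ChatComponent() {
  const [response, setResponse] = useState("");

  // Wire this to your input form; only the streamed output is rendered here
  const handleSend = async (message: string) => {
    setResponse("");
    for await (const chunk of streamChat([{ role: "user", content: message }])) {
      setResponse(prev => prev + chunk);
    }
  };

  return <div>{response}</div>;
}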
```

## Error Handling During Streams

Streams can fail mid-generation due to network issues, rate limits, or server errors. Robust error handling is essential.
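
A mid-stream failure differs from a failed request in one important way: part of the response may already be on the user's screen, so a blind retry would repeat it. The sketch below shows one way to handle this, retrying only while nothing has been emitted yet. It assumes a hypothetical `open_stream` factory that returns a fresh iterator of text chunks, and it uses Python's built-in `ConnectionError` as a stand-in for the SDK's connection exception so that it runs without network access.

```python
import time
from typing import Callable, Iterator


def stream_with_guarded_retry(
    open_stream: Callable[[], Iterator[str]],
    max_retries: int = 3,
    base_delay: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> Iterator[str]:
    """Yield chunks from open_stream(), retrying only while nothing has been emitted."""
    for attempt in range(max_retries):
        emitted = False
        try:
            for chunk in open_stream():
                emitted = True
                yield chunk
            return  # Stream finished normally
        except ConnectionError:
            # Once text has reached the caller, a restart would duplicate it, so give up
            if emitted or attempt == max_retries - 1:
                raise
            sleep(base_delay * 2 ** attempt)  # Exponential backoff: 1s, 2s, 4s, ...


# Demo with a fake stream that fails on its first attempt before producing text
attempts = {"count": 0}


def flaky_stream() -> Iterator[str]:
    attempts["count"] += 1
    if attempts["count"] == 1:
        raise ConnectionError("simulated network failure")
    yield from ["Hello", ", ", "world"]


delays: list[float] = []
result = "".join(stream_with_guarded_retry(flaky_stream, sleep=delays.append))
print(result, delays)
```

Whether to surface the partial text with an error marker, or to restart and ask the client to discard what it has, is a product decision; the guard only makes sure the choice is explicit.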

```python
from anthropic import APIConnectionError, RateLimitError, APIStatusError
import time

def stream_with_retry(messages: list, max_retries: int = 3):
    for attempt in range(max_retries):
        try:
            collected = ""
            with client.messages.stream(
                model="claude-sonnet-4-5-20250514",
                max_tokens=4096,
                messages=messages,
            ) as stream:
                for text in stream.text_stream:
                    collected += text
                    yield text
            return  # Success

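        except (APIConnectionError, RateLimitError):
            # Transient failure: back off exponentially, then retry
            if attempt < max_retries - 1:
                time.sleep(2 ** attempt)
                continue
            raise

        except APIStatusError as e:
            # Retry server-side (5xx) errors only; other statuses will not succeed on retry
            if e.status_code >= 500 and attempt < max_retries - 1:
                time.sleep(2 ** attempt)
                continue
            raise
```

## Performance Optimization

### Buffering Small Chunks

Forwarding every chunk as its own event adds per-message overhead and can trigger a UI re-render for each token. Batching chunks and flushing on a short timer or a size threshold reduces both:

```python
import time

def buffered_stream(messages: list, flush_interval: float):
    # flush_interval is in seconds, for example 0.05
    buffer = ""
    last_flush = time.time()

    with client.messages.stream(
        model="claude-sonnet-4-5-20250514",
        max_tokens=4096,
        messages=messages,
    ) as stream:
        for text in stream.text_stream:
            buffer += text
            now = time.time()
            # Flush on a timer or once the buffer grows past 100 characters
            if now - last_flush >= flush_interval or len(buffer) > 100:
                yield buffer
                buffer = ""
                last_flush = now

        if buffer:  # Flush remaining
            yield buffer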
```
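
The time-or-size flush rule (flush when `flush_interval` has elapsed or the buffer exceeds 100 characters) can be separated from the API call so that it is easy to test. The following is a minimal sketch, assuming a hypothetical `buffer_chunks` helper that wraps any iterable of text chunks and accepts an injectable clock. The default values are illustrative, not recommendations.

```python
import time
from typing import Callable, Iterable, Iterator


def buffer_chunks(
    chunks: Iterable[str],
    flush_interval: float = 0.05,
    max_chars: int = 100,
    clock: Callable[[], float] = time.monotonic,
) -> Iterator[str]:
    """Coalesce small chunks, flushing when enough time has passed or the buffer is large."""
    buffer = ""
    last_flush = clock()
    for chunk in chunks:
        buffer += chunk
        now = clock()
        if now - last_flush >= flush_interval or len(buffer) > max_chars:
            yield buffer
            buffer = ""
            last_flush = now
    if buffer:  # Flush whatever is left when the stream ends
        yield buffer


# Demo with a fake clock that advances 20 ms on every reading
ticks = iter(i * 0.02 for i in range(100))
batches = list(buffer_chunks(["a", "b", "c", "d", "e", "f", "g"], clock=lambda: next(ticks)))
print(batches)
```

Note that the timer is only checked when a chunk arrives, so a stalled stream holds its buffer until the next chunk or the end of the stream. A monotonic clock is used by default so that wall-clock adjustments cannot distort the interval.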

### Connection Keep-Alive

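For high-throughput applications, reuse HTTP connections by creating one client at startup and sharing it across requests instead of constructing a new client per call. The Anthropic Python SDK pools connections automatically through its internal `httpx` client. The TypeScript SDK also keeps connections alive by default; the underlying HTTP implementation (`node-fetch` in older releases, the runtime's built-in `fetch` in newer ones) depends on the SDK version.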

## Monitoring Streaming Performance

Track these metrics in production:

- **Time to first token (TTFT)**: Should be under 2 seconds for interactive applications
- **Tokens per second**: Typically 50-100 for Claude Sonnet
- **Stream completion rate**: Percentage of streams that complete without error
- **Partial response recovery**: How often you successfully retry after mid-stream failures
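
The first three metrics can be captured by wrapping the chunk iterator. The following is a minimal sketch, assuming hypothetical `StreamMetrics` and `measure` helpers and an injectable clock. It counts chunks rather than tokens, which is only a proxy, since a chunk can contain more than one token; exact counts come from `usage.output_tokens` on the final message.

```python
import time
from dataclasses import dataclass
from typing import Callable, Iterable, Iterator, Optional


@dataclass
class StreamMetrics:
    started_at: float
    first_chunk_at: Optional[float] = None
    finished_at: Optional[float] = None
    chunks: int = 0
    completed: bool = False

    @property
    def ttft(self) -> Optional[float]:
        """Seconds from request start to the first chunk, or None if none arrived."""
        if self.first_chunk_at is None:
            return None
        return self.first_chunk_at - self.started_at

    @property
    def chunks_per_second(self) -> Optional[float]:
        """Chunk rate after the first chunk; a rough proxy for tokens per second."""
        if self.first_chunk_at is None or self.finished_at is None:
            return None
        elapsed = self.finished_at - self.first_chunk_at
        return self.chunks / elapsed if elapsed > 0 else None


def measure(
    chunks: Iterable[str],
    metrics: StreamMetrics,
    clock: Callable[[], float] = time.monotonic,
) -> Iterator[str]:
    """Pass chunks through unchanged while recording timing into metrics."""
    for chunk in chunks:
        now = clock()
        if metrics.first_chunk_at is None:
            metrics.first_chunk_at = now
        metrics.chunks += 1
        yield chunk
    metrics.finished_at = clock()
    metrics.completed = True  # Only reached if the stream ended without raising


# Demo with a fake clock: the request starts at 0.0, then one reading per call
readings = iter([0.5, 0.6, 0.7, 0.8, 1.0])
metrics = StreamMetrics(started_at=0.0)
text = "".join(measure(["a", "b", "c", "d"], metrics, clock=lambda: next(readings)))
print(metrics.ttft, metrics.chunks_per_second, metrics.completed)
```

Aggregating `completed` across requests gives the stream completion rate. In a real service the values would be exported to whatever metrics system is already in place rather than printed.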

---

Source: https://callsphere.ai/blog/claude-api-streaming-production
