---
title: "Streaming Agent Architectures: Real-Time Token-by-Token Output with Tool Call Interleaving"
description: "Master the architecture of streaming AI agents that deliver token-by-token output while interleaving tool calls, using Server-Sent Events and progressive rendering to create responsive user experiences."
canonical: https://callsphere.ai/blog/streaming-agent-architectures-real-time-token-tool-interleaving
category: "Learn Agentic AI"
tags: ["Streaming", "SSE", "Real-Time", "Tool Interleaving", "Agent Architecture"]
author: "CallSphere Team"
published: 2026-03-18T00:00:00.000Z
updated: 2026-05-06T01:02:45.989Z
---

# Streaming Agent Architectures: Real-Time Token-by-Token Output with Tool Call Interleaving

> Master the architecture of streaming AI agents that deliver token-by-token output while interleaving tool calls, using Server-Sent Events and progressive rendering to create responsive user experiences.

## Why Streaming Matters for Agents

A non-streaming agent calls the LLM, waits for the full response, executes any tool calls, waits again, and finally returns everything at once. The user stares at a spinner for 5-15 seconds. Streaming agents send tokens to the client the moment they are generated. When the model decides to call a tool, the client sees a tool execution indicator, and when the tool returns, the model's continuation streams immediately.

The result is an agent that feels instantaneous. The first token appears in 200-400ms. Tool calls appear as they happen. The architecture is more complex, but the user experience difference is dramatic.

## Server-Sent Events: The Transport Layer

SSE is the simplest reliable protocol for server-to-client streaming. Unlike WebSockets, it works over standard HTTP, passes through all proxies and CDNs, and auto-reconnects on failure.

```mermaid
flowchart TD
    USER(["User message"])
    LLM["LLM call
with tools schema"]
    DECIDE{"Model wants
to call a tool?"}
    EXEC["Execute tool
sandboxed runtime"]
    RESULT["Append tool_result
to messages"]
    GUARD{"Output passes
guardrails?"}
    DONE(["Final reply"])
    BLOCK(["Refuse and log"])
    USER --> LLM --> DECIDE
    DECIDE -->|Yes| EXEC --> RESULT --> LLM
    DECIDE -->|No| GUARD
    GUARD -->|Yes| DONE
    GUARD -->|No| BLOCK
    style LLM fill:#4f46e5,stroke:#4338ca,color:#fff
    style EXEC fill:#ede9fe,stroke:#7c3aed,color:#1e1b4b
    style GUARD fill:#f59e0b,stroke:#d97706,color:#1f2937
    style DONE fill:#059669,stroke:#047857,color:#fff
    style BLOCK fill:#dc2626,stroke:#b91c1c,color:#fff
```

```python
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from agents import Agent, Runner
import json

app = FastAPI()

agent = Agent(
    name="Streaming Assistant",
    instructions="You are a helpful assistant. Use tools when needed.",
    tools=[search_tool, calculator_tool],
)

@app.get("/api/chat/stream")
async def stream_chat(message: str):
    async def event_generator():
        result = Runner.run_streamed(agent, message)

        async for event in result.stream_events():
            if event.type == "raw_response_event":
                # Token-by-token text output
                delta = event.data
                if hasattr(delta, "delta") and delta.delta:
                    yield f"data: {json.dumps({'type': 'token', 'content': delta.delta})}\n\n"

            elif event.type == "run_item_stream_event":
                item = event.item
                item_type = type(item).__name__

                if "ToolCall" in item_type:
                    yield f"data: {json.dumps({'type': 'tool_start', 'tool': str(item)})}\n\n"
                elif "ToolOutput" in item_type:
                    yield f"data: {json.dumps({'type': 'tool_result', 'output': str(item)})}\n\n"

        yield f"data: {json.dumps({'type': 'done'})}\n\n"

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",
        },
    )
```

The `X-Accel-Buffering: no` header is critical — it tells Nginx and similar reverse proxies not to buffer the response, which would defeat the purpose of streaming.

## Client-Side Stream Consumption

On the frontend, use the `EventSource` API or a fetch-based reader for more control.

```typescript
// Using fetch for full control over the stream
async function streamChat(message: string, onEvent: (event: StreamEvent) => void) {
  const response = await fetch(
    `/api/chat/stream?message=${encodeURIComponent(message)}`
  );

  const reader = response.body!.getReader();
  const decoder = new TextDecoder();
  let buffer = "";

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;

    buffer += decoder.decode(value, { stream: true });
    const lines = buffer.split("\n");
    buffer = lines.pop() || "";

    for (const line of lines) {
      if (line.startsWith("data: ")) {
        const event = JSON.parse(line.slice(6));
        onEvent(event);
      }
    }
  }
}

// React component that renders the stream
function ChatMessage() {
  const [tokens, setTokens] = useState("");
  const [toolCalls, setToolCalls] = useState([]);

  const handleStream = (event: StreamEvent) => {
    switch (event.type) {
      case "token":
        setTokens((prev) => prev + event.content);
        break;
      case "tool_start":
        setToolCalls((prev) => [...prev, `Calling: ${event.tool}`]);
        break;
      case "tool_result":
        setToolCalls((prev) => [...prev, `Result: ${event.output}`]);
        break;
    }
  };

  return (

{tokens}

      {toolCalls.map((tc, i) => (
        {tc}
      ))}

  );
}
```

## Handling Partial Tool Calls

One of the trickiest parts of streaming agents is partial tool calls. The LLM might stream the tool name and arguments token by token. You need to buffer the tool call until it is complete before executing it.

```python
class ToolCallBuffer:
    """Accumulates partial tool call tokens until the call is complete."""

    def __init__(self):
        self.active_calls: dict[int, dict] = {}

    def process_delta(self, delta) -> list[dict] | None:
        """Process a streaming delta, return complete tool calls if any."""
        completed = []

        if hasattr(delta, "tool_calls") and delta.tool_calls:
            for tc in delta.tool_calls:
                idx = tc.index

                if idx not in self.active_calls:
                    self.active_calls[idx] = {
                        "name": "",
                        "arguments": "",
                    }

                if tc.function and tc.function.name:
                    self.active_calls[idx]["name"] += tc.function.name
                if tc.function and tc.function.arguments:
                    self.active_calls[idx]["arguments"] += tc.function.arguments

        # Check if any calls are complete (valid JSON arguments)
        for idx in list(self.active_calls.keys()):
            call = self.active_calls[idx]
            try:
                json.loads(call["arguments"])
                completed.append(call)
                del self.active_calls[idx]
            except json.JSONDecodeError:
                pass  # Still accumulating

        return completed if completed else None
```

## Progressive Rendering Pattern

Instead of waiting for the entire agent response, render each phase as it completes.

```typescript
interface StreamPhase {
  type: "thinking" | "tool_executing" | "responding";
  content: string;
}

function ProgressiveResponse({ phases }: { phases: StreamPhase[] }) {
  return (

      {phases.map((phase, i) => (

          {phase.type === "tool_executing" && (
            ⚙
          )}
          {phase.content}

      ))}

  );
}
```

## Backpressure and Connection Management

In production, manage connections carefully to prevent resource exhaustion.

```python
import asyncio
from contextlib import asynccontextmanager

class ConnectionManager:
    def __init__(self, max_concurrent: int = 100):
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.active_connections: set[str] = set()

    @asynccontextmanager
    async def connect(self, connection_id: str):
        await self.semaphore.acquire()
        self.active_connections.add(connection_id)
        try:
            yield
        finally:
            self.active_connections.discard(connection_id)
            self.semaphore.release()

manager = ConnectionManager(max_concurrent=100)

@app.get("/api/chat/stream")
async def stream_chat(message: str, connection_id: str):
    async with manager.connect(connection_id):
        async def event_generator():
            # ... streaming logic
            pass
        return StreamingResponse(event_generator(), media_type="text/event-stream")
```

## FAQ

### How do you handle client disconnects mid-stream?

FastAPI detects client disconnection when the response write fails. Wrap your generator in a try/except that catches `ConnectionResetError` and `BrokenPipeError`. When caught, cancel any pending LLM calls or tool executions to free resources. The OpenAI SDK supports cancellation tokens for this purpose.

### What happens if a tool call takes 30 seconds during a stream?

Send heartbeat events during long tool executions to keep the connection alive and show progress. Emit a `tool_start` event immediately, then send periodic `tool_progress` events (e.g., every 2 seconds), and finally emit `tool_result` when the tool completes. This prevents connection timeouts and keeps the user informed.

### Can you stream from multiple agents in parallel?

Yes, but it requires careful event multiplexing. Assign each agent stream a unique channel ID. On the client, demultiplex events by channel and render each agent's output in its designated area. Use async generators with `asyncio.as_completed()` or merge streams to interleave events from concurrent agents.

---

#StreamingAI #ServerSentEvents #RealTimeAI #AgentStreaming #ProgressiveRendering #ToolInterleaving #FastAPI #SSE

---

Source: https://callsphere.ai/blog/streaming-agent-architectures-real-time-token-tool-interleaving
