---
title: "SDK Streaming Support: Implementing Real-Time Response Handling in Client Libraries"
description: "Learn how to implement streaming support in AI agent SDKs using Server-Sent Events, async iterators, event handling patterns, and automatic reconnection for real-time response delivery."
canonical: https://callsphere.ai/blog/sdk-streaming-support-real-time-response-handling
category: "Learn Agentic AI"
tags: ["Streaming", "SSE", "Async Iterators", "Real-Time", "SDK Design", "Agentic AI"]
author: "CallSphere Team"
published: 2026-03-17T00:00:00.000Z
updated: 2026-05-06T01:02:44.611Z
---

# SDK Streaming Support: Implementing Real-Time Response Handling in Client Libraries

> Learn how to implement streaming support in AI agent SDKs using Server-Sent Events, async iterators, event handling patterns, and automatic reconnection for real-time response delivery.

## Why Streaming Matters for Agent SDKs

AI agent runs generate output incrementally — the model produces tokens one at a time, tools execute and return results mid-run, and status transitions happen throughout. Without streaming, users wait in silence until the entire run completes. With streaming, they see tokens appear in real time, watch tool calls execute, and can cancel long-running operations.

Streaming is not a nice-to-have for agent SDKs. It is fundamental to building responsive applications.

## Server-Sent Events Parsing

Most AI APIs stream responses using Server-Sent Events (SSE). The sequence below shows how token deltas flow from the LLM provider through an edge worker to the client during a streamed run:

```mermaid
sequenceDiagram
    autonumber
    participant Client
    participant Edge as Edge Worker
    participant LLM as LLM Provider
    participant DB as Logs and Trace
    Client->>Edge: POST /chat (stream=true)
    Edge->>LLM: messages.create(stream=true)
    loop Each token
        LLM-->>Edge: SSE chunk delta
        Edge-->>Client: SSE chunk delta
        Edge->>DB: append token to span
    end
    LLM-->>Edge: stop_reason=end_turn
    Edge-->>Client: event: done
    Edge->>DB: finalize trace
```

The wire format itself is simple: each event is a block of `field: value` lines, and a blank line marks the end of each event:

```
data: {"type": "token", "text": "Hello"}

data: {"type": "token", "text": " world"}

data: {"type": "tool_call", "name": "search", "arguments": "{\"q\": \"weather\"}"}

data: [DONE]
```

Here is a robust SSE parser in Python:

```python
from __future__ import annotations
from dataclasses import dataclass
from typing import AsyncIterator
import httpx

@dataclass
class SSEEvent:
    event: str | None = None
    data: str = ""
    id: str | None = None
    retry: int | None = None

async def parse_sse(response: httpx.Response) -> AsyncIterator[SSEEvent]:
    """Parse an SSE stream from an httpx async response."""
    current = SSEEvent()

    async for line in response.aiter_lines():
        if line == "":
            # Blank line = event boundary; dispatch accumulated data
            if current.data:
                # Drop the trailing newline appended per data line below
                current.data = current.data[:-1]
                yield current
            current = SSEEvent()
            continue

        if line.startswith(":"):
            # Comment line, skip
            continue

        field, _, value = line.partition(":")
        # The spec strips at most one leading space from the value
        if value.startswith(" "):
            value = value[1:]

        if field == "data":
            # Multiple data lines in one event are joined with newlines
            current.data += value + "\n"
        elif field == "event":
            current.event = value
        elif field == "id":
            current.id = value
        elif field == "retry":
            try:
                current.retry = int(value)
            except ValueError:
                pass
```
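
To drive the parser directly, point it at any SSE endpoint. A minimal sketch using httpx, where the base URL and route are assumptions for illustration:

```python
import asyncio
import httpx

async def main() -> None:
    # Hypothetical endpoint; base_url and the route are placeholders
    async with httpx.AsyncClient(base_url="https://api.example.com") as http:
        async with http.stream(
            "POST",
            "/agents/agent_abc123/runs",
            json={"input": "Hello", "stream": True},
        ) as response:
            async for event in parse_sse(response):
                print(event.event, event.data)

asyncio.run(main())
```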

## Python Streaming with Async Iterators

The SDK should expose streaming through async iterators. This lets users consume events with a simple `async for` loop:

```python
from dataclasses import dataclass
from typing import AsyncIterator
import httpx
import json

@dataclass
class StreamEvent:
    type: str
    data: dict

    @property
    def is_token(self) -> bool:
        return self.type == "token"

    @property
    def text(self) -> str:
        return self.data.get("text", "")

class RunStream:
    """Async iterator over a streaming agent run."""

    def __init__(self, response: httpx.Response) -> None:
        self._response = response
        self._collected_text = ""

    async def __aiter__(self) -> AsyncIterator[StreamEvent]:
        try:
            async for sse in parse_sse(self._response):
                if sse.data == "[DONE]":
                    return

                payload = json.loads(sse.data)
                event = StreamEvent(type=payload["type"], data=payload)

                if event.is_token:
                    self._collected_text += event.text

                yield event
        finally:
            # Close the response so the connection is released even if
            # the consumer breaks out of the loop early
            await self._response.aclose()

    @property
    def collected_text(self) -> str:
        return self._collected_text

class RunsResource:
    def __init__(self, client) -> None:
        self._client = client

    async def create_stream(
        self, agent_id: str, input_text: str
    ) -> RunStream:
        # httpx's .stream() returns an async context manager, not an
        # awaitable, so build the request and send it with stream=True
        request = self._client._async_http.build_request(
            "POST",
            f"/agents/{agent_id}/runs",
            json={"input": input_text, "stream": True},
        )
        response = await self._client._async_http.send(request, stream=True)
        return RunStream(response)
```

Usage becomes intuitive:

```python
stream = await client.runs.create_stream(
    agent_id="agent_abc123",
    input_text="Summarize the quarterly report",
)

async for event in stream:
    if event.is_token:
        print(event.text, end="", flush=True)
    elif event.type == "tool_call":
        print(f"\nCalling tool: {event.data['name']}")

print(f"\nFull response: {stream.collected_text}")
```

## TypeScript Streaming

In TypeScript, use the ReadableStream API to parse SSE from a fetch response:

```typescript
interface StreamEvent {
  type: 'token' | 'tool_call' | 'status' | 'error' | 'done';
  data: Record<string, unknown>;
}

async function* parseSSEStream(
  response: Response
): AsyncGenerator<StreamEvent> {
  const reader = response.body!.getReader();
  const decoder = new TextDecoder();
  let buffer = '';

  try {
    while (true) {
      const { done, value } = await reader.read();
      if (done) break;

      buffer += decoder.decode(value, { stream: true });
      const lines = buffer.split('\n');
      buffer = lines.pop() ?? '';

      for (const line of lines) {
        if (line.startsWith('data: ')) {
          const data = line.slice(6);
          if (data === '[DONE]') return;
          yield JSON.parse(data) as StreamEvent;
        }
      }
    }
  } finally {
    reader.releaseLock();
  }
}

// Usage
const response = await fetch(`${baseUrl}/agents/${agentId}/runs`, {
  method: 'POST',
  headers: {
    Authorization: `Bearer ${apiKey}`,
    'Content-Type': 'application/json',
  },
  body: JSON.stringify({ input: 'Hello', stream: true }),
});

for await (const event of parseSSEStream(response)) {
  if (event.type === 'token') {
    process.stdout.write(event.data.text as string);
  }
}
```

## Event Callbacks as an Alternative API

Some users prefer event callbacks over async iteration. Offer both patterns:

```python
from typing import Callable

class RunStream:
    # ... existing async iterator methods ...

    async def on(
        self,
        token: Callable[[str], None] | None = None,
        tool_call: Callable[[dict], None] | None = None,
        done: Callable[[str], None] | None = None,
        error: Callable[[Exception], None] | None = None,
    ) -> str:
        """Consume the stream with event callbacks."""
        async for event in self:
            try:
                if event.type == "token" and token:
                    token(event.text)
                elif event.type == "tool_call" and tool_call:
                    tool_call(event.data)
            except Exception as exc:
                if error:
                    error(exc)
                else:
                    raise

        if done:
            done(self.collected_text)
        return self.collected_text
```
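
Consuming the same run then looks like this, reusing the `create_stream` call from earlier:

```python
stream = await client.runs.create_stream(
    agent_id="agent_abc123",
    input_text="Summarize the quarterly report",
)

final_text = await stream.on(
    token=lambda text: print(text, end="", flush=True),
    tool_call=lambda call: print(f"\nCalling tool: {call['name']}"),
    done=lambda text: print(f"\n({len(text)} characters streamed)"),
)
```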

## Automatic Reconnection

Streams break. Connections drop. A robust SDK reconnects automatically using the last event ID:

```python
import asyncio

async def create_stream_with_reconnect(
    self, agent_id: str, input_text: str, max_reconnects: int = 3
) -> AsyncIterator[StreamEvent]:
    last_event_id: str | None = None
    reconnect_count = 0

    while True:
        # Resume from the last seen event if this is a reconnect
        headers = {"Last-Event-ID": last_event_id} if last_event_id else {}
        try:
            request = self._client._async_http.build_request(
                "POST",
                f"/agents/{agent_id}/runs",
                json={"input": input_text, "stream": True},
                headers=headers,
            )
            response = await self._client._async_http.send(request, stream=True)
            async for sse in parse_sse(response):
                if sse.id:
                    last_event_id = sse.id
                if sse.data == "[DONE]":
                    return
                payload = json.loads(sse.data)
                yield StreamEvent(type=payload["type"], data=payload)
            return
        except httpx.TransportError:
            reconnect_count += 1
            if reconnect_count > max_reconnects:
                raise
            # Linear backoff before retrying with Last-Event-ID set
            await asyncio.sleep(1.0 * reconnect_count)
```
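
Because the method is an async generator, callers consume it exactly like the basic stream and reconnection stays invisible:

```python
async for event in client.runs.create_stream_with_reconnect(
    agent_id="agent_abc123",
    input_text="Summarize the quarterly report",
):
    if event.is_token:
        print(event.text, end="", flush=True)
```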

## FAQ

### How do I handle backpressure when the SDK receives events faster than the user processes them?

Async iterators handle backpressure naturally. The `async for` loop only requests the next event when the current one has been processed. If the consumer is slow, the SDK buffers incoming data in the HTTP response stream, which applies TCP-level backpressure to the server. Avoid pre-reading all events into an in-memory queue unless you explicitly need lookahead.
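
A sketch of the pattern, where `process` stands in for a hypothetical slow handler:

```python
async def consume(stream: RunStream) -> None:
    async for event in stream:
        # parse_sse only reads the next chunk from the socket after this
        # await returns, so a slow consumer throttles the whole pipeline
        await process(event)  # `process` is a hypothetical slow handler
```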

### Should I support both streaming and non-streaming from the same method?

No. Use separate methods: `client.runs.create()` for synchronous runs that return a completed result, and `client.runs.create_stream()` for streaming. Mixing the two via a boolean flag makes the return type ambiguous and requires conditional type handling. Separate methods give each mode a clear type signature and distinct documentation.
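
A sketch of the two stubs, assuming a `Run` result model for the non-streaming path:

```python
class RunsResource:
    async def create(self, agent_id: str, input_text: str) -> Run:
        """Block until the run finishes and return the completed Run."""
        ...

    async def create_stream(self, agent_id: str, input_text: str) -> RunStream:
        """Return immediately with an async-iterable stream of events."""
        ...
```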

### How do I test streaming responses in unit tests?

Create mock SSE streams using async generators that yield predefined event sequences. In Python, use `asyncio` to create an `AsyncIterator` that yields `SSEEvent` objects with controlled timing. This lets you test parsing, event handling, and reconnection logic without a live server.
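
For example, a fake response object exposing `aiter_lines()` can stand in for httpx, since `parse_sse` only relies on that method at runtime:

```python
import asyncio

class FakeSSEResponse:
    """Mimics the aiter_lines() surface that parse_sse relies on."""

    def __init__(self, lines: list[str]) -> None:
        self._lines = lines

    async def aiter_lines(self):
        for line in self._lines:
            yield line

async def test_parses_token_events() -> None:
    fake = FakeSSEResponse([
        'data: {"type": "token", "text": "Hello"}',
        "",
        "data: [DONE]",
        "",
    ])
    events = [event async for event in parse_sse(fake)]
    assert events[0].data == '{"type": "token", "text": "Hello"}'
    assert events[1].data == "[DONE]"

asyncio.run(test_parses_token_events())
```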
