---
title: "Streaming Responses from OpenAI: Real-Time Token-by-Token Output"
description: "Learn how to stream OpenAI responses token-by-token using the Python SDK, implement async streaming for web applications, and display incremental results to users."
canonical: https://callsphere.ai/blog/streaming-responses-openai-real-time-token-by-token-output
category: "Learn Agentic AI"
tags: ["OpenAI", "Streaming", "Server-Sent Events", "Async Python", "Real-Time"]
author: "CallSphere Team"
published: 2026-03-17T00:00:00.000Z
updated: 2026-05-06T01:02:42.574Z
---

# Streaming Responses from OpenAI: Real-Time Token-by-Token Output

> Learn how to stream OpenAI responses token-by-token using the Python SDK, implement async streaming for web applications, and display incremental results to users.

## Why Streaming Matters

When a model generates a long response, the standard (non-streaming) API makes you wait for the entire completion before returning anything. For a 500-token response, that can mean several seconds of silence before any text appears. Streaming changes this by delivering tokens as they are generated, giving users the familiar "typing" experience seen in ChatGPT.

Streaming is essential for chatbots, real-time UIs, and any application where perceived latency matters.

## Basic Synchronous Streaming

The sequence diagram below shows how a streamed completion typically flows end to end: the client opens a request, an intermediate server relays each token from the model as a Server-Sent Events chunk, and a final event marks the end of the turn. In the simplest case you call the API directly and enable streaming by setting `stream=True`:

```mermaid
sequenceDiagram
    autonumber
    participant Client
    participant Edge as Edge Worker
    participant LLM as LLM Provider
    participant DB as Logs and Trace
    Client->>Edge: POST /chat (stream=true)
    Edge->>LLM: messages.create(stream=true)
    loop Each token
        LLM-->>Edge: SSE chunk delta
        Edge-->>Client: SSE chunk delta
        Edge->>DB: append token to span
    end
    LLM-->>Edge: stop_reason=end_turn
    Edge-->>Client: event: done
    Edge->>DB: finalize trace
```

```python
from openai import OpenAI

client = OpenAI()

stream = client.chat.completions.create(
    model="gpt-4o",
    messages=[
        {"role": "user", "content": "Write a short guide to Python decorators."},
    ],
    stream=True,
)

for chunk in stream:
    delta = chunk.choices[0].delta
    if delta.content:
        print(delta.content, end="", flush=True)

print()  # newline after streaming completes
```

Each `chunk` is a `ChatCompletionChunk` object. The `delta` field contains the incremental content — usually one or a few tokens per chunk. The first chunk often has the `role` field set, and subsequent chunks contain `content`.
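
If you want to see that structure for yourself, a minimal sketch like the following prints the raw delta fields of every chunk (exact values vary by model and run):

```python
from openai import OpenAI

client = OpenAI()

stream = client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "Say hello."}],
    stream=True,
)

# Print the raw delta of every chunk: the role arrives first, content follows
for chunk in stream:
    delta = chunk.choices[0].delta
    print(f"role={delta.role!r}  content={delta.content!r}")
```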

## Async Streaming

For web applications built with FastAPI, Django, or similar frameworks, async streaming is the right approach:

```python
import asyncio
from openai import AsyncOpenAI

client = AsyncOpenAI()

async def stream_response(prompt: str):
    stream = await client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "user", "content": prompt},
        ],
        stream=True,
    )

    full_response = ""
    async for chunk in stream:
        delta = chunk.choices[0].delta
        if delta.content:
            full_response += delta.content
            print(delta.content, end="", flush=True)

    print()
    return full_response

result = asyncio.run(stream_response("Explain async generators in Python."))
```

The async client uses `async for` to iterate over chunks without blocking the event loop, which means your server can handle other requests concurrently during generation.
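
As a rough illustration of that concurrency, the sketch below reuses the `stream_response` coroutine defined above to run two prompts at once; note that their printed tokens will interleave on stdout:

```python
import asyncio

async def main():
    # Both streams progress concurrently on a single event loop
    results = await asyncio.gather(
        stream_response("Explain asyncio.gather in one sentence."),
        stream_response("Explain async generators in one sentence."),
    )
    for text in results:
        print(len(text), "characters")

asyncio.run(main())
```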

## Building an SSE Endpoint with FastAPI

Server-Sent Events (SSE) are the standard way to push streaming responses to a browser:

```python
import json

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from openai import AsyncOpenAI

app = FastAPI()
client = AsyncOpenAI()

async def generate_stream(prompt: str):
    stream = await client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": prompt}],
        stream=True,
    )

    async for chunk in stream:
        delta = chunk.choices[0].delta
        if delta.content:
            # JSON-encode each token so embedded newlines don't break
            # SSE's line-based "data: ..." framing
            yield f"data: {json.dumps(delta.content)}\n\n"

    yield "data: [DONE]\n\n"

@app.get("/stream")
async def stream_endpoint(prompt: str):
    return StreamingResponse(
        generate_stream(prompt),
        media_type="text/event-stream",
    )
```

On the frontend, consume this with the `EventSource` API or a fetch-based SSE reader, JSON-decoding each `data:` payload before appending it to the UI.
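
For quick testing from Python rather than a browser, you can read the same stream with `httpx` (a minimal sketch, assuming the app above is running locally on port 8000):

```python
import json

import httpx

# Stream the SSE endpoint and print tokens as they arrive
with httpx.stream(
    "GET",
    "http://localhost:8000/stream",
    params={"prompt": "Explain SSE in two sentences."},
    timeout=None,
) as response:
    for line in response.iter_lines():
        if not line.startswith("data: "):
            continue
        data = line[len("data: "):]
        if data == "[DONE]":
            break
        print(json.loads(data), end="", flush=True)

print()
```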

## Collecting the Full Response While Streaming

A common pattern is to display tokens in real-time while also building up the complete response for storage or further processing:

```python
from openai import OpenAI

client = OpenAI()

def stream_and_collect(messages: list[dict]) -> str:
    stream = client.chat.completions.create(
        model="gpt-4o",
        messages=messages,
        stream=True,
    )

    collected_content = []
    for chunk in stream:
        delta = chunk.choices[0].delta
        if delta.content:
            collected_content.append(delta.content)
            print(delta.content, end="", flush=True)

    print()
    return "".join(collected_content)

full_text = stream_and_collect([
    {"role": "user", "content": "Summarize the Python GIL in 3 sentences."},
])
# full_text now contains the entire response
```
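
A typical next step (illustrative) is to append the collected text back onto the conversation as an assistant message so the following turn has full context:

```python
messages = [
    {"role": "user", "content": "Summarize the Python GIL in 3 sentences."},
]
reply = stream_and_collect(messages)

# Store the streamed reply as conversation history for the next turn
messages.append({"role": "assistant", "content": reply})
messages.append({"role": "user", "content": "Now give one practical implication."})
follow_up = stream_and_collect(messages)
```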

## Handling Stream Interruptions

Network issues can interrupt a stream mid-response. Wrap your streaming code in proper error handling:

```python
from openai import OpenAI, APIConnectionError, APITimeoutError

client = OpenAI()

def safe_stream(messages: list[dict]) -> str:
    try:
        stream = client.chat.completions.create(
            model="gpt-4o",
            messages=messages,
            stream=True,
        )

        parts = []
        for chunk in stream:
            delta = chunk.choices[0].delta
            if delta.content:
                parts.append(delta.content)

        return "".join(parts)

    except APITimeoutError:
        # APITimeoutError subclasses APIConnectionError, so catch it first
        return "Request timed out. Please retry."
    except APIConnectionError:
        return "Connection lost during streaming. Please retry."
```
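
If you would rather retry than surface an error message, a simple wrapper (here an illustrative `stream_with_retry`) can restart the request; this is a sketch only, and note that each retry regenerates and bills the full response from scratch:

```python
import time

from openai import OpenAI, APIConnectionError, APITimeoutError

client = OpenAI()

def stream_with_retry(messages: list[dict], max_attempts: int = 3) -> str:
    for attempt in range(1, max_attempts + 1):
        try:
            stream = client.chat.completions.create(
                model="gpt-4o",
                messages=messages,
                stream=True,
            )
            parts = []
            for chunk in stream:
                delta = chunk.choices[0].delta
                if delta.content:
                    parts.append(delta.content)
            return "".join(parts)
        except (APITimeoutError, APIConnectionError):
            if attempt == max_attempts:
                raise
            time.sleep(2 ** attempt)  # simple exponential backoff before retrying
    return ""  # unreachable; keeps type checkers satisfied
```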

## FAQ

### Does streaming cost more tokens than non-streaming?

No. Token usage is identical whether you stream or not. The only difference is how the response is delivered to your client.
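
If you want to confirm that yourself, the Chat Completions API can include a usage summary in the final chunk of a stream via `stream_options` (a minimal sketch; the usage chunk carries no choices):

```python
from openai import OpenAI

client = OpenAI()

stream = client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "Name three Python web frameworks."}],
    stream=True,
    stream_options={"include_usage": True},  # ask for a final usage-only chunk
)

for chunk in stream:
    # The usage chunk has an empty choices list, so guard before indexing
    if chunk.choices and chunk.choices[0].delta.content:
        print(chunk.choices[0].delta.content, end="", flush=True)
    if chunk.usage:
        print(f"\nTotal tokens: {chunk.usage.total_tokens}")
```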

### Can I use streaming with function calling?

Yes. When the model decides to call a function, the tool call arguments are streamed incrementally in the `delta.tool_calls` field. You accumulate the argument string across chunks and parse it once complete.
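
A minimal sketch of that accumulation, using a hypothetical `get_weather` tool for illustration:

```python
import json

from openai import OpenAI

client = OpenAI()

tools = [
    {
        "type": "function",
        "function": {
            "name": "get_weather",
            "description": "Get the current weather for a city.",
            "parameters": {
                "type": "object",
                "properties": {"city": {"type": "string"}},
                "required": ["city"],
            },
        },
    }
]

stream = client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "What's the weather in Paris?"}],
    tools=tools,
    stream=True,
)

# Accumulate streamed tool-call fragments, keyed by their index
calls: dict[int, dict] = {}
for chunk in stream:
    delta = chunk.choices[0].delta
    for tc in delta.tool_calls or []:
        call = calls.setdefault(tc.index, {"name": "", "arguments": ""})
        if tc.function and tc.function.name:
            call["name"] = tc.function.name
        if tc.function and tc.function.arguments:
            call["arguments"] += tc.function.arguments

# Parse each argument string only after the stream has finished
for call in calls.values():
    print(call["name"], json.loads(call["arguments"]))
```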

### How do I know when the stream is finished?

The stream ends when iteration completes. The last chunk will have a `finish_reason` set on `choices[0]` (e.g., `stop` or `tool_calls`). If you are sending SSE, emit a `[DONE]` event as a signal to the frontend.
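
A small sketch of watching for `finish_reason` while printing tokens (values vary by run):

```python
from openai import OpenAI

client = OpenAI()

stream = client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "Say hello in five words."}],
    stream=True,
)

finish_reason = None
for chunk in stream:
    choice = chunk.choices[0]
    if choice.delta.content:
        print(choice.delta.content, end="", flush=True)
    if choice.finish_reason is not None:
        finish_reason = choice.finish_reason  # e.g. "stop" or "tool_calls"

print(f"\nfinish_reason: {finish_reason}")
```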

---

#OpenAI #Streaming #ServerSentEvents #AsyncPython #RealTime #AgenticAI #LearnAI #AIEngineering

