---
title: "Streaming Structured Outputs: Incremental JSON Parsing for Real-Time Applications"
description: "Learn how to stream structured outputs from LLMs for real-time UI updates. Covers partial JSON parsing, streaming with Instructor and Pydantic, progressive UI rendering, and handling incomplete data."
canonical: https://callsphere.ai/blog/streaming-structured-outputs-incremental-json-parsing-realtime
category: "Learn Agentic AI"
tags: ["Streaming", "Real-Time", "JSON Parsing", "Structured Outputs", "Python"]
author: "CallSphere Team"
published: 2026-03-17T00:00:00.000Z
updated: 2026-05-07T19:04:21.293Z
---

# Streaming Structured Outputs: Incremental JSON Parsing for Real-Time Applications

> Learn how to stream structured outputs from LLMs for real-time UI updates. Covers partial JSON parsing, streaming with Instructor and Pydantic, progressive UI rendering, and handling incomplete data.

## The Streaming Problem for Structured Data

Standard structured output extraction waits for the entire LLM response before parsing. For small extractions this is fine, but when generating large structured objects — a detailed analysis with ten sections, a list of fifty extracted entities — the user stares at a loading spinner for 5-15 seconds.

Streaming solves this by delivering partial results as the model generates tokens. The challenge is that partial JSON is invalid JSON. You cannot call `json.loads()` on half an object. You need specialized parsing that handles incomplete data.
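To make the problem concrete, handing a truncated object to the standard library parser fails immediately:

```python
import json

# A response cut off mid-generation: the object and inner array are unterminated
truncated = '{"title": "Market Report", "key_findings": ["Gro'

try:
    json.loads(truncated)
except json.JSONDecodeError as exc:
    print(f"invalid partial JSON: {exc.msg}")
```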

## How Instructor Handles Streaming

Instructor provides a `create_partial` method that yields progressively more complete Pydantic objects. The sequence diagram below shows the end-to-end token flow from provider to client; the code after it shows the `create_partial` API itself:

```mermaid
sequenceDiagram
    autonumber
    participant Client
    participant Edge as Edge Worker
    participant LLM as LLM Provider
    participant DB as Logs and Trace
    Client->>Edge: POST /chat (stream=true)
    Edge->>LLM: messages.create(stream=true)
    loop Each token
        LLM-->>Edge: SSE chunk delta
        Edge-->>Client: SSE chunk delta
        Edge->>DB: append token to span
    end
    LLM-->>Edge: stop_reason=end_turn
    Edge-->>Client: event: done
    Edge->>DB: finalize trace
```

```python
import instructor
from openai import OpenAI
from pydantic import BaseModel, Field
from typing import List, Optional

client = instructor.from_openai(OpenAI())

class AnalysisReport(BaseModel):
    title: str
    executive_summary: Optional[str] = None
    key_findings: List[str] = Field(default_factory=list)
    recommendations: List[str] = Field(default_factory=list)
    risk_level: Optional[str] = None

# Stream partial results
for partial_report in client.chat.completions.create_partial(
    model="gpt-4o",
    response_model=AnalysisReport,
    messages=[
        {
            "role": "system",
            "content": "Analyze the market data and produce a detailed report."
        },
        {
            "role": "user",
            "content": "Q4 2025 SaaS market data: ARR growth 23%, churn decreased to 4.2%..."
        }
    ],
    stream=True,
):
    # Each iteration yields a more complete AnalysisReport
    print(f"Title: {partial_report.title}")
    print(f"Findings so far: {len(partial_report.key_findings)}")
    print("---")
```

Each iteration yields a valid Pydantic object with whatever fields have been completed so far. Fields not yet streamed show their default values (empty lists, None).
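On the consumer side, one useful pattern is to diff successive partial snapshots and re-render only the fields that changed. A minimal stdlib sketch, where the hard-coded snapshot dicts stand in for successive `partial_report.model_dump(exclude_none=True)` calls:

```python
def diff_partials(previous: dict, current: dict) -> dict:
    """Return only the keys whose values changed between two partial snapshots."""
    return {key: value for key, value in current.items() if previous.get(key) != value}

# Simulated sequence of partial dumps from a streaming extraction
snapshots = [
    {"title": "Market Report"},
    {"title": "Market Report", "key_findings": ["Growth is strong"]},
    {"title": "Market Report", "key_findings": ["Growth is strong", "Churn fell"]},
]

previous: dict = {}
for snapshot in snapshots:
    for field_name, value in diff_partials(previous, snapshot).items():
        print(f"update {field_name}: {value}")  # re-render only this field
    previous = snapshot
```

This keeps UI updates cheap: fields that were already rendered are left alone, and growing lists trigger a re-render only of themselves.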

## Building a Real-Time UI with Streaming

Connect streaming structured outputs to a FastAPI server-sent events endpoint:

```python
import json

import instructor
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from openai import AsyncOpenAI

app = FastAPI()

async def stream_analysis(query: str):
    """Generator that yields SSE events with partial structured data."""
    async_client = instructor.from_openai(AsyncOpenAI())

    async for partial in async_client.chat.completions.create_partial(
        model="gpt-4o",
        response_model=AnalysisReport,
        messages=[
            {"role": "system", "content": "Analyze the data."},
            {"role": "user", "content": query}
        ],
        stream=True,
    ):
        # Send each partial result as an SSE event
        data = partial.model_dump(exclude_none=True)
        yield f"data: {json.dumps(data)}\n\n"

    yield "data: [DONE]\n\n"

@app.get("/api/analyze")
async def analyze(query: str):
    return StreamingResponse(
        stream_analysis(query),
        media_type="text/event-stream",
    )
```

On the frontend, consume the stream with an EventSource:

```javascript
// Frontend: consume the SSE stream
const source = new EventSource("/api/analyze?query=...");
source.onmessage = (event) => {
  if (event.data === "[DONE]") { source.close(); return; }
  const partial = JSON.parse(event.data);
  updateUI(partial);  // re-render with the latest partial object
};
```

## Manual Partial JSON Parsing

If you are not using Instructor, you can parse partial JSON manually. The key insight is that incomplete JSON can often be made valid by closing open brackets and braces:

```python
import json

def try_parse_partial_json(partial: str) -> dict | None:
    """Attempt to parse a partial JSON string by closing open structures."""
    # Count unclosed brackets and braces (naive: ignores braces inside strings)
    open_braces = partial.count("{") - partial.count("}")
    open_brackets = partial.count("[") - partial.count("]")

    # Remove trailing comma if present
    cleaned = partial.rstrip().rstrip(",")

    # Close open structures (assumes arrays are nested inside objects, not vice versa)
    cleaned += "]" * open_brackets
    cleaned += "}" * open_braces

    try:
        return json.loads(cleaned)
    except json.JSONDecodeError:
        return None

# Example: partial stream from LLM
partial_stream = '{"title": "Market Report", "findings": ["Growth is strong"'
result = try_parse_partial_json(partial_stream)
print(result)
# {'title': 'Market Report', 'findings': ['Growth is strong']}
```

This approach is fragile: it miscounts braces and brackets that appear inside string values, and because it closes all arrays before all objects, it fails whenever an object is nested inside an array (a list of findings objects, for example). For production use, prefer Instructor's built-in partial parsing.
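A somewhat sturdier sketch walks the text character by character, tracking whether it is inside a string and closing open structures innermost-first. It still cannot rescue every prefix (one that ends mid-key, for instance), so treat it as a fallback rather than a substitute for a real streaming parser:

```python
import json

def close_partial_json(partial: str) -> str:
    """Close an unterminated string, then any open arrays/objects, innermost first."""
    stack: list[str] = []   # open delimiters, innermost last
    in_string = False
    escaped = False
    for ch in partial:
        if in_string:
            if escaped:
                escaped = False
            elif ch == "\\":
                escaped = True
            elif ch == '"':
                in_string = False
        elif ch == '"':
            in_string = True
        elif ch in "{[":
            stack.append(ch)
        elif ch in "}]":
            stack.pop()

    # A trailing comma is only safe to strip outside a string
    cleaned = partial if in_string else partial.rstrip().rstrip(",")
    if in_string:
        cleaned += '"'
    for opener in reversed(stack):
        cleaned += "}" if opener == "{" else "]"
    return cleaned

print(close_partial_json('{"title": "Mar'))
# {"title": "Mar"}
```

Unlike the counting version, this handles objects nested inside arrays and strings cut off mid-value, because it replays the actual nesting order instead of guessing it from totals.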

## Streaming Lists of Objects

When extracting a list of items, you want each completed item to appear as soon as possible:

```python
class ExtractedContact(BaseModel):
    name: str
    email: Optional[str] = None
    company: Optional[str] = None

# Stream individual items as they complete
for contact in client.chat.completions.create_iterable(
    model="gpt-4o",
    response_model=ExtractedContact,
    messages=[
        {
            "role": "user",
            "content": "Extract contacts: John (john@acme.com, Acme), Sarah (sarah@corp.io, BigCorp)..."
        }
    ],
):
    print(f"Got contact: {contact.name} at {contact.company}")
    # Process each contact immediately — no waiting for full list
    save_to_database(contact)  # hypothetical persistence helper
```

The `create_iterable` method yields fully validated individual objects as they are completed in the stream. This is different from `create_partial`, which yields increasingly complete versions of the entire response model.

## Handling Stream Interruptions

Streams can be interrupted by network issues or timeouts. Handle partial completion gracefully:

```python
import instructor
from dataclasses import dataclass, field
from openai import AsyncOpenAI

@dataclass
class StreamResult:
    completed: bool = False
    last_partial: dict = field(default_factory=dict)
    items_received: int = 0
    error: str | None = None

async def safe_stream_extraction(text: str) -> StreamResult:
    result = StreamResult()
    async_client = instructor.from_openai(AsyncOpenAI())

    try:
        async for partial in async_client.chat.completions.create_partial(
            model="gpt-4o",
            response_model=AnalysisReport,
            messages=[
                {"role": "system", "content": "Analyze the data."},
                {"role": "user", "content": text}
            ],
            stream=True,
        ):
            result.last_partial = partial.model_dump(exclude_none=True)
            result.items_received += 1

        result.completed = True
    except Exception as e:
        result.error = str(e)
        # last_partial still contains the most recent valid state

    return result
```

Even on failure, `last_partial` contains whatever data was successfully streamed before the interruption.
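A natural next step is to retry on failure with exponential backoff, keeping the richest partial seen across attempts. A sketch under assumptions: the `extract` callable below is a stand-in for a wrapper like `safe_stream_extraction`, assumed here to return a `(completed, partial_dict)` pair:

```python
import asyncio

async def extract_with_retry(extract, text: str, max_attempts: int = 3,
                             base_delay: float = 0.5):
    """Retry a streaming extraction, keeping the best partial across attempts."""
    best_partial: dict = {}
    for attempt in range(max_attempts):
        completed, partial = await extract(text)
        if len(partial) > len(best_partial):
            best_partial = partial  # keep the richest snapshot seen so far
        if completed:
            return True, best_partial
        await asyncio.sleep(base_delay * 2 ** attempt)  # exponential backoff
    return False, best_partial
```

Even if every attempt is interrupted, the caller still receives the most complete partial object observed, which can be rendered with a "results may be incomplete" notice.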

## FAQ

### What is the latency improvement from streaming structured outputs?

Time-to-first-token is typically 200-500ms regardless of total response length. Without streaming, the user waits for the full 5-15 second generation. With streaming, the UI starts updating after that first 200-500ms. For large structured outputs (50+ fields), perceived latency drops by 80-90%.

### Does streaming affect the quality of structured outputs?

No. The model generates the same tokens whether you stream or not. The difference is purely in delivery timing. Strict mode and constrained decoding still apply to the full generation; streaming just lets you observe the output incrementally.

### Can I stream and validate simultaneously?

With Instructor's `create_partial`, each yielded object is a valid Pydantic instance with default values for incomplete fields. Full validation (including cross-field validators) only applies when the stream completes. During streaming, individual field types are validated as they appear, but model-level validators that depend on multiple fields wait until the end.

---

#Streaming #RealTime #JSONParsing #StructuredOutputs #Python #AgenticAI #LearnAI #AIEngineering

---

Source: https://callsphere.ai/blog/streaming-structured-outputs-incremental-json-parsing-realtime
