Debugging Streaming Issues: Fixing Dropped Tokens, Connection Resets, and Partial Responses
Learn Agentic AI · 11 min read · 21 views

Learn how to diagnose and fix common streaming problems in AI agents including dropped tokens, connection resets, partial responses, and timeout failures with practical debugging techniques.

Streaming Looks Simple Until It Breaks

Streaming LLM responses gives users instant feedback: tokens appear as they are generated, so nobody waits for the full response. But streaming introduces a class of bugs that do not exist in non-streaming mode: dropped tokens, mid-stream disconnects, partial tool calls, and buffer corruption.

These bugs are insidious because they are often intermittent. The stream works perfectly for 99 conversations, then silently drops the last 50 tokens on the 100th. Users see a response that ends mid-sentence, and your logs might not capture what went wrong.

Building a Stream Diagnostic Wrapper

Wrap your streaming calls with diagnostics that track every chunk:

sequenceDiagram
    autonumber
    participant Client
    participant Edge as Edge Worker
    participant LLM as LLM Provider
    participant DB as Logs and Trace
    Client->>Edge: POST /chat (stream=true)
    Edge->>LLM: messages.create(stream=true)
    loop Each token
        LLM-->>Edge: SSE chunk delta
        Edge-->>Client: SSE chunk delta
        Edge->>DB: append token to span
    end
    LLM-->>Edge: stop_reason=end_turn
    Edge-->>Client: event: done
    Edge->>DB: finalize trace

import asyncio
import time
from dataclasses import dataclass, field

@dataclass
class StreamDiagnostics:
    chunks_received: int = 0
    total_content_length: int = 0
    first_chunk_ms: float = 0
    last_chunk_ms: float = 0
    finish_reason: str | None = None
    errors: list[str] = field(default_factory=list)
    chunk_gaps: list[float] = field(default_factory=list)

async def debug_stream(client, messages, **kwargs):
    diag = StreamDiagnostics()
    start = time.perf_counter()
    last_chunk_time = start
    full_content = []

    try:
        stream = await client.chat.completions.create(
            messages=messages,
            stream=True,
            **kwargs,
        )

        async for chunk in stream:
            now = time.perf_counter()
            diag.chunks_received += 1

            if diag.chunks_received == 1:
                diag.first_chunk_ms = (now - start) * 1000

            gap = (now - last_chunk_time) * 1000
            diag.chunk_gaps.append(gap)
            last_chunk_time = now

            delta = chunk.choices[0].delta if chunk.choices else None
            if delta and delta.content:
                full_content.append(delta.content)
                diag.total_content_length += len(delta.content)

            if chunk.choices and chunk.choices[0].finish_reason:
                diag.finish_reason = chunk.choices[0].finish_reason

    except Exception as e:
        diag.errors.append(f"{type(e).__name__}: {e}")

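    # Time from request start to the last chunk actually received,
    # not to the moment the loop exited or an exception fired
    diag.last_chunk_ms = (last_chunk_time - start) * 1000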
    return "".join(full_content), diag
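
Once the wrapper has collected its numbers, something still has to decide whether a stream looks healthy. The following is a minimal sketch of such a check, not part of any SDK: `find_stream_problems` and the 5-second `stall_threshold_ms` default are hypothetical choices for illustration. It takes the `chunk_gaps`, `finish_reason`, and `errors` fields gathered above.

```python
def find_stream_problems(chunk_gaps_ms, finish_reason, errors,
                         stall_threshold_ms=5000.0):
    """Return human-readable warnings for one finished stream (hypothetical helper)."""
    problems = []
    if errors:
        problems.append(f"stream raised: {errors[-1]}")
    if finish_reason is None:
        # A completed response always reports why it stopped
        problems.append("no finish_reason: stream likely ended early")
    elif finish_reason == "length":
        problems.append("finish_reason=length: output hit the token limit")
    # Skip the first gap: it is time-to-first-chunk, not an inter-chunk stall
    stalls = [gap for gap in chunk_gaps_ms[1:] if gap > stall_threshold_ms]
    if stalls:
        problems.append(f"{len(stalls)} stall(s), longest {max(stalls):.0f} ms")
    return problems
```

A call such as `find_stream_problems(diag.chunk_gaps, diag.finish_reason, diag.errors)` returns an empty list for a clean stream, which makes it easy to log only the conversations that need attention.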

Detecting Dropped Tokens

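Dropped tokens occur when chunks are lost in transit or when the client disconnects before the stream completes. Compare streaming output against a non-streaming request with the same input. Model output is not strictly deterministic even at temperature 0, so treat a mismatch as a lead to investigate rather than proof of a bug; a streamed result that is a strict prefix of the baseline is the strongest sign of truncation: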

async def verify_stream_completeness(client, messages, model="gpt-4o"):
    # Get non-streaming response as baseline
    non_stream = await client.chat.completions.create(
        model=model,
        messages=messages,
        temperature=0,
        stream=False,
    )
    # content is None when the model replies with a tool call only
    baseline = non_stream.choices[0].message.content or ""

    # Get streaming response
    streamed_content, diag = await debug_stream(
        client, messages, model=model, temperature=0,
    )

    # Compare
    match = baseline == streamed_content
    if not match:
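        print("MISMATCH DETECTED")
        print(f"  Baseline length:  {len(baseline)}")
        print(f"  Streamed length:  {len(streamed_content)}")
        print(f"  Finish reason:    {diag.finish_reason}")
        # Find where they diverge
        for i, (a, b) in enumerate(zip(baseline, streamed_content)):
            if a != b:
                print(f"  First diff at char {i}: {a!r} vs {b!r}")
                break
        else:
            # No differing character: one text is a prefix of the other
            shared = min(len(baseline), len(streamed_content))
            print(f"  Texts agree for {shared} chars, then the shorter one ends")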
    return match, diag

Handling Connection Timeouts

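Long-running streams can be interrupted by proxy timeouts, load balancer idle limits, or client-side timeouts. Set explicit timeouts and implement retry logic. The API cannot resume an interrupted stream: a retry starts a new generation from scratch, so the client must account for text it has already shown to the user: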

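import httpx
import openai

async def resilient_stream(client, messages, **kwargs):
    max_retries = 3
    emitted = 0  # Characters already yielded to the caller

    for attempt in range(max_retries):
        try:
            stream = await client.chat.completions.create(
                messages=messages,
                stream=True,
                timeout=httpx.Timeout(
                    connect=10.0,
                    read=60.0,    # Per-chunk read timeout
                    write=10.0,
                    pool=10.0,
                ),
                **kwargs,
            )
            seen = 0  # Characters received during this attempt
            async for chunk in stream:
                delta = chunk.choices[0].delta if chunk.choices else None
                if delta and delta.content:
                    text = delta.content
                    # A retry restarts generation, so skip text the caller
                    # already has. This assumes the retried output repeats
                    # the same prefix, which the API does not guarantee.
                    fresh = text[max(0, emitted - seen):]
                    seen += len(text)
                    if fresh:
                        emitted += len(fresh)
                        yield fresh

            # Stream completed successfully
            return

        # Failures on the initial request are wrapped by the SDK; errors while
        # reading the stream body may surface as raw httpx exceptions.
        except (httpx.ReadTimeout, httpx.RemoteProtocolError,
                openai.APITimeoutError, openai.APIConnectionError) as e:
            print(f"Stream error on attempt {attempt + 1}: {e}")
            if attempt == max_retries - 1:
                raise
            await asyncio.sleep(2 ** attempt)  # Back off: 1s, then 2s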
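
Because these failures are intermittent in production, it helps to reproduce them on demand. The sketch below simulates a mid-stream disconnect without any network or API key. Everything in it is hypothetical scaffolding: `make_chunk`, `flaky_stream`, and `consume` are illustrative names, and the fake chunks only mimic the `choices[0].delta.content` and `finish_reason` attributes used in this article.

```python
import asyncio
from types import SimpleNamespace

def make_chunk(text=None, finish_reason=None):
    """Build an object shaped like a chat completion chunk (test double)."""
    delta = SimpleNamespace(content=text, tool_calls=None)
    choice = SimpleNamespace(delta=delta, finish_reason=finish_reason)
    return SimpleNamespace(choices=[choice])

async def flaky_stream(pieces, fail_after=None):
    """Yield one chunk per piece; raise after `fail_after` chunks if set."""
    for i, piece in enumerate(pieces):
        if fail_after is not None and i == fail_after:
            raise ConnectionResetError("simulated mid-stream disconnect")
        yield make_chunk(text=piece)
    # A healthy stream ends with a chunk that carries the finish_reason
    yield make_chunk(finish_reason="stop")

async def consume(stream):
    """Collect text and report whether the stream finished cleanly."""
    parts, finish_reason, error = [], None, None
    try:
        async for chunk in stream:
            choice = chunk.choices[0]
            if choice.delta.content:
                parts.append(choice.delta.content)
            if choice.finish_reason:
                finish_reason = choice.finish_reason
    except ConnectionResetError as exc:
        error = str(exc)
    return "".join(parts), finish_reason, error

ok = asyncio.run(consume(flaky_stream(["Hel", "lo ", "world"])))
broken = asyncio.run(consume(flaky_stream(["Hel", "lo ", "world"], fail_after=2)))
print(ok)      # ('Hello world', 'stop', None)
print(broken)  # ('Hello ', None, 'simulated mid-stream disconnect')
```

The broken run shows the signature of a dropped connection: partial text, no finish_reason, and an error. Pointing your own consumer at a stream like this is a cheap way to confirm it handles that case before it happens in production.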

Buffering for Tool Call Streams

Tool calls in streaming mode arrive as fragments across multiple chunks. You need to buffer and assemble them before execution:

class ToolCallBuffer:
    def __init__(self):
        # Maps tool call index -> partially assembled call
        self.buffers: dict[int, dict] = {}

    def process_chunk(self, chunk):
        if not chunk.choices:
            return None
        choice = chunk.choices[0]
        tool_call_deltas = (choice.delta.tool_calls if choice.delta else None) or []

        for tc_delta in tool_call_deltas:
            buf = self.buffers.setdefault(
                tc_delta.index, {"id": "", "name": "", "arguments": ""}
            )
            if tc_delta.id:
                buf["id"] = tc_delta.id
            if tc_delta.function:
                if tc_delta.function.name:
                    buf["name"] = tc_delta.function.name
                if tc_delta.function.arguments:
                    # Arguments arrive as JSON string fragments; concatenate them
                    buf["arguments"] += tc_delta.function.arguments

        # Check if stream is done. The final chunk usually carries the
        # finish_reason with an empty delta, so this check must not sit
        # behind an early return for chunks without tool call fragments.
        if choice.finish_reason == "tool_calls":
            return [self.buffers[i] for i in sorted(self.buffers)]
        return None
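
An assembled tool call is only safe to execute if its arguments form complete JSON. If the stream was cut off mid-call, the buffered string will be truncated. The following is a minimal validation sketch: `parse_tool_call` is a hypothetical helper, it expects the `{"id", "name", "arguments"}` dictionaries produced by the buffer above, and treating an empty argument string as `{}` is an assumption made here for tools that take no parameters.

```python
import json

def parse_tool_call(call):
    """Return (name, args) for an assembled call, or raise ValueError."""
    if not call.get("name"):
        raise ValueError("tool call has no function name")
    try:
        # Assumption: an empty argument string means "no arguments"
        args = json.loads(call["arguments"] or "{}")
    except json.JSONDecodeError as exc:
        # Truncated fragments fail here instead of reaching the tool
        raise ValueError(
            f"incomplete arguments for {call['name']}: {exc.msg}"
        ) from exc
    if not isinstance(args, dict):
        raise ValueError(f"arguments for {call['name']} are not a JSON object")
    return call["name"], args
```

Running every buffered call through a check like this before dispatch turns a silent partial tool call into an explicit error you can log and retry.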

FAQ

Why does my stream sometimes end without a finish_reason?

This usually indicates the connection was interrupted before the model completed its response. Common causes include proxy idle timeouts (Nginx's proxy_read_timeout defaults to 60 seconds), client-side timeout settings, or network instability. Check your reverse proxy configuration and increase read timeouts for LLM streaming endpoints.

How do I handle streaming when the model makes a tool call mid-response?

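When streaming with tools enabled, the model may emit content tokens and then switch to emitting tool call deltas. Monitor the delta.tool_calls field on each chunk. Buffer the tool call fragments until you receive a finish_reason of tool_calls, then assemble and execute the complete tool call. The chunk that carries the finish_reason often has an empty delta, so check finish_reason on every chunk, not only on chunks that contain tool call fragments.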

Should I disable streaming for agent workflows and only use it for final user-facing responses?

This is a common and effective pattern. Use non-streaming requests for internal agent reasoning and tool call cycles where latency per-turn matters less than reliability. Enable streaming only for the final response sent to the user where perceived latency matters most.

