LangChain Callbacks and Streaming: Real-Time Token Output and Event Hooks

Implement real-time streaming in LangChain applications with callback handlers for token-by-token output, custom event logging, cost tracking, and production monitoring hooks.

Why Streaming and Callbacks Matter

LLMs can take seconds to generate a full response. Without streaming, users stare at a blank screen until the complete output arrives. Streaming delivers tokens as they are generated, which makes the application feel responsive. Callbacks go further: they let you hook into every event in a chain's lifecycle for logging, monitoring, cost tracking, and custom integrations.

LangChain's callback system is built into the Runnable interface. Every Runnable (prompts, models, parsers, tools, retrievers) emits lifecycle events that callbacks can intercept.

Basic Streaming

Every LCEL chain supports .stream() out of the box.

Diagram (sequence): the client POSTs /chat with stream=true to an edge worker, which opens a streaming request to the LLM provider; for each token, the provider sends an SSE chunk delta that the worker forwards to the client and appends to a trace span; when the provider reports stop_reason=end_turn, the worker sends a final done event and finalizes the trace.

from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

chain = (
    ChatPromptTemplate.from_template("Write a poem about {topic}")
    | ChatOpenAI(model="gpt-4o-mini")
    | StrOutputParser()
)

# Stream tokens as they arrive
for chunk in chain.stream({"topic": "the ocean"}):
    print(chunk, end="", flush=True)

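Each chunk is a small piece of the output, usually one token or a few characters. The flush=True argument makes each chunk appear in the terminal immediately instead of waiting for a line break. For async code, use astream inside a coroutine (from a script, run it with asyncio.run(main())):

async def main():
    async for chunk in chain.astream({"topic": "the ocean"}):
        print(chunk, end="", flush=True)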
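Since chain.stream() returns a lazy iterator, the consumer is also a natural place to measure time to first token, which is the latency users actually perceive. The sketch below works with any iterator of strings; consume_stream is a hypothetical helper for illustration, not a LangChain API.

```python
import time
from typing import Iterable, Optional, Tuple


def consume_stream(chunks: Iterable[str], echo: bool = True) -> Tuple[str, float]:
    """Print chunks as they arrive; return the full text and the time to the first chunk.

    Timing starts when iteration starts, because a generator such as
    chain.stream(...) only sends the request once it is first advanced.
    """
    start = time.perf_counter()
    first_chunk_s: Optional[float] = None
    parts = []
    for chunk in chunks:
        if first_chunk_s is None:
            first_chunk_s = time.perf_counter() - start
        parts.append(chunk)
        if echo:
            print(chunk, end="", flush=True)
    if echo:
        print()  # finish the line after the last chunk
    if first_chunk_s is None:
        # Empty stream: fall back to the total elapsed time
        first_chunk_s = time.perf_counter() - start
    return "".join(parts), first_chunk_s
```

For example, text, ttft = consume_stream(chain.stream({"topic": "the ocean"})) prints the poem as it streams and returns both the full text and the delay before the first chunk.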

Streaming with FastAPI

In production, you often stream LLM output over HTTP using Server-Sent Events.

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

app = FastAPI()

chain = (
    ChatPromptTemplate.from_template("Answer this question: {question}")
    | ChatOpenAI(model="gpt-4o-mini", streaming=True)
    | StrOutputParser()
)

@app.get("/stream")
async def stream_response(question: str):
    async def event_generator():
        async for chunk in chain.astream({"question": question}):
            # An SSE event ends at a blank line, so every line of a multi-line
            # chunk needs its own "data: " prefix; clients rejoin them with "\n".
            for line in chunk.split("\n"):
                yield f"data: {line}\n"
            yield "\n"
        yield "data: [DONE]\n\n"

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
    )

The client connects to /stream?question=... and receives tokens in real time via SSE.
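LLM output often contains newlines, and an SSE event ends at the first blank line, so a multi-line chunk has to be sent as several data: lines that the client joins back with "\n". Below is a stdlib-only sketch of both sides of that wire format, following the server-sent events rules in the HTML specification; to_sse_frame and parse_sse are hypothetical helper names, and [DONE] is the end-of-stream sentinel the endpoint sends.

```python
from typing import Iterable, Iterator


def to_sse_frame(chunk: str) -> str:
    """Encode one text chunk as an SSE event: one 'data:' line per line of text."""
    return "".join(f"data: {line}\n" for line in chunk.split("\n")) + "\n"


def parse_sse(lines: Iterable[str], done: str = "[DONE]") -> Iterator[str]:
    """Decode 'data:' events from SSE lines (line terminators already stripped)."""
    buffer: list[str] = []
    for line in lines:
        if line == "":
            # A blank line dispatches the event; its data lines are joined with "\n".
            if buffer:
                data = "\n".join(buffer)
                buffer = []
                if data == done:
                    return
                yield data
        elif line.startswith("data:"):
            value = line[len("data:"):]
            # The spec strips exactly one leading space after the colon,
            # so a token like " world" keeps its own leading space.
            buffer.append(value[1:] if value.startswith(" ") else value)
        # Other fields (event:, id:, retry:) and ":" comments are ignored in this sketch.
```

On the client you could feed parse_sse the lines of a streaming HTTP response, for example from httpx's Response.iter_lines(). A chunk whose text is exactly [DONE] would end the stream early; JSON-encoding each payload avoids that at the cost of a slightly heavier protocol.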

Callback Handlers

Callbacks intercept lifecycle events across all LangChain components. The base class BaseCallbackHandler defines methods for every event type.

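from langchain_core.callbacks import BaseCallbackHandler

class LoggingHandler(BaseCallbackHandler):
    def on_llm_start(self, serialized, prompts, **kwargs):
        # Chat models land here too: LangChain falls back to on_llm_start
        # when a handler does not implement on_chat_model_start
        print(f"LLM started with {len(prompts)} prompts")

    def on_llm_new_token(self, token: str, **kwargs):
        # Fires only while the model is streaming
        print(f"Token: {repr(token)}")

    def on_llm_end(self, response, **kwargs):
        usage = (response.llm_output or {}).get("token_usage")
        print(f"LLM finished. Token usage: {usage}")

    def on_chain_start(self, serialized, inputs, **kwargs):
        # serialized may be None for some runnables; the run name is passed in kwargs
        name = kwargs.get("name") or (serialized or {}).get("name", "unknown")
        print(f"Chain started: {name}")

    def on_chain_end(self, outputs, **kwargs):
        # outputs is whatever the step returned: a dict, a string, a message, ...
        print(f"Chain finished with output type: {type(outputs).__name__}")

    def on_tool_start(self, serialized, input_str, **kwargs):
        name = kwargs.get("name") or (serialized or {}).get("name")
        print(f"Tool called: {name}")

    def on_tool_end(self, output, **kwargs):
        # output is not always a string, so convert before truncating
        print(f"Tool returned: {str(output)[:100]}")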

Pass handlers through the config argument when invoking a chain (here, the poem chain from Basic Streaming):

handler = LoggingHandler()
result = chain.invoke(
    {"topic": "AI"},
    config={"callbacks": [handler]},
)

Building a Cost Tracker

A practical callback example: tracking token usage and estimating costs.

from langchain_core.callbacks import BaseCallbackHandler

class CostTracker(BaseCallbackHandler):
    # USD per 1M tokens (example list prices; check your provider's current pricing)
    PRICING = {
        "gpt-4o-mini": {"prompt": 0.15, "completion": 0.60},
        "gpt-4o": {"prompt": 2.50, "completion": 10.00},
    }

    def __init__(self):
        self.total_prompt_tokens = 0
        self.total_completion_tokens = 0
        self.total_cost = 0.0

    def _prices_for(self, model):
        # Providers often report dated snapshots such as "gpt-4o-2024-08-06",
        # so match the longest known prefix ("gpt-4o-mini" before "gpt-4o").
        for name in sorted(self.PRICING, key=len, reverse=True):
            if model.startswith(name):
                return self.PRICING[name]
        return self.PRICING["gpt-4o-mini"]  # fallback for unknown models

    def on_llm_end(self, response, **kwargs):
        # llm_output may be None or lack token_usage (e.g. on some streaming paths)
        llm_output = response.llm_output or {}
        usage = llm_output.get("token_usage") or {}
        prompt_tokens = usage.get("prompt_tokens", 0)
        completion_tokens = usage.get("completion_tokens", 0)

        self.total_prompt_tokens += prompt_tokens
        self.total_completion_tokens += completion_tokens

        prices = self._prices_for(llm_output.get("model_name", "gpt-4o-mini"))
        cost = (
            prompt_tokens * prices["prompt"] / 1_000_000
            + completion_tokens * prices["completion"] / 1_000_000
        )
        self.total_cost += cost

    def report(self):
        return {
            "prompt_tokens": self.total_prompt_tokens,
            "completion_tokens": self.total_completion_tokens,
            "total_cost_usd": round(self.total_cost, 6),
        }

tracker = CostTracker()
result = chain.invoke({"topic": "AI"}, config={"callbacks": [tracker]})
print(tracker.report())
# Illustrative output (token counts vary per run):
# {'prompt_tokens': 42, 'completion_tokens': 187, 'total_cost_usd': 0.000119}
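The cost formula in on_llm_end is linear in the two token counts. Writing p for prompt tokens, c for completion tokens, and P_prompt, P_completion for the per-million prices, the illustrative run above works out as follows at the gpt-4o-mini prices in PRICING:

```latex
\text{cost} = \frac{p \, P_{\text{prompt}} + c \, P_{\text{completion}}}{10^{6}}
            = \frac{42 \times 0.15 + 187 \times 0.60}{10^{6}}
            = \frac{6.3 + 112.2}{10^{6}}
            = 1.185 \times 10^{-4}\ \text{USD}
```

report() then rounds the running total to six decimal places.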

Async Callback Handlers

For high-throughput applications, use AsyncCallbackHandler to avoid blocking the event loop.

from langchain_core.callbacks import AsyncCallbackHandler

# log_to_database and websocket_broadcast are placeholders for your own
# async I/O helpers; they are not part of LangChain.
class AsyncLogger(AsyncCallbackHandler):
    async def on_llm_start(self, serialized, prompts, **kwargs):
        await log_to_database("llm_start", prompts)

    async def on_llm_new_token(self, token: str, **kwargs):
        await websocket_broadcast(token)

    async def on_llm_end(self, response, **kwargs):
        await log_to_database("llm_end", response.llm_output)

Async handlers are essential when your callback logic involves I/O operations like database writes or WebSocket broadcasts.
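Awaiting a broadcast inside on_llm_new_token, as AsyncLogger does, puts that I/O on the model's streaming path, so a slow client can hold up token processing. One alternative is to have the handler push tokens onto an asyncio.Queue and let a separate task forward them. The sketch below uses only the standard library so it can be tested in isolation; QueueTokenForwarder and drain are hypothetical names, and in a real app the class would subclass AsyncCallbackHandler so LangChain accepts it as a handler.

```python
import asyncio
from typing import Any


class QueueTokenForwarder:
    """Hypothetical handler: hook signatures mirror on_llm_new_token / on_llm_end.

    In a LangChain app, subclass AsyncCallbackHandler instead of object.
    """

    def __init__(self) -> None:
        self.queue: asyncio.Queue = asyncio.Queue()

    async def on_llm_new_token(self, token: str, **kwargs: Any) -> None:
        # put_nowait never waits on an unbounded queue, so the model's stream keeps moving
        self.queue.put_nowait(token)

    async def on_llm_end(self, response: Any, **kwargs: Any) -> None:
        self.queue.put_nowait(None)  # None marks the end of the stream


async def drain(queue: asyncio.Queue) -> list[str]:
    """Consumer task; a real one would send each token over a WebSocket."""
    tokens = []
    while (token := await queue.get()) is not None:
        tokens.append(token)
    return tokens
```

The consumer is typically started with asyncio.create_task(drain(handler.queue)) before invoking the chain, so forwarding runs concurrently with generation.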

astream_events: Fine-Grained Event Streaming

For maximum control, use astream_events to receive every event from every component in a chain.


import asyncio

async def main():
    async for event in chain.astream_events(
        {"topic": "machine learning"},
        version="v2",
    ):
        kind = event["event"]

        if kind == "on_chat_model_stream":
            # Individual tokens from the LLM
            print(event["data"]["chunk"].content, end="", flush=True)
        elif kind == "on_retriever_end":
            # Retrieved documents (only if the chain contains a retriever)
            docs = event["data"]["output"]
            print(f"\nRetrieved {len(docs)} documents")
        elif kind == "on_tool_end":
            # Tool results (only if the chain calls tools)
            print(f"\nTool result: {event['data']['output']}")

asyncio.run(main())

This API gives you visibility into every internal step of a complex chain, including nested sub-chains and parallel branches.
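When a chain has parallel branches, their on_chat_model_stream events arrive interleaved in one stream. The sketch below assumes astream_events v2 event dicts (which carry event, name, run_id, and data keys) and keeps a separate text buffer per run_id so each branch's output can be reassembled. StreamDemux is a hypothetical helper, and it assumes each chunk's content is a plain string (some providers stream lists of content blocks instead).

```python
from collections import defaultdict
from typing import Any, Dict, List


class StreamDemux:
    """Collect on_chat_model_stream text per run_id (hypothetical helper)."""

    def __init__(self) -> None:
        self._parts: Dict[str, List[str]] = defaultdict(list)
        self.names: Dict[str, str] = {}  # run_id -> component name

    def feed(self, event: Dict[str, Any]) -> None:
        if event["event"] != "on_chat_model_stream":
            return  # ignore start/end events and other component types
        run_id = event["run_id"]
        self.names[run_id] = event["name"]
        # Assumes string content on each message chunk
        self._parts[run_id].append(event["data"]["chunk"].content)

    def texts(self) -> Dict[str, str]:
        """Full streamed text for each model run seen so far."""
        return {run_id: "".join(parts) for run_id, parts in self._parts.items()}
```

Inside the loop above you would call demux.feed(event) for every event, then read demux.texts() and demux.names to see which component produced which text.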

Combining Multiple Handlers

You can attach multiple callback handlers to a single invocation.

result = chain.invoke(
    {"topic": "AI safety"},
    config={"callbacks": [
        CostTracker(),
        LoggingHandler(),
        AsyncLogger(),
    ]},
)

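Each handler receives every event independently, so you can compose monitoring concerns: one handler for costs, another for logging, a third for forwarding tokens in real time. Token events only fire when the model streams, so a token-forwarding handler like AsyncLogger needs .stream(), .astream(), or a model built with streaming=True; with plain .invoke() as above it only sees start and end events.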
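To make that independence concrete, here is a stdlib-only toy model of callback fan-out. It is not LangChain's implementation, but it mirrors one behavior worth knowing: LangChain's callback manager logs an exception raised inside a handler as a warning and moves on to the remaining handlers, unless that handler sets raise_error = True. FanOutDispatcher is a hypothetical name.

```python
import logging
from typing import Any, List

logger = logging.getLogger(__name__)


class FanOutDispatcher:
    """Toy model: every handler sees every event; one failing handler doesn't stop the rest."""

    def __init__(self, handlers: List[Any]) -> None:
        self.handlers = handlers

    def emit(self, event_name: str, *args: Any, **kwargs: Any) -> None:
        for handler in self.handlers:
            method = getattr(handler, event_name, None)
            if method is None:
                continue  # this handler does not implement the event
            try:
                method(*args, **kwargs)
            except Exception as exc:
                if getattr(handler, "raise_error", False):
                    raise  # opt-in strict mode, like raise_error on LangChain handlers
                logger.warning(
                    "Error in %s.%s callback: %r", type(handler).__name__, event_name, exc
                )
```

The practical consequence: a bug in a logging handler will not break cost tracking, but it can fail silently, so check your logs for callback warnings.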

FAQ

What is the difference between .stream() and callbacks with on_llm_new_token?

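.stream() yields chunks of the chain's final output, after any post-processing steps such as output parsers. on_llm_new_token fires for every token the LLM generates, wherever the model sits in the chain, but only while the model is streaming (when you call .stream(), .astream(), or astream_events(), or construct the model with streaming=True). Use .stream() for user-facing output and on_llm_new_token for internal monitoring.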

Can I use callbacks without streaming?

Yes. Lifecycle callbacks such as on_llm_start, on_llm_end, and on_chain_end fire whether you use .invoke(), .stream(), or .batch(); only on_llm_new_token depends on the model streaming. Callbacks are useful for logging, cost tracking, and monitoring even when you do not need streaming output.

How do I test callback handlers?

Create a handler instance, run a chain with it attached, then assert on the handler's state. Against a real model, for example, invoke a chain with the CostTracker handler and verify that total_prompt_tokens > 0. For deterministic tests, mock the LLM with FakeListChatModel from langchain_core.language_models; because a fake model reports no token usage, assert on which events fired rather than on token counts.


