---
title: "LangChain Callbacks and Streaming: Real-Time Token Output and Event Hooks"
description: "Implement real-time streaming in LangChain applications with callback handlers for token-by-token output, custom event logging, cost tracking, and production monitoring hooks."
canonical: https://callsphere.ai/blog/langchain-callbacks-streaming-realtime-token-output-event-hooks
category: "Learn Agentic AI"
tags: ["LangChain", "Streaming", "Callbacks", "Real-Time", "Python"]
author: "CallSphere Team"
published: 2026-03-17T00:00:00.000Z
updated: 2026-06-04T15:55:58.244Z
---

# LangChain Callbacks and Streaming: Real-Time Token Output and Event Hooks

> Implement real-time streaming in LangChain applications with callback handlers for token-by-token output, custom event logging, cost tracking, and production monitoring hooks.

## Why Streaming and Callbacks Matter

LLMs can take seconds to generate a full response. Without streaming, users stare at a blank screen waiting for the complete output. Streaming delivers tokens as they are generated, creating a responsive experience. Callbacks extend this further — they let you hook into every event in a chain's lifecycle for logging, monitoring, cost tracking, and custom integrations.

LangChain's callback system is deeply integrated into every component. Every `Runnable` — prompts, models, parsers, tools, retrievers — fires events that callbacks can intercept.

## Basic Streaming

Every LCEL chain supports `.stream()` out of the box.

```mermaid
sequenceDiagram
    autonumber
    participant Client
    participant Edge as Edge Worker
    participant LLM as LLM Provider
    participant DB as Logs and Trace
    Client->>Edge: POST /chat (stream=true)
    Edge->>LLM: messages.create(stream=true)
    loop Each token
        LLM-->>Edge: SSE chunk delta
        Edge-->>Client: SSE chunk delta
        Edge->>DB: append token to span
    end
    LLM-->>Edge: stop_reason=end_turn
    Edge-->>Client: event: done
    Edge->>DB: finalize trace
```

```python
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

chain = (
    ChatPromptTemplate.from_template("Write a poem about {topic}")
    | ChatOpenAI(model="gpt-4o-mini")
    | StrOutputParser()
)

# Stream tokens as they arrive
for chunk in chain.stream({"topic": "the ocean"}):
    print(chunk, end="", flush=True)
```

Each `chunk` is a small piece of the output — typically a few tokens. The `flush=True` ensures output appears immediately in the terminal. For async code, use `astream`:

```python
async for chunk in chain.astream({"topic": "the ocean"}):
    print(chunk, end="", flush=True)
```

## Streaming with FastAPI

In production, you often stream LLM output over HTTP using Server-Sent Events.

```python
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

app = FastAPI()

chain = (
    ChatPromptTemplate.from_template("Answer this question: {question}")
    | ChatOpenAI(model="gpt-4o-mini", streaming=True)
    | StrOutputParser()
)

@app.get("/stream")
async def stream_response(question: str):
    async def event_generator():
        async for chunk in chain.astream({"question": question}):
            yield f"data: {chunk}\n\n"
        yield "data: [DONE]\n\n"

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
    )
```

The client connects to `/stream?question=...` and receives tokens in real time via SSE.

## Callback Handlers

Callbacks intercept lifecycle events across all LangChain components. The base class `BaseCallbackHandler` defines methods for every event type.

```python
from langchain_core.callbacks import BaseCallbackHandler
from langchain_core.messages import BaseMessage

class LoggingHandler(BaseCallbackHandler):
    def on_llm_start(self, serialized, prompts, **kwargs):
        print(f"LLM started with {len(prompts)} prompts")

    def on_llm_new_token(self, token: str, **kwargs):
        print(f"Token: {repr(token)}")

    def on_llm_end(self, response, **kwargs):
        print(f"LLM finished. Tokens used: {response.llm_output}")

    def on_chain_start(self, serialized, inputs, **kwargs):
        print(f"Chain started: {serialized.get('name', 'unknown')}")

    def on_chain_end(self, outputs, **kwargs):
        print(f"Chain finished with keys: {list(outputs.keys())}")

    def on_tool_start(self, serialized, input_str, **kwargs):
        print(f"Tool called: {serialized.get('name')}")

    def on_tool_end(self, output, **kwargs):
        print(f"Tool returned: {output[:100]}")
```

Pass callbacks when invoking a chain:

```python
handler = LoggingHandler()
result = chain.invoke(
    {"topic": "AI"},
    config={"callbacks": [handler]},
)
```

## Building a Cost Tracker

A practical callback example: tracking token usage and estimating costs.

```python
from langchain_core.callbacks import BaseCallbackHandler

class CostTracker(BaseCallbackHandler):
    def __init__(self):
        self.total_prompt_tokens = 0
        self.total_completion_tokens = 0
        self.total_cost = 0.0

    # Pricing per 1M tokens (example for gpt-4o-mini)
    PRICING = {
        "gpt-4o-mini": {"prompt": 0.15, "completion": 0.60},
        "gpt-4o": {"prompt": 2.50, "completion": 10.00},
    }

    def on_llm_end(self, response, **kwargs):
        usage = response.llm_output.get("token_usage", {})
        prompt_tokens = usage.get("prompt_tokens", 0)
        completion_tokens = usage.get("completion_tokens", 0)

        self.total_prompt_tokens += prompt_tokens
        self.total_completion_tokens += completion_tokens

        model = response.llm_output.get("model_name", "gpt-4o-mini")
        prices = self.PRICING.get(model, self.PRICING["gpt-4o-mini"])

        cost = (
            prompt_tokens * prices["prompt"] / 1_000_000
            + completion_tokens * prices["completion"] / 1_000_000
        )
        self.total_cost += cost

    def report(self):
        return {
            "prompt_tokens": self.total_prompt_tokens,
            "completion_tokens": self.total_completion_tokens,
            "total_cost_usd": round(self.total_cost, 6),
        }

tracker = CostTracker()
result = chain.invoke({"topic": "AI"}, config={"callbacks": [tracker]})
print(tracker.report())
# {"prompt_tokens": 42, "completion_tokens": 187, "total_cost_usd": 0.000119}
```

## Async Callback Handlers

For high-throughput applications, use `AsyncCallbackHandler` to avoid blocking the event loop.

```python
from langchain_core.callbacks import AsyncCallbackHandler

class AsyncLogger(AsyncCallbackHandler):
    async def on_llm_start(self, serialized, prompts, **kwargs):
        await log_to_database("llm_start", prompts)

    async def on_llm_new_token(self, token: str, **kwargs):
        await websocket_broadcast(token)

    async def on_llm_end(self, response, **kwargs):
        await log_to_database("llm_end", response.llm_output)
```

Async handlers are essential when your callback logic involves I/O operations like database writes or WebSocket broadcasts.

## astream_events: Fine-Grained Event Streaming

For maximum control, use `astream_events` to receive every event from every component in a chain.

```python
async for event in chain.astream_events(
    {"topic": "machine learning"},
    version="v2",
):
    kind = event["event"]

    if kind == "on_chat_model_stream":
        # Individual tokens from the LLM
        print(event["data"]["chunk"].content, end="")
    elif kind == "on_retriever_end":
        # Retrieved documents
        docs = event["data"]["output"]
        print(f"\nRetrieved {len(docs)} documents")
    elif kind == "on_tool_end":
        # Tool results
        print(f"\nTool result: {event['data']['output']}")
```

This API gives you visibility into every internal step of a complex chain, including nested sub-chains and parallel branches.

## Combining Multiple Handlers

You can attach multiple callback handlers to a single invocation.

```python
result = chain.invoke(
    {"topic": "AI safety"},
    config={"callbacks": [
        CostTracker(),
        LoggingHandler(),
        AsyncLogger(),
    ]},
)
```

Each handler receives all events independently. This lets you compose monitoring concerns — one handler for costs, another for logging, a third for real-time streaming.

## FAQ

### What is the difference between .stream() and callbacks with on_llm_new_token?

`.stream()` yields chunks from the final output of the chain. `on_llm_new_token` fires for every token generated by the LLM, even if the chain has post-processing steps. Use `.stream()` for user-facing output and `on_llm_new_token` for internal monitoring.

### Can I use callbacks without streaming?

Yes. Callbacks fire for all events regardless of whether you use `.invoke()`, `.stream()`, or `.batch()`. They are useful for logging, cost tracking, and monitoring even when you do not need streaming output.

### How do I test callback handlers?

Create a handler instance, run a chain with it attached, then assert on the handler's state. For example, invoke a chain with the `CostTracker` handler and verify that `total_prompt_tokens > 0`. Mock the LLM for deterministic tests using `FakeListChatModel` from `langchain_core.language_models`.

---

#LangChain #Streaming #Callbacks #RealTime #Python #AgenticAI #LearnAI #AIEngineering

---

Source: https://callsphere.ai/blog/langchain-callbacks-streaming-realtime-token-output-event-hooks
