---
title: "LangGraph Streaming: Real-Time Node Updates and Token Streaming"
description: "Implement real-time streaming in LangGraph with stream modes for node-level updates, token-by-token LLM output, custom event streams, and practical patterns for responsive agent UIs."
canonical: https://callsphere.ai/blog/langgraph-streaming-real-time-node-updates-token-streaming
category: "Learn Agentic AI"
tags: ["LangGraph", "Streaming", "Real-Time", "Token Streaming", "Python"]
author: "CallSphere Team"
published: 2026-03-17T00:00:00.000Z
updated: 2026-05-06T01:02:43.490Z
---

# LangGraph Streaming: Real-Time Node Updates and Token Streaming

> Implement real-time streaming in LangGraph with stream modes for node-level updates, token-by-token LLM output, custom event streams, and practical patterns for responsive agent UIs.

## Why Streaming Matters for Agents

Agent workflows can take seconds or even minutes to complete, especially when they involve multiple tool calls, web searches, or multi-step reasoning. Without streaming, users stare at a blank screen until the entire workflow finishes. Streaming gives users real-time visibility into what the agent is doing: which node is currently executing, what tokens the LLM is generating, and what intermediate results have been produced.

## Stream Modes in LangGraph

LangGraph supports several stream modes that control what data is emitted during execution. The diagram below sketches a representative multi-node agent graph of the kind you might stream; the code examples in this post use a deliberately minimal single-node graph so each mode's output stays easy to read:

```mermaid
flowchart TD
    USER(["User input"])
    SUPER["Supervisor node
routes by state"]
    A["Specialist node A
research"]
    B["Specialist node B
writing"]
    TOOL{"Tool call
needed?"}
    EXEC["Tool executor
ToolNode"]
    CHK[("Postgres
checkpointer")]
    INT{"interrupt for
human approval?"}
    HUMAN(["Human reviewer"])
    OUT(["Final response"])
    USER --> SUPER
    SUPER --> A
    SUPER --> B
    A --> TOOL
    B --> TOOL
    TOOL -->|Yes| EXEC --> SUPER
    TOOL -->|No| INT
    INT -->|Yes| HUMAN --> SUPER
    INT -->|No| OUT
    SUPER -.-> CHK
    style SUPER fill:#4f46e5,stroke:#4338ca,color:#fff
    style CHK fill:#ede9fe,stroke:#7c3aed,color:#1e1b4b
    style OUT fill:#059669,stroke:#047857,color:#fff
    style HUMAN fill:#f59e0b,stroke:#d97706,color:#1f2937
```

```python
from langgraph.graph import StateGraph, START, END
from typing import TypedDict, Annotated
from langgraph.graph.message import add_messages
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage

class State(TypedDict):
    messages: Annotated[list, add_messages]

llm = ChatOpenAI(model="gpt-4o-mini")

def agent(state: State) -> dict:
    return {"messages": [llm.invoke(state["messages"])]}

builder = StateGraph(State)
builder.add_node("agent", agent)
builder.add_edge(START, "agent")
builder.add_edge("agent", END)
graph = builder.compile()
```

## Values Mode: Full State After Each Node

The `values` stream mode emits the complete state after each node finishes:

```python
for chunk in graph.stream(
    {"messages": [HumanMessage(content="Explain quantum computing")]},
    stream_mode="values",
):
    messages = chunk["messages"]
    print(f"State has {len(messages)} messages")
    print(f"Latest: {messages[-1].content[:80]}...")
```

This is useful when your UI needs to render the complete conversation state at each step.
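
With the single-node graph above, that loop prints something like the following. Note that the first emitted chunk is the input state itself; the model's actual text will of course vary:

```
State has 1 messages
Latest: Explain quantum computing...
State has 2 messages
Latest: Quantum computing encodes information in qubits, which can exist in superpos...
```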

## Updates Mode: Node-Level Deltas

The `updates` stream mode emits only the changes each node makes:

```python
for chunk in graph.stream(
    {"messages": [HumanMessage(content="What is LangGraph?")]},
    stream_mode="updates",
):
    # Each chunk maps a node name to the partial state that node returned
    for node_name, update in chunk.items():
        print(f"Node '{node_name}' produced:")
        if "messages" in update:
            for msg in update["messages"]:
                print(f"  {msg.content[:80]}...")
```

This is more efficient than `values` mode because you only receive the delta, not the entire accumulated state.
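
For the same single-node graph, `updates` mode emits a single chunk for the `agent` node; the original human message never reappears. Illustrative output:

```
Node 'agent' produced:
  LangGraph is a low-level orchestration framework for building stateful, multi-...
```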

## Token-Level Streaming with astream_events

For token-by-token output from the LLM, use the events streaming API:

```python
import asyncio

async def stream_tokens():
    async for event in graph.astream_events(
        {"messages": [HumanMessage(content="Write a poem about AI")]},
        version="v2",
    ):
        if event["event"] == "on_chat_model_stream":
            token = event["data"]["chunk"].content
            if token:
                print(token, end="", flush=True)

asyncio.run(stream_tokens())
```

The `on_chat_model_stream` event fires for every token the LLM generates. This gives users the familiar ChatGPT-style typing effect even within complex multi-node workflows.
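
The same event stream carries more than tokens. If your graph includes tool-calling nodes (the single-node example above does not), you can drive a progress indicator from tool events as well; the event names here come from the LangChain v2 events schema:

```python
async def stream_with_progress():
    async for event in graph.astream_events(
        {"messages": [HumanMessage(content="Search for recent AI news")]},
        version="v2",
    ):
        kind = event["event"]
        if kind == "on_chat_model_stream":
            token = event["data"]["chunk"].content
            if token:
                print(token, end="", flush=True)
        elif kind == "on_tool_start":
            # event["name"] is the name of the tool being invoked
            print(f"\n[running tool: {event['name']}]")
        elif kind == "on_tool_end":
            print(f"[tool finished: {event['name']}]")
```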

## Filtering Events by Node

In multi-node graphs, you often want to stream tokens only from specific nodes:

```python
async def stream_final_response():
    async for event in graph.astream_events(
        {"messages": [HumanMessage(content="Help me plan a trip")]},
        version="v2",
    ):
        kind = event["event"]
        # LangGraph records the currently executing node in event metadata
        node = event.get("metadata", {}).get("langgraph_node")

        # Only stream tokens produced while the 'respond' node is running
        if kind == "on_chat_model_stream" and node == "respond":
            token = event["data"]["chunk"].content
            if token:
                print(token, end="", flush=True)
```

Alternatively, attach an explicit tag to the model call itself. Tags passed via `with_config` appear in each event's `tags` list, so you can filter on `"respond_node" in event.get("tags", [])` instead of the node name:

```python
def respond(state: State) -> dict:
    # Tags attached here show up in event["tags"] for this model's events
    tagged_llm = llm.with_config(tags=["respond_node"])
    return {"messages": [tagged_llm.invoke(state["messages"])]}

builder.add_node("respond", respond)
```

## Streaming Multiple Modes Simultaneously

You can combine stream modes to get both state updates and token streams:

```python
for mode, data in graph.stream(
    {"messages": [HumanMessage(content="Analyze this data")]},
    stream_mode=["updates", "messages"],
):
    # With a list of modes, each event is a (mode, data) tuple
    if mode == "messages":
        # data is a (message_chunk, metadata) pair
        msg_chunk, metadata = data
        print(f"Token: {msg_chunk.content}", end="")
    elif mode == "updates":
        print(f"\nNode update: {data}")
```

This is particularly useful for building rich UIs that show both progress indicators for node transitions and streaming text for LLM output.
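
As a sketch of that idea, here is one way to flatten both modes into a single generator of JSON events a frontend can switch on. The `type`/`text`/`name` field names are an arbitrary choice for this example, not a LangGraph convention:

```python
import json

def ui_events(query: str):
    for mode, data in graph.stream(
        {"messages": [HumanMessage(content=query)]},
        stream_mode=["updates", "messages"],
    ):
        if mode == "messages":
            msg_chunk, _metadata = data
            if msg_chunk.content:
                yield json.dumps({"type": "token", "text": msg_chunk.content})
        elif mode == "updates":
            # data maps each node that ran to the update it produced
            for node_name in data:
                yield json.dumps({"type": "node", "name": node_name})
```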

## Practical Streaming Pattern for Web APIs

Here is how to wire LangGraph streaming into a FastAPI server-sent events endpoint:

```python
from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()

async def event_generator(query: str):
    async for event in graph.astream_events(
        {"messages": [HumanMessage(content=query)]},
        version="v2",
    ):
        if event["event"] == "on_chat_model_stream":
            token = event["data"]["chunk"].content
            if token:
                yield f"data: {token}\n\n"
    yield "data: [DONE]\n\n"

@app.get("/stream")
async def stream_endpoint(q: str):
    return StreamingResponse(
        event_generator(q),
        media_type="text/event-stream",
    )
```

This lets frontend clients consume the agent's output in real time over standard SSE. One caveat: the naive `data: {token}` framing breaks if a token contains a newline, so production code should JSON-encode each payload.
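
For a quick smoke test, here is a minimal Python client. `httpx` is just one option, and the localhost URL assumes you are serving the app with a local ASGI server such as uvicorn:

```python
import httpx

def consume_stream(query: str) -> None:
    with httpx.stream(
        "GET",
        "http://localhost:8000/stream",
        params={"q": query},
        timeout=None,
    ) as response:
        for line in response.iter_lines():
            # SSE data lines look like "data: <payload>"
            if line.startswith("data: "):
                payload = line[len("data: "):]
                if payload == "[DONE]":
                    break
                print(payload, end="", flush=True)

consume_stream("Summarize LangGraph streaming in one sentence")
```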

## FAQ

### What is the difference between stream() and astream_events()?

`stream()` emits state-level updates (after each node completes). `astream_events()` emits fine-grained events including individual LLM tokens, tool calls, and chain starts/ends. Use `stream()` for node-level progress and `astream_events()` for token-level output.

### Does streaming work with checkpointing?

Yes. Streaming and checkpointing are independent features. You can stream a checkpointed graph and state will be persisted at each node regardless of whether the output is streamed or collected.
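
A minimal sketch, reusing the builder from earlier with the in-memory checkpointer (swap in a Postgres checkpointer for production; `thread_id` is an arbitrary identifier):

```python
from langgraph.checkpoint.memory import MemorySaver

checkpointed = builder.compile(checkpointer=MemorySaver())
config = {"configurable": {"thread_id": "demo-thread"}}

# Streaming works exactly as before; state is persisted after each node
for chunk in checkpointed.stream(
    {"messages": [HumanMessage(content="Remember my name is Ada")]},
    config,
    stream_mode="values",
):
    print(f"{len(chunk['messages'])} messages in state")
```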

### Can I stream from a graph running in LangGraph Cloud?

Yes. LangGraph Cloud exposes streaming endpoints that emit server-sent events. The client SDK provides methods to consume these streams, giving you the same streaming experience as local execution but with managed infrastructure.
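
A minimal sketch with the `langgraph-sdk` Python client, assuming a deployed graph registered under the name `"agent"` (the URL and graph name here are placeholders):

```python
from langgraph_sdk import get_client

async def stream_from_cloud():
    # Placeholder URL -- point this at your deployment
    client = get_client(url="https://your-deployment.example.com")
    thread = await client.threads.create()
    async for chunk in client.runs.stream(
        thread["thread_id"],
        "agent",  # placeholder graph/assistant name
        input={"messages": [{"role": "user", "content": "Hello"}]},
        stream_mode="updates",
    ):
        print(chunk.event, chunk.data)
```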


