---
title: "TypeScript Streaming Patterns: ReadableStream, AsyncIterator, and SSE for AI"
description: "Deep dive into TypeScript streaming patterns essential for AI applications. Learn ReadableStream construction, TransformStreams for processing, async iterators for consumption, Server-Sent Events for browser delivery, and backpressure handling."
canonical: https://callsphere.ai/blog/typescript-streaming-patterns-readablestream-asynciterator-sse-ai
category: "Learn Agentic AI"
tags: ["Streaming", "TypeScript", "ReadableStream", "SSE", "AsyncIterator", "Web Streams"]
author: "CallSphere Team"
published: 2026-03-17T00:00:00.000Z
updated: 2026-05-06T20:18:37.099Z
---

# TypeScript Streaming Patterns: ReadableStream, AsyncIterator, and SSE for AI

> Deep dive into TypeScript streaming patterns essential for AI applications. Learn ReadableStream construction, TransformStreams for processing, async iterators for consumption, Server-Sent Events for browser delivery, and backpressure handling.

## Why Streaming Matters for AI Applications

LLMs generate tokens sequentially, and a typical response takes 2-10 seconds to complete. Without streaming, users stare at a loading spinner for the entire duration. With streaming, the first token appears in under 200 milliseconds, creating a dramatically better user experience.

TypeScript's Web Streams API, async iterators, and Server-Sent Events provide the building blocks for end-to-end streaming from the LLM to the browser. Understanding these primitives lets you build custom streaming pipelines beyond what framework abstractions provide.

## ReadableStream: The Foundation

A `ReadableStream` is the standard way to represent a source of data that arrives over time. The Web Streams API is available in Node.js 18+, Deno, Bun, and all modern browsers.

```mermaid
sequenceDiagram
    autonumber
    participant Client
    participant Edge as Edge Worker
    participant LLM as LLM Provider
    participant DB as Logs and Trace
    Client->>Edge: POST /chat (stream=true)
    Edge->>LLM: messages.create(stream=true)
    loop Each token
        LLM-->>Edge: SSE chunk delta
        Edge-->>Client: SSE chunk delta
        Edge->>DB: append token to span
    end
    LLM-->>Edge: stop_reason=end_turn
    Edge-->>Client: event: done
    Edge->>DB: finalize trace
```

Construct a ReadableStream that emits LLM tokens:

```typescript
function createTokenStream(tokens: string[]): ReadableStream<string> {
  let index = 0;

  return new ReadableStream<string>({
    pull(controller) {
      if (index < tokens.length) {
        controller.enqueue(tokens[index++]);
      } else {
        controller.close();
      }
    },
  });
}
```

In practice you wrap a live provider stream into a ReadableStream of encoded bytes. The chunk shape below follows OpenAI-style chat deltas; the function name is illustrative:

```typescript
function providerToByteStream(
  stream: AsyncIterable<{ choices: { delta?: { content?: string } }[] }>
): ReadableStream<Uint8Array> {
  const encoder = new TextEncoder();

  return new ReadableStream({
    async start(controller) {
      try {
        for await (const chunk of stream) {
          const text = chunk.choices[0]?.delta?.content;
          if (text) {
            controller.enqueue(encoder.encode(text));
          }
        }
        controller.close();
      } catch (error) {
        controller.error(error);
      }
    },
  });
}
```

## TransformStream: Processing in Flight

TransformStreams let you modify data as it flows through the pipeline. This is useful for formatting, filtering, or enriching tokens:

```typescript
function createSSETransform(): TransformStream<string, Uint8Array> {
  const encoder = new TextEncoder();

  return new TransformStream<string, Uint8Array>({
    transform(chunk, controller) {
      const data = JSON.stringify({ text: chunk, timestamp: Date.now() });
      controller.enqueue(encoder.encode(`data: ${data}\n\n`));
    },
    flush(controller) {
      controller.enqueue(encoder.encode("data: [DONE]\n\n"));
    },
  });
}

// Pipeline: LLM tokens -> SSE formatted events
const sseStream = tokenStream.pipeThrough(createSSETransform());
```

A more practical transform counts tokens as they flow through:

```typescript
function createTokenCounter(): TransformStream<string, string> {
  let tokenCount = 0;

  return new TransformStream<string, string>({
    transform(chunk, controller) {
      // Rough word-split approximation, not a real tokenizer count
      tokenCount += chunk.split(/\s+/).length;
      controller.enqueue(chunk);
    },
    flush() {
      console.log(`Stream complete. Approximate tokens: ${tokenCount}`);
    },
  });
}
```
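Transforms compose with `pipeThrough`, so per-token processing and SSE framing can run as separate stages. A minimal self-contained sketch with simplified inline stages (the names `upper`, `frame`, and `collectSSE` are illustrative, not from a library):

```typescript
// Minimal sketch of transform composition with pipeThrough.
async function collectSSE(words: string[]): Promise<string[]> {
  // Stage 1: uppercase each token (stand-in for any per-token processing)
  const upper = new TransformStream<string, string>({
    transform(chunk, controller) {
      controller.enqueue(chunk.toUpperCase());
    },
  });

  // Stage 2: frame each token as an SSE data event
  const frame = new TransformStream<string, string>({
    transform(chunk, controller) {
      controller.enqueue(`data: ${chunk}\n\n`);
    },
  });

  const source = new ReadableStream<string>({
    start(controller) {
      for (const w of words) controller.enqueue(w);
      controller.close();
    },
  });

  const events: string[] = [];
  const reader = source.pipeThrough(upper).pipeThrough(frame).getReader();
  while (true) {
    const { done, value } = await reader.read();
    if (done) break;
    events.push(value);
  }
  return events;
}
```

Each `pipeThrough` returns a new ReadableStream, so stages chain naturally and backpressure propagates through the whole pipeline.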

## Async Iterators: Consuming Streams

Convert a ReadableStream into an async iterator for ergonomic consumption:

```typescript
async function* streamToAsyncIterator<T>(
  stream: ReadableStream<T>
): AsyncGenerator<T> {
  const reader = stream.getReader();

  try {
    while (true) {
      const { done, value } = await reader.read();
      if (done) break;
      yield value;
    }
  } finally {
    reader.releaseLock();
  }
}

// Consume the stream
const stream = getAgentResponseStream();
for await (const token of streamToAsyncIterator(stream)) {
  process.stdout.write(token);
}
```

In Node.js 20+, `ReadableStream` implements `Symbol.asyncIterator` natively, so you can iterate directly:

```typescript
const decoder = new TextDecoder();
for await (const chunk of readableStream) {
  // { stream: true } keeps multi-byte characters intact across chunks
  process.stdout.write(decoder.decode(chunk, { stream: true }));
}
```

## Server-Sent Events: Browser Delivery

SSE is the simplest way to stream data from server to browser. It uses a plain HTTP connection with a specific content type:

```typescript
// Server: Next.js API route
export async function GET(req: Request) {
  const stream = await getAgentStream();

  const sseStream = new ReadableStream({
    async start(controller) {
      const encoder = new TextEncoder();

      for await (const token of stream) {
        const event = `data: ${JSON.stringify({ token })}\n\n`;
        controller.enqueue(encoder.encode(event));
      }

      controller.enqueue(encoder.encode("data: [DONE]\n\n"));
      controller.close();
    },
  });

  return new Response(sseStream, {
    headers: {
      "Content-Type": "text/event-stream",
      "Cache-Control": "no-cache, no-transform",
      Connection: "keep-alive",
    },
  });
}
```

Consume SSE on the client with `EventSource` or `fetch`:

```typescript
// Client: Browser
function streamAgentResponse(
  onToken: (token: string) => void,
  onDone: () => void
) {
  const eventSource = new EventSource("/api/agent/stream");

  eventSource.onmessage = (event) => {
    if (event.data === "[DONE]") {
      eventSource.close();
      onDone();
      return;
    }

    const { token } = JSON.parse(event.data);
    onToken(token);
  };

  eventSource.onerror = () => {
    // Note: closing here disables EventSource's automatic reconnection;
    // leave the connection open if you want the browser to retry.
    eventSource.close();
  };
}
```

For POST requests (EventSource only supports GET), use fetch with a reader:

```typescript
async function fetchStream(messages: Message[]) {
  const response = await fetch("/api/agent", {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({ messages }),
  });

  const reader = response.body!.getReader();
  const decoder = new TextDecoder();
  let buffer = "";

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;

    buffer += decoder.decode(value, { stream: true });

    // Parse only complete lines; keep any partial trailing line for the next chunk
    const lines = buffer.split("\n");
    buffer = lines.pop() ?? "";

    for (const line of lines) {
      if (line.startsWith("data: ") && line !== "data: [DONE]") {
        const data = JSON.parse(line.slice(6));
        appendToken(data.token);
      }
    }
  }
}
```

## Backpressure Handling

When the client reads slower than the LLM produces tokens, backpressure prevents memory buildup:

```typescript
function createBackpressuredStream(
  source: AsyncIterable<string>
): ReadableStream<Uint8Array> {
  const encoder = new TextEncoder();
  const iterator = source[Symbol.asyncIterator]();

  return new ReadableStream<Uint8Array>({
    async pull(controller) {
      // pull is only called when the consumer is ready for more data
      const { done, value } = await iterator.next();

      if (done) {
        controller.close();
      } else {
        controller.enqueue(encoder.encode(value));
      }
    },
    cancel() {
      // Stop the upstream iteration if the client disconnects
      iterator.return?.();
    },
  });
}
```

The `pull`-based model ensures the LLM response is consumed at the rate the client can handle, preventing unbounded buffering.
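How much read-ahead the stream performs is set by its queuing strategy. A sketch with a bounded queue (the `highWaterMark` of 2 is an arbitrary choice for illustration):

```typescript
// Sketch: a pull-based stream with a bounded internal queue. Once the
// queue holds highWaterMark chunks, pull() is not called again until
// the consumer reads, which throttles the producer.
function boundedStream(
  items: string[],
  highWaterMark = 2
): ReadableStream<string> {
  let i = 0;
  return new ReadableStream<string>(
    {
      pull(controller) {
        if (i < items.length) {
          controller.enqueue(items[i++]);
        } else {
          controller.close();
        }
      },
    },
    new CountQueuingStrategy({ highWaterMark })
  );
}
```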

## FAQ

### When should I use SSE versus WebSockets for AI streaming?

Use SSE for AI agent responses because the data flow is unidirectional (server to client). SSE is simpler, works over standard HTTP, reconnects automatically, and is supported by all browsers. WebSockets are better when you need bidirectional real-time communication, such as collaborative editing or voice streaming.

### Why not just use chunked transfer encoding without SSE framing?

Raw chunked encoding does not provide event boundaries. With SSE, each `data:` line is a discrete event that the client can parse independently. This matters when a single network chunk contains multiple partial tokens or when tokens span chunk boundaries.
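A boundary-safe parser keeps the partial trailing line in a buffer between chunks. A minimal sketch (`createSSELineBuffer` is an illustrative helper, not a standard API):

```typescript
// Minimal sketch: an SSE line buffer that survives chunk boundaries.
function createSSELineBuffer() {
  let buffer = "";
  return function parseSSEChunk(text: string): string[] {
    buffer += text;
    const lines = buffer.split("\n");
    buffer = lines.pop() ?? ""; // keep the partial trailing line
    return lines
      .filter((l) => l.startsWith("data: "))
      .map((l) => l.slice(6));
  };
}
```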

### How do I handle stream errors gracefully on the client?

Monitor the `onerror` event on `EventSource` or catch errors on the fetch reader. Display a user-friendly message and optionally retry the request. For critical applications, implement a heartbeat mechanism — send a periodic `data: {"heartbeat": true}` event so the client can detect stale connections.
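On the client, stale-connection detection reduces to tracking the time since the last event, heartbeats included. A sketch with an injectable clock for testability (names here are illustrative):

```typescript
// Track when the last event (including heartbeats) arrived, and report
// the connection stale once that exceeds a timeout.
function createStaleDetector(
  timeoutMs: number,
  now: () => number = Date.now
) {
  let last = now();
  return {
    touch() {
      last = now();
    },
    isStale() {
      return now() - last > timeoutMs;
    },
  };
}
```

Call `touch()` from the `onmessage` handler and check `isStale()` on a timer to decide when to tear down and reconnect.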


