---
title: "Memory-Efficient Agent Design: Handling Long Conversations Without OOM"
description: "Design AI agents that handle long conversations gracefully by using streaming processing, incremental state management, garbage collection strategies, and memory limits to prevent out-of-memory crashes."
canonical: https://callsphere.ai/blog/memory-efficient-agent-design-long-conversations-without-oom
category: "Learn Agentic AI"
tags: ["Memory Management", "Streaming", "Scalability", "Production", "Python"]
author: "CallSphere Team"
published: 2026-03-17T00:00:00.000Z
updated: 2026-05-06T01:02:44.339Z
---

# Memory-Efficient Agent Design: Handling Long Conversations Without OOM

> Design AI agents that handle long conversations gracefully by using streaming processing, incremental state management, garbage collection strategies, and memory limits to prevent out-of-memory crashes.

## How Agent Memory Grows Out of Control

An AI agent conversation is not just a list of strings. Each turn includes the user message, assistant response, tool calls, tool results, and metadata. A single tool result can be 10KB of JSON. Over a 50-turn conversation with 3-5 tool calls per turn, the in-memory conversation state can exceed 500KB — per session.

Multiply that by hundreds of concurrent sessions and you have a server consuming gigabytes of RAM just for conversation state. Add in embedding vectors, cached results, and intermediate processing buffers, and out-of-memory (OOM) crashes become a real production risk.
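
The arithmetic is worth making explicit. A rough back-of-envelope estimate using the figures above (the 1KB of per-turn message overhead is an assumption for illustration):

```python
# Back-of-envelope estimate of per-session conversation state (illustrative numbers)
turns = 50
tool_calls_per_turn = 4            # midpoint of 3-5
tool_result_bytes = 10 * 1024      # ~10KB of JSON per tool result
turn_overhead_bytes = 1 * 1024     # assumed user/assistant text + metadata per turn

per_session = turns * (tool_calls_per_turn * tool_result_bytes + turn_overhead_bytes)
print(f"~{per_session / 1024:.0f} KB per session")                 # ~2,050 KB
print(f"~{500 * per_session / 1024**3:.2f} GB for 500 sessions")   # ~0.98 GB
```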

## Streaming Processing: Never Hold the Full Response

When processing LLM responses, stream them instead of accumulating the entire response in memory before returning it.

```mermaid
sequenceDiagram
    autonumber
    participant Client
    participant Edge as Edge Worker
    participant LLM as LLM Provider
    participant DB as Logs and Trace
    Client->>Edge: POST /chat (stream=true)
    Edge->>LLM: messages.create(stream=true)
    loop Each token
        LLM-->>Edge: SSE chunk delta
        Edge-->>Client: SSE chunk delta
        Edge->>DB: append token to span
    end
    LLM-->>Edge: stop_reason=end_turn
    Edge-->>Client: event: done
    Edge->>DB: finalize trace
```

```python
from openai import AsyncOpenAI

client = AsyncOpenAI()

# BAD: Accumulates the entire response in memory
async def generate_full(messages: list[dict]) -> str:
    response = await client.chat.completions.create(
        model="gpt-4o", messages=messages,
    )
    return response.choices[0].message.content  # Full string in memory

# GOOD: Stream chunks to the client as they arrive
async def generate_streamed(messages: list[dict]):
    stream = await client.chat.completions.create(
        model="gpt-4o", messages=messages, stream=True,
    )
    async for chunk in stream:
        delta = chunk.choices[0].delta.content
        if delta:
            yield delta  # Yield each chunk, never hold the full response
```

For FastAPI, combine this with `StreamingResponse`:

```python
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from pydantic import BaseModel

app = FastAPI()

class ChatRequest(BaseModel):
    messages: list[dict]

@app.post("/chat")
async def chat(request: ChatRequest):
    async def stream_generator():
        async for chunk in generate_streamed(request.messages):
            yield f"data: {chunk}\n\n"
        yield "data: [DONE]\n\n"

    return StreamingResponse(
        stream_generator(),
        media_type="text/event-stream",
    )
```

## Incremental State: Store Summaries, Not Full History

Instead of keeping every message in memory, maintain an incremental state that compresses old messages into summaries.

```python
from dataclasses import dataclass, field

@dataclass
class ConversationState:
    session_id: str
    summary: str = ""
    recent_messages: list[dict] = field(default_factory=list)
    max_recent: int = 10
    _total_turns: int = 0

    def add_message(self, message: dict):
        self.recent_messages.append(message)
        self._total_turns += 1

    def needs_compaction(self) -> bool:
        return len(self.recent_messages) > self.max_recent * 2

    async def compact(self, summarizer):
        """Compress old messages into the summary."""
        if not self.needs_compaction():
            return

        # Keep the last max_recent messages
        to_summarize = self.recent_messages[:-self.max_recent]
        self.recent_messages = self.recent_messages[-self.max_recent:]

        # Add to running summary
        new_summary = await summarizer.summarize(to_summarize)
        self.summary = f"{self.summary} {new_summary}".strip()

    def get_context(self) -> list[dict]:
        """Build the context for the LLM call."""
        context = []
        if self.summary:
            context.append({
                "role": "system",
                "content": f"Previous conversation summary: {self.summary}",
            })
        context.extend(self.recent_messages)
        return context

    @property
    def memory_estimate_bytes(self) -> int:
        """Rough estimate of memory consumed by this state."""
        summary_bytes = len(self.summary.encode("utf-8"))
        messages_bytes = sum(
            len(str(m).encode("utf-8")) for m in self.recent_messages
        )
        return summary_bytes + messages_bytes
```
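
The `compact()` method above takes any object with an async `summarize()` method. A minimal sketch of one, assuming an OpenAI-backed summarizer (the model choice and prompt here are illustrative, not part of the original design):

```python
from openai import AsyncOpenAI

class LLMSummarizer:
    """Collapses old turns into a few sentences so they can be dropped from memory."""

    def __init__(self, client: AsyncOpenAI, model: str = "gpt-4o-mini"):
        self.client = client
        self.model = model

    async def summarize(self, messages: list[dict]) -> str:
        # Cap each message so the summarization call itself stays small
        transcript = "\n".join(
            f"{m.get('role', 'unknown')}: {str(m.get('content', ''))[:500]}"
            for m in messages
        )
        response = await self.client.chat.completions.create(
            model=self.model,
            messages=[
                {"role": "system", "content": "Summarize this conversation fragment in 3-4 sentences. Keep facts, decisions, and open questions."},
                {"role": "user", "content": transcript},
            ],
        )
        return response.choices[0].message.content or ""
```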

## Session Memory Limits and Eviction

For multi-session servers, enforce per-session and global memory limits.

```python
import asyncio
from collections import OrderedDict

class SessionManager:
    def __init__(
        self,
        max_sessions: int = 1000,
        max_memory_bytes: int = 500 * 1024 * 1024,  # 500MB
    ):
        self.max_sessions = max_sessions
        self.max_memory_bytes = max_memory_bytes
        self._sessions: OrderedDict[str, ConversationState] = OrderedDict()
        self._lock = asyncio.Lock()

    async def get_or_create(self, session_id: str) -> ConversationState:
        async with self._lock:
            if session_id in self._sessions:
                self._sessions.move_to_end(session_id)
                return self._sessions[session_id]

            # Evict if at capacity
            await self._evict_if_needed()

            state = ConversationState(session_id=session_id)
            self._sessions[session_id] = state
            return state

    async def _evict_if_needed(self):
        # Evict by count
        while len(self._sessions) >= self.max_sessions:
            evicted_id, evicted_state = self._sessions.popitem(last=False)
            await self._persist_to_disk(evicted_id, evicted_state)

        # Evict by memory
        total_memory = sum(
            s.memory_estimate_bytes for s in self._sessions.values()
        )
        while total_memory > self.max_memory_bytes and self._sessions:
            evicted_id, evicted_state = self._sessions.popitem(last=False)
            total_memory -= evicted_state.memory_estimate_bytes
            await self._persist_to_disk(evicted_id, evicted_state)

    async def _persist_to_disk(self, session_id: str, state: ConversationState):
        """Save evicted session to database for later retrieval."""
        # Implementation: write to PostgreSQL, Redis, or file
        pass
```
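
Putting the pieces together, a hypothetical turn handler might look like this (the `manager` and `summarizer` singletons are assumptions for illustration; `LLMSummarizer` is the sketch from the previous section):

```python
from openai import AsyncOpenAI

manager = SessionManager()
summarizer = LLMSummarizer(AsyncOpenAI())

async def handle_turn(session_id: str, user_message: str) -> list[dict]:
    """Fetch session state, record the new turn, and compact before building context."""
    state = await manager.get_or_create(session_id)
    state.add_message({"role": "user", "content": user_message})
    if state.needs_compaction():
        await state.compact(summarizer)
    return state.get_context()  # summary + recent messages, not the full history
```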

## Truncating Tool Outputs Before Storage

Tool outputs are the single largest memory consumer. Truncate them before they are added to the conversation state.

```python
import json

class ToolOutputTruncator:
    def __init__(self, max_chars: int = 2000):
        self.max_chars = max_chars

    def truncate(self, output: str) -> str:
        if len(output) <= self.max_chars:
            return output

        # For structured JSON, truncate intelligently; otherwise cut at max_chars
        try:
            data = json.loads(output)
            return self._truncate_structured(data)
        except (json.JSONDecodeError, TypeError):
            return output[: self.max_chars] + f"\n...(truncated from {len(output)} chars)"

    def _truncate_structured(self, data, depth: int = 0) -> str:
        if depth > 3:
            return '"...(nested)"'

        if isinstance(data, list):
            if len(data) > 5:
                truncated = data[:5]
                result = json.dumps(truncated, default=str)
                return result + f"\n...({len(data) - 5} more items)"
            return json.dumps(data, default=str)

        if isinstance(data, dict):
            # Keep only the first 10 fields
            essential = {k: v for k, v in list(data.items())[:10]}
            return json.dumps(essential, default=str)

        return json.dumps(data, default=str)
```
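
Wiring the truncator in at the point where a tool result is recorded keeps the full payload from ever reaching conversation state. A sketch (the `role: "tool"` message shape is illustrative; adapt it to whatever format your agent loop uses):

```python
truncator = ToolOutputTruncator(max_chars=2000)

def record_tool_result(state: ConversationState, tool_name: str, raw_output: str) -> None:
    """Truncate a tool result before it is stored in the session state."""
    state.add_message({
        "role": "tool",
        "name": tool_name,
        "content": truncator.truncate(raw_output),
    })
```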

## Monitoring Memory Usage

Add memory monitoring to detect leaks before they cause OOM crashes.

```python
import psutil
import os
import logging

logger = logging.getLogger(__name__)

class MemoryMonitor:
    def __init__(self, warning_pct: float = 75.0, critical_pct: float = 90.0):
        self.warning_pct = warning_pct
        self.critical_pct = critical_pct
        self.process = psutil.Process(os.getpid())

    def check(self) -> dict:
        mem = self.process.memory_info()
        system_mem = psutil.virtual_memory()

        usage_pct = (mem.rss / system_mem.total) * 100

        status = {
            "rss_mb": mem.rss / (1024 * 1024),
            "usage_pct": usage_pct,
            "status": "ok",
        }

        if usage_pct > self.critical_pct:
            status["status"] = "critical"
            logger.critical(f"Memory critical: {usage_pct:.1f}% of system RAM")
        elif usage_pct > self.warning_pct:
            status["status"] = "warning"
            logger.warning(f"Memory warning: {usage_pct:.1f}% of system RAM")

        return status
```
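
One way to run the check continuously is a small background task started at application startup (the 30-second interval is an arbitrary choice):

```python
import asyncio

async def memory_watchdog(monitor: MemoryMonitor, interval_s: float = 30.0) -> None:
    """Sample process memory on a fixed interval; MemoryMonitor logs warnings/criticals."""
    while True:
        monitor.check()
        await asyncio.sleep(interval_s)

# At startup, e.g. inside a FastAPI lifespan handler:
# asyncio.create_task(memory_watchdog(MemoryMonitor()))
```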

## FAQ

### How many concurrent agent sessions can a typical server handle?

With efficient memory management, a server with 4GB of RAM can handle 1,000-5,000 concurrent sessions depending on conversation length. Without optimization, the same server might OOM at 200 sessions. The key is keeping per-session memory under 500KB through summarization and tool output truncation.

### Should I use Redis or in-process memory for conversation state?

Use in-process memory for active sessions (fastest access) and Redis for idle sessions (shared across server instances). Implement an LRU eviction policy that moves inactive sessions from memory to Redis after a configurable idle timeout, typically 5-15 minutes.
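
A sketch of that offload step, assuming `redis.asyncio` and a JSON-serializable state (the key names, TTL, and `last_seen` bookkeeping are illustrative):

```python
import json
import time
import redis.asyncio as redis

r = redis.Redis()
IDLE_TIMEOUT_S = 10 * 60  # within the typical 5-15 minute range

async def offload_idle_sessions(
    sessions: dict[str, ConversationState],
    last_seen: dict[str, float],
) -> None:
    """Move sessions idle past the timeout from process memory to Redis."""
    now = time.time()
    for session_id in list(sessions):
        if now - last_seen.get(session_id, now) > IDLE_TIMEOUT_S:
            state = sessions.pop(session_id)
            payload = json.dumps({"summary": state.summary, "recent": state.recent_messages})
            await r.set(f"session:{session_id}", payload, ex=24 * 3600)  # keep for a day
```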

### How do I detect memory leaks in a long-running agent service?

Track RSS (Resident Set Size) over time using `psutil`. If RSS grows monotonically even when session counts are stable, you have a leak. Common culprits are references accumulating in global lists, HTTP clients that are never closed, and circular references in tool result objects that prevent garbage collection.
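
A sketch of that kind of tracking, assuming you already know the live session count (the per-session ratio is the useful signal: under steady load it should stay flat):

```python
import os
import time
import psutil

def sample_rss_per_session(session_count: int, samples: list[tuple[float, float]]) -> None:
    """Append (timestamp, RSS MB per session); a steadily rising ratio suggests a leak."""
    rss_mb = psutil.Process(os.getpid()).memory_info().rss / (1024 * 1024)
    samples.append((time.time(), rss_mb / max(session_count, 1)))
```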

---

#MemoryManagement #Streaming #Scalability #Production #Python #AgenticAI #LearnAI #AIEngineering

---

Source: https://callsphere.ai/blog/memory-efficient-agent-design-long-conversations-without-oom
