
MCPServerStreamableHTTP: Connecting to Remote Tool Servers

Connect agents to remote MCP tool servers using MCPServerStreamableHTTP with authentication headers, timeout configuration, retry policies, tool caching, and production deployment patterns.

When to Use Streamable HTTP

MCPServerStdio works great when the tool server runs on the same machine as the agent. But in production, your tools often live on remote servers — a company API, a cloud service, a shared tool server accessible by multiple agents. MCPServerStreamableHTTP connects your agent to remote MCP servers over HTTP, with support for streaming responses, authentication, retries, and tool caching.

Use Streamable HTTP when:

  • The MCP server runs on a different machine or in the cloud
  • Multiple agents need to share the same tool server
  • The tool server needs to scale independently from agents
  • You need authentication, rate limiting, or other HTTP-layer features

Basic Configuration

from agents.mcp import MCPServerStreamableHTTP

server = MCPServerStreamableHTTP(
    name="Remote Tools",
    params={
        "url": "https://tools.example.com/mcp",
    },
)

The url points to the MCP endpoint on the remote server. The Streamable HTTP transport communicates using HTTP POST requests with JSON-RPC payloads and receives streaming responses via Server-Sent Events.
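For intuition about what travels over that POST channel, here is the shape of a single request in the exchange — a minimal sketch of the JSON-RPC 2.0 envelope for a tools/list call (the SDK builds and sends this for you; the exact wire details, session headers, and response streaming are handled internally):

```python
import json

# A tools/list request as the client would POST it to the /mcp endpoint.
# JSON-RPC 2.0 envelope: protocol version, request id, method, params.
request = {
    "jsonrpc": "2.0",
    "id": 1,
    "method": "tools/list",
    "params": {},
}
body = json.dumps(request)
print(body)
```

The server replies either with a plain JSON response or with an SSE stream carrying the JSON-RPC result, which is what makes long-running tool calls practical over HTTP.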


Authentication with Headers

Most remote MCP servers require authentication. Pass headers in the configuration:

import os

server = MCPServerStreamableHTTP(
    name="Authenticated Tools",
    params={
        "url": "https://tools.example.com/mcp",
        "headers": {
            "Authorization": f"Bearer {os.environ['MCP_API_KEY']}",
            "X-Org-Id": "org_12345",
        },
    },
)

For OAuth-based authentication where tokens expire:

class TokenRefreshingMCPServer:
    """Wrapper that refreshes auth tokens before connecting."""

    def __init__(self, url: str, token_provider):
        self.url = url
        self.token_provider = token_provider

    async def get_server(self) -> MCPServerStreamableHTTP:
        token = await self.token_provider.get_valid_token()
        return MCPServerStreamableHTTP(
            name="OAuth Tools",
            params={
                "url": self.url,
                "headers": {
                    "Authorization": f"Bearer {token}",
                },
            },
        )

# Usage — OAuthTokenProvider is a hypothetical token client you supply
token_provider = OAuthTokenProvider(
    client_id="your_client_id",
    client_secret="your_client_secret",
    token_url="https://auth.example.com/token",
)

refreshing_server = TokenRefreshingMCPServer(
    url="https://tools.example.com/mcp",
    token_provider=token_provider,
)

server = await refreshing_server.get_server()

Timeout and Retry Configuration

Remote servers can be slow or temporarily unavailable. Configure timeouts and retries to handle this gracefully:


server = MCPServerStreamableHTTP(
    name="Resilient Remote Tools",
    params={
        "url": "https://tools.example.com/mcp",
        "headers": {
            "Authorization": f"Bearer {os.environ['MCP_API_KEY']}",
        },
        "timeout": 30,           # Connection timeout in seconds
        "sse_read_timeout": 300,  # SSE stream read timeout for long operations
    },
)

The distinction between timeout and sse_read_timeout matters: timeout is the initial connection timeout, while sse_read_timeout controls how long to wait for streaming data. Long-running tools (like database migrations or file processing) need a generous sse_read_timeout.
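The per-chunk nature of a read timeout is easy to see without the SDK. In this stdlib sketch, each streamed chunk must arrive within the read timeout, but the total stream may take much longer — the same way sse_read_timeout bounds the gap between SSE events rather than the whole operation (the helper and generator names here are illustrative):

```python
import asyncio

async def read_with_timeout(stream, read_timeout: float) -> list[str]:
    """Collect chunks from an async stream; each chunk must arrive in time."""
    chunks = []
    it = stream.__aiter__()
    while True:
        try:
            # Bound the wait for *this* chunk, not the whole stream.
            chunk = await asyncio.wait_for(it.__anext__(), timeout=read_timeout)
        except StopAsyncIteration:
            break
        chunks.append(chunk)
    return chunks

async def slow_events():
    """Stand-in for an SSE stream that emits events with small gaps."""
    for i in range(3):
        await asyncio.sleep(0.01)
        yield f"event-{i}"

chunks = asyncio.run(read_with_timeout(slow_events(), read_timeout=1.0))
print(chunks)  # ['event-0', 'event-1', 'event-2']
```

If any single gap exceeded read_timeout, asyncio.wait_for would raise TimeoutError — which is why a long-running tool that streams progress events periodically can survive a modest sse_read_timeout, while one that goes silent for minutes cannot.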

Retry with Backoff

For production reliability, start by bounding how long any single request can hang:

from agents.mcp import MCPServerStreamableHTTP

server = MCPServerStreamableHTTP(
    name="Production Tools",
    params={
        "url": "https://tools.example.com/mcp",
        "headers": {"Authorization": f"Bearer {os.environ['MCP_API_KEY']}"},
    },
    # Maximum time the client session waits on a single MCP request
    client_session_timeout_seconds=300,
)

For more control over retries, wrap the server connection with custom logic:

import asyncio

async def connect_with_retry(
    server: MCPServerStreamableHTTP,
    max_attempts: int = 3,
    base_delay: float = 1.0,
) -> bool:
    """Connect to an MCP server with exponential backoff."""
    for attempt in range(max_attempts):
        try:
            await server.connect()
            return True
        except Exception as e:
            if attempt == max_attempts - 1:
                raise
            delay = base_delay * (2 ** attempt)
            print(f"Connection attempt {attempt + 1} failed: {e}. Retrying in {delay}s...")
            await asyncio.sleep(delay)
    return False
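To sanity-check the schedule above: the delay after failed attempt n is base_delay * 2**n, and the final attempt raises instead of sleeping. A quick helper using the same formula (illustrative, not part of the SDK):

```python
def backoff_delays(max_attempts: int = 3, base_delay: float = 1.0) -> list[float]:
    """Delays slept between attempts; the last failure raises, so one fewer
    delay than attempts."""
    return [base_delay * (2 ** attempt) for attempt in range(max_attempts - 1)]

print(backoff_delays(3, 1.0))  # [1.0, 2.0]
print(backoff_delays(5, 0.5))  # [0.5, 1.0, 2.0, 4.0]
```

Adding random jitter to each delay is a common refinement — it prevents a fleet of agents from retrying in lockstep against a recovering server.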

Caching Tool Lists for Performance

Every time you enter the async with block, the client fetches the server's tool list. For servers with stable tool sets, this is redundant overhead. Enable caching:

server = MCPServerStreamableHTTP(
    name="Cached Tools",
    params={
        "url": "https://tools.example.com/mcp",
        "headers": {"Authorization": f"Bearer {os.environ['MCP_API_KEY']}"},
    },
    cache_tools_list=True,  # Cache the tool list across connections
)

With cache_tools_list=True, the tool list is fetched once and reused on subsequent connections. This saves a round trip on every agent run. Disable caching only if the server's tools change frequently.
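The mechanics are simple enough to sketch without the SDK — fetch once, serve from the cache afterward, with a manual invalidation hook in the spirit of the SDK's invalidate_tools_cache(). FakeToolServer and its counters are purely illustrative:

```python
class FakeToolServer:
    """Illustrative model of cache_tools_list behavior."""

    def __init__(self, cache_tools_list: bool = True):
        self.cache_tools_list = cache_tools_list
        self._cache = None
        self.fetch_count = 0  # how many round trips hit the "remote" server

    def _fetch_remote(self) -> list[str]:
        self.fetch_count += 1
        return ["search_contacts", "get_pipeline_summary"]

    def list_tools(self) -> list[str]:
        # Serve from cache when enabled and populated; otherwise round-trip.
        if self.cache_tools_list and self._cache is not None:
            return self._cache
        tools = self._fetch_remote()
        if self.cache_tools_list:
            self._cache = tools
        return tools

    def invalidate_tools_cache(self) -> None:
        """Force a fresh fetch on the next list_tools() call."""
        self._cache = None

server = FakeToolServer(cache_tools_list=True)
server.list_tools()
server.list_tools()
print(server.fetch_count)  # 1 — the second call was served from cache
```

Without caching, every agent run pays the extra round trip; with it, only the first connection does — and an explicit invalidation restores fresh behavior when the remote tool set actually changes.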

Building an Agent with Remote API Tools

Here is a complete example connecting to a remote CRM tools server:

import asyncio
import os

from agents import Agent, Runner
from agents.mcp import MCPServerStreamableHTTP

async def main():
    # Connect to a remote CRM tool server
    crm_server = MCPServerStreamableHTTP(
        name="CRM Tools",
        params={
            "url": "https://crm-tools.internal.company.com/mcp",
            "headers": {
                "Authorization": f"Bearer {os.environ['CRM_MCP_TOKEN']}",
                "X-Team": "sales",
            },
            "timeout": 15,
            "sse_read_timeout": 120,
        },
        cache_tools_list=True,
    )

    # Connect to a remote analytics server
    analytics_server = MCPServerStreamableHTTP(
        name="Analytics Tools",
        params={
            "url": "https://analytics-tools.internal.company.com/mcp",
            "headers": {
                "Authorization": f"Bearer {os.environ['ANALYTICS_MCP_TOKEN']}",
            },
        },
        cache_tools_list=True,
    )

    async with crm_server, analytics_server:
        agent = Agent(
            name="Sales Intelligence Agent",
            instructions="""You are a sales intelligence assistant with access
            to CRM data and analytics tools.

            Use CRM tools to look up contacts, deals, and account history.
            Use analytics tools to pull pipeline metrics and forecasts.

            Always cite specific data points when making recommendations.
            Never guess — if you cannot find the data, say so.""",
            mcp_servers=[crm_server, analytics_server],
        )

        result = await Runner.run(
            agent,
            input="What is the current pipeline value for Q2 and which deals are most at risk?",
        )
        print(result.final_output)

asyncio.run(main())

Building a Remote MCP Server

Here is how to build the server side using FastMCP with HTTP transport:

# crm_tools_server.py
from mcp.server.fastmcp import FastMCP
import asyncpg

mcp = FastMCP("CRM Tools", host="0.0.0.0", port=8080)
db_pool: asyncpg.Pool | None = None

async def get_pool() -> asyncpg.Pool:
    """Create the connection pool lazily, inside the server's event loop."""
    global db_pool
    if db_pool is None:
        db_pool = await asyncpg.create_pool(dsn="postgresql://user:pass@db:5432/crm")
    return db_pool

@mcp.tool()
async def search_contacts(query: str, limit: int = 10) -> str:
    """Search CRM contacts by name, email, or company."""
    pool = await get_pool()
    rows = await pool.fetch(
        """
        SELECT name, email, company, deal_count, total_revenue
        FROM contacts
        WHERE name ILIKE $1 OR email ILIKE $1 OR company ILIKE $1
        ORDER BY total_revenue DESC
        LIMIT $2
        """,
        f"%{query}%",
        limit,
    )
    if not rows:
        return "No contacts found matching the query."
    results = []
    for r in rows:
        results.append(
            f"- {r['name']} ({r['email']}) at {r['company']}: "
            f"{r['deal_count']} deals, ${r['total_revenue']:,.0f} revenue"
        )
    return "\n".join(results)

@mcp.tool()
async def get_pipeline_summary(quarter: str) -> str:
    """Get deal pipeline summary for a given quarter (e.g., 'Q2 2026')."""
    pool = await get_pool()
    rows = await pool.fetch(
        """
        SELECT stage, COUNT(*) AS deal_count, SUM(value) AS total_value
        FROM deals
        WHERE quarter = $1
        GROUP BY stage
        ORDER BY total_value DESC
        """,
        quarter,
    )
    if not rows:
        return f"No pipeline data found for {quarter}."
    lines = [f"Pipeline for {quarter}:"]
    for r in rows:
        lines.append(
            f"  {r['stage']}: {r['deal_count']} deals, ${r['total_value']:,.0f}"
        )
    return "\n".join(lines)

if __name__ == "__main__":
    # mcp.run() blocks and manages its own event loop, so the database pool
    # is created lazily on the first tool call rather than before run() —
    # a pool created on a different loop cannot be reused inside the server.
    mcp.run(transport="streamable-http")

Production Deployment Patterns

  1. Health checks — Add a /health endpoint to your MCP server for load balancer probes
  2. Rate limiting — Implement per-client rate limits to prevent one agent from monopolizing resources
  3. Request logging — Log every tool invocation with trace IDs for debugging
  4. Circuit breaker — If the remote server fails repeatedly, stop trying and fall back gracefully
  5. mTLS — Use mutual TLS for service-to-service authentication in internal networks
  6. Connection pooling — Reuse HTTP connections across multiple agent runs
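Of these, the circuit breaker is the one most often hand-rolled around an MCP client. A minimal sketch — open the circuit after N consecutive failures, reject calls during a cooldown, then let a single probe through (thresholds and class shape are illustrative, not an SDK feature):

```python
import time

class CircuitBreaker:
    """Open after N consecutive failures; allow a probe after the cooldown."""

    def __init__(self, failure_threshold: int = 5, cooldown: float = 30.0):
        self.failure_threshold = failure_threshold
        self.cooldown = cooldown
        self.failures = 0
        self.opened_at: float | None = None

    def allow_request(self) -> bool:
        if self.opened_at is None:
            return True  # circuit closed: normal operation
        if time.monotonic() - self.opened_at >= self.cooldown:
            # Half-open: permit one probe request to test recovery.
            self.opened_at = None
            self.failures = 0
            return True
        return False  # circuit open: fail fast, fall back gracefully

    def record_success(self) -> None:
        self.failures = 0
        self.opened_at = None

    def record_failure(self) -> None:
        self.failures += 1
        if self.failures >= self.failure_threshold:
            self.opened_at = time.monotonic()
```

Wrap each remote tool-server call in allow_request()/record_failure(), and the agent can return a degraded answer immediately instead of stacking timeouts against a server that is already down.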

MCPServerStreamableHTTP is the production transport for multi-service architectures where tools live on dedicated servers.

Written by the CallSphere Team.
