---
title: "MCPServerStreamableHTTP: Connecting to Remote Tool Servers"
description: "Connect agents to remote MCP tool servers using MCPServerStreamableHTTP with authentication headers, timeout configuration, retry policies, tool caching, and production deployment patterns."
canonical: https://callsphere.ai/blog/mcp-server-streamable-http-remote-tool-servers
category: "Learn Agentic AI"
tags: ["OpenAI", "MCP", "HTTP", "Remote Tools", "API"]
author: "CallSphere Team"
published: 2026-03-14T00:00:00.000Z
updated: 2026-05-06T01:02:41.656Z
---

# MCPServerStreamableHTTP: Connecting to Remote Tool Servers

> Connect agents to remote MCP tool servers using MCPServerStreamableHTTP with authentication headers, timeout configuration, retry policies, tool caching, and production deployment patterns.

## When to Use Streamable HTTP

MCPServerStdio works great when the tool server runs on the same machine as the agent. But in production, your tools often live on remote servers — a company API, a cloud service, a shared tool server accessible by multiple agents. MCPServerStreamableHTTP connects your agent to remote MCP servers over HTTP, with support for streaming responses, authentication, retries, and tool caching.

Use Streamable HTTP when:

- The MCP server runs on a different machine or in the cloud
- Multiple agents need to share the same tool server
- The tool server needs to scale independently from agents
- You need authentication, rate limiting, or other HTTP-layer features

## Basic Configuration

```python
from agents.mcp import MCPServerStreamableHTTP

server = MCPServerStreamableHTTP(
    name="Remote Tools",
    params={
        "url": "https://tools.example.com/mcp",
    },
)
```

The `url` points to the MCP endpoint on the remote server. The Streamable HTTP transport communicates using HTTP POST requests with JSON-RPC payloads and receives streaming responses via Server-Sent Events.

```mermaid
flowchart LR
    HOST(["MCP host
Claude Desktop or IDE"])
    CLIENT["MCP client"]
    subgraph SERVERS["MCP Servers"]
        S1["Filesystem server"]
        S2["GitHub server"]
        S3["Postgres server"]
        SX["Custom tool server"]
    end
    LLM["LLM session"]
    OUT(["Grounded action"])
    HOST  CLIENT
    CLIENT |stdio or HTTP+SSE| S1
    CLIENT  S2
    CLIENT  S3
    CLIENT  SX
    CLIENT --> LLM --> OUT
    style HOST fill:#f1f5f9,stroke:#64748b,color:#0f172a
    style CLIENT fill:#4f46e5,stroke:#4338ca,color:#fff
    style OUT fill:#059669,stroke:#047857,color:#fff
```

## Authentication with Headers

Most remote MCP servers require authentication. Pass headers in the configuration:

```python
import os

server = MCPServerStreamableHTTP(
    name="Authenticated Tools",
    params={
        "url": "https://tools.example.com/mcp",
        "headers": {
            "Authorization": f"Bearer {os.environ['MCP_API_KEY']}",
            "X-Org-Id": "org_12345",
        },
    },
)
```

For OAuth-based authentication where tokens expire:

```python
class TokenRefreshingMCPServer:
    """Wrapper that refreshes auth tokens before connecting."""

    def __init__(self, url: str, token_provider):
        self.url = url
        self.token_provider = token_provider

    async def get_server(self) -> MCPServerStreamableHTTP:
        token = await self.token_provider.get_valid_token()
        return MCPServerStreamableHTTP(
            name="OAuth Tools",
            params={
                "url": self.url,
                "headers": {
                    "Authorization": f"Bearer {token}",
                },
            },
        )

# Usage
token_provider = OAuthTokenProvider(
    client_id="your_client_id",
    client_secret="your_client_secret",
    token_url="https://auth.example.com/token",
)

refreshing_server = TokenRefreshingMCPServer(
    url="https://tools.example.com/mcp",
    token_provider=token_provider,
)

server = await refreshing_server.get_server()
```

## Timeout and Retry Configuration

Remote servers can be slow or temporarily unavailable. Configure timeouts and retries to handle this gracefully:

```python
server = MCPServerStreamableHTTP(
    name="Resilient Remote Tools",
    params={
        "url": "https://tools.example.com/mcp",
        "headers": {
            "Authorization": f"Bearer {os.environ['MCP_API_KEY']}",
        },
        "timeout": 30,           # Connection timeout in seconds
        "sse_read_timeout": 300,  # SSE stream read timeout for long operations
    },
)
```

The distinction between `timeout` and `sse_read_timeout` matters: `timeout` is the initial connection timeout, while `sse_read_timeout` controls how long to wait for streaming data. Long-running tools (like database migrations or file processing) need a generous `sse_read_timeout`.

## Retry with Backoff

For production reliability, configure retry behavior:

```python
from agents.mcp import MCPServerStreamableHTTP

server = MCPServerStreamableHTTP(
    name="Production Tools",
    params={
        "url": "https://tools.example.com/mcp",
        "headers": {"Authorization": f"Bearer {os.environ['MCP_API_KEY']}"},
    },
    # Client-side retry configuration
    client_session_timeout_seconds=300,
)
```

For more control over retries, wrap the server connection with custom logic:

```python
import asyncio
from typing import Optional

async def connect_with_retry(
    server: MCPServerStreamableHTTP,
    max_attempts: int = 3,
    base_delay: float = 1.0,
) -> bool:
    """Connect to an MCP server with exponential backoff."""
    for attempt in range(max_attempts):
        try:
            await server.connect()
            return True
        except Exception as e:
            if attempt == max_attempts - 1:
                raise
            delay = base_delay * (2 ** attempt)
            print(f"Connection attempt {attempt + 1} failed: {e}. Retrying in {delay}s...")
            await asyncio.sleep(delay)
    return False
```

## Caching Tool Lists for Performance

Every time you enter the `async with` block, the client fetches the server's tool list. For servers with stable tool sets, this is redundant overhead. Enable caching:

```python
server = MCPServerStreamableHTTP(
    name="Cached Tools",
    params={
        "url": "https://tools.example.com/mcp",
        "headers": {"Authorization": f"Bearer {os.environ['MCP_API_KEY']}"},
    },
    cache_tools_list=True,  # Cache the tool list across connections
)
```

With `cache_tools_list=True`, the tool list is fetched once and reused on subsequent connections. This saves a round trip on every agent run. Disable caching only if the server's tools change frequently.

## Building an Agent with Remote API Tools

Here is a complete example connecting to a remote CRM tools server:

```python
import asyncio
from agents import Agent, Runner
from agents.mcp import MCPServerStreamableHTTP

async def main():
    # Connect to a remote CRM tool server
    crm_server = MCPServerStreamableHTTP(
        name="CRM Tools",
        params={
            "url": "https://crm-tools.internal.company.com/mcp",
            "headers": {
                "Authorization": f"Bearer {os.environ['CRM_MCP_TOKEN']}",
                "X-Team": "sales",
            },
            "timeout": 15,
            "sse_read_timeout": 120,
        },
        cache_tools_list=True,
    )

    # Connect to a remote analytics server
    analytics_server = MCPServerStreamableHTTP(
        name="Analytics Tools",
        params={
            "url": "https://analytics-tools.internal.company.com/mcp",
            "headers": {
                "Authorization": f"Bearer {os.environ['ANALYTICS_MCP_TOKEN']}",
            },
        },
        cache_tools_list=True,
    )

    async with crm_server, analytics_server:
        agent = Agent(
            name="Sales Intelligence Agent",
            instructions="""You are a sales intelligence assistant with access
            to CRM data and analytics tools.

            Use CRM tools to look up contacts, deals, and account history.
            Use analytics tools to pull pipeline metrics and forecasts.

            Always cite specific data points when making recommendations.
            Never guess — if you cannot find the data, say so.""",
            mcp_servers=[crm_server, analytics_server],
        )

        result = await Runner.run(
            agent,
            input="What is the current pipeline value for Q2 and which deals are most at risk?",
        )
        print(result.final_output)

asyncio.run(main())
```

## Building a Remote MCP Server

Here is how to build the server side using FastMCP with HTTP transport:

```python
# crm_tools_server.py
from mcp.server.fastmcp import FastMCP
import asyncpg

mcp = FastMCP("CRM Tools")
db_pool = None

@mcp.tool()
async def search_contacts(query: str, limit: int = 10) -> str:
    """Search CRM contacts by name, email, or company."""
    rows = await db_pool.fetch(
        """
        SELECT name, email, company, deal_count, total_revenue
        FROM contacts
        WHERE name ILIKE $1 OR email ILIKE $1 OR company ILIKE $1
        ORDER BY total_revenue DESC
        LIMIT $2
        """,
        f"%{query}%",
        limit,
    )
    if not rows:
        return "No contacts found matching the query."
    results = []
    for r in rows:
        results.append(
            f"- {r['name']} ({r['email']}) at {r['company']}: "
            f"{r['deal_count']} deals, ${r['total_revenue']:,.0f} revenue"
        )
    return "\n".join(results)

@mcp.tool()
async def get_pipeline_summary(quarter: str) -> str:
    """Get deal pipeline summary for a given quarter (e.g., 'Q2 2026')."""
    rows = await db_pool.fetch(
        """
        SELECT stage, COUNT(*) as deal_count, SUM(value) as total_value
        FROM deals
        WHERE quarter = $1
        GROUP BY stage
        ORDER BY total_value DESC
        """,
        quarter,
    )
    if not rows:
        return f"No pipeline data found for {quarter}."
    lines = [f"Pipeline for {quarter}:"]
    for r in rows:
        lines.append(
            f"  {r['stage']}: {r['deal_count']} deals, ${r['total_value']:,.0f}"
        )
    return "\n".join(lines)

if __name__ == "__main__":
    import asyncio

    async def setup():
        global db_pool
        db_pool = await asyncpg.create_pool(dsn="postgresql://user:pass@db:5432/crm")
        mcp.run(transport="streamable-http", host="0.0.0.0", port=8080)

    asyncio.run(setup())
```

## Production Deployment Patterns

1. **Health checks** — Add a `/health` endpoint to your MCP server for load balancer probes
2. **Rate limiting** — Implement per-client rate limits to prevent one agent from monopolizing resources
3. **Request logging** — Log every tool invocation with trace IDs for debugging
4. **Circuit breaker** — If the remote server fails repeatedly, stop trying and fall back gracefully
5. **mTLS** — Use mutual TLS for service-to-service authentication in internal networks
6. **Connection pooling** — Reuse HTTP connections across multiple agent runs

MCPServerStreamableHTTP is the production transport for multi-service architectures where tools live on dedicated servers.

---

Source: https://callsphere.ai/blog/mcp-server-streamable-http-remote-tool-servers
