MCPServerStreamableHTTP: Connecting to Remote Tool Servers
Connect agents to remote MCP tool servers using MCPServerStreamableHTTP with authentication headers, timeout configuration, retry policies, tool caching, and production deployment patterns.
When to Use Streamable HTTP
MCPServerStdio works great when the tool server runs on the same machine as the agent. But in production, your tools often live on remote servers — a company API, a cloud service, a shared tool server accessible by multiple agents. MCPServerStreamableHTTP connects your agent to remote MCP servers over HTTP, with support for streaming responses, authentication, retries, and tool caching.
Use Streamable HTTP when:
- The MCP server runs on a different machine or in the cloud
- Multiple agents need to share the same tool server
- The tool server needs to scale independently from agents
- You need authentication, rate limiting, or other HTTP-layer features
Basic Configuration
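from agents.mcp import MCPServerStreamableHTTP

# Note: some releases of the OpenAI Agents SDK export this class as
# MCPServerStreamableHttp; match the import to your installed version.
server = MCPServerStreamableHTTP(
    name="Remote Tools",
    params={
        "url": "https://tools.example.com/mcp",
    },
)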
The url points to the MCP endpoint on the remote server. The Streamable HTTP transport communicates using HTTP POST requests with JSON-RPC payloads and receives streaming responses via Server-Sent Events.
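To make the wire format concrete, here is a minimal sketch of what a tools/list request and a streamed reply could look like, using only the standard library. The helper names build_jsonrpc_request and parse_sse_data are illustrative and not part of any SDK, and the sample stream is made up. The Accept header follows the Streamable HTTP convention that the server may answer with either plain JSON or an SSE stream.

```python
import json
from typing import Optional


def build_jsonrpc_request(method: str, params: Optional[dict] = None, request_id: int = 1):
    """Return (headers, body) for a JSON-RPC 2.0 call sent via HTTP POST."""
    headers = {
        "Content-Type": "application/json",
        # The client advertises that it accepts a JSON reply or an SSE stream.
        "Accept": "application/json, text/event-stream",
    }
    message = {"jsonrpc": "2.0", "id": request_id, "method": method}
    if params is not None:
        message["params"] = params
    return headers, json.dumps(message)


def parse_sse_data(stream_text: str) -> list:
    """Decode the JSON payload of each SSE event found in stream_text."""
    messages = []
    data_lines = []
    # A blank line terminates an event; append one so the last event is flushed.
    for line in stream_text.splitlines() + [""]:
        if line.startswith("data:"):
            value = line[len("data:"):]
            if value.startswith(" "):
                value = value[1:]  # SSE strips a single leading space
            data_lines.append(value)
        elif line == "" and data_lines:
            messages.append(json.loads("\n".join(data_lines)))
            data_lines = []
    return messages


headers, body = build_jsonrpc_request("tools/list")

# Hypothetical server reply, shaped like one SSE event carrying a JSON-RPC result.
sample_stream = 'event: message\ndata: {"jsonrpc": "2.0", "id": 1, "result": {"tools": []}}\n\n'
replies = parse_sse_data(sample_stream)
```

In practice the SDK handles this exchange for you; the sketch is only meant to show what travels over the connection.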
Authentication with Headers
Most remote MCP servers require authentication. Pass headers in the configuration:
import os

server = MCPServerStreamableHTTP(
    name="Authenticated Tools",
    params={
        "url": "https://tools.example.com/mcp",
        "headers": {
            "Authorization": f"Bearer {os.environ['MCP_API_KEY']}",
            "X-Org-Id": "org_12345",
        },
    },
)
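Reading os.environ['MCP_API_KEY'] directly raises a bare KeyError when the variable is missing. One option is a small helper that fails with a clearer message. The sketch below is an assumption about how you might structure it; build_auth_headers is a hypothetical name, and the env parameter exists only so the function can be exercised without touching the real environment.

```python
import os
from typing import Mapping, Optional


def build_auth_headers(
    token_env_var: str,
    org_id: Optional[str] = None,
    env: Mapping[str, str] = os.environ,
) -> dict:
    """Build the headers dict for the server params, failing early if the token is missing."""
    token = env.get(token_env_var, "").strip()
    if not token:
        raise RuntimeError(f"Environment variable {token_env_var} is not set")
    headers = {"Authorization": f"Bearer {token}"}
    if org_id:
        headers["X-Org-Id"] = org_id
    return headers


# Example with an explicit mapping instead of the process environment.
example_headers = build_auth_headers(
    "MCP_API_KEY", org_id="org_12345", env={"MCP_API_KEY": "abc"}
)
```

The returned dict can be passed as the "headers" value in params.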
For OAuth-based authentication where tokens expire, the headers are set when the server object is built, so build it with a fresh token each time you connect:
class TokenRefreshingMCPServer:
    """Wrapper that fetches a valid auth token before building the server."""

    def __init__(self, url: str, token_provider):
        self.url = url
        self.token_provider = token_provider

    async def get_server(self) -> MCPServerStreamableHTTP:
        token = await self.token_provider.get_valid_token()
        return MCPServerStreamableHTTP(
            name="OAuth Tools",
            params={
                "url": self.url,
                "headers": {
                    "Authorization": f"Bearer {token}",
                },
            },
        )

# Usage (run inside an async function).
# OAuthTokenProvider is a placeholder for your own class; it only needs
# an async get_valid_token() method that returns a current access token.
token_provider = OAuthTokenProvider(
    client_id="your_client_id",
    client_secret="your_client_secret",
    token_url="https://auth.example.com/token",
)
refreshing_server = TokenRefreshingMCPServer(
    url="https://tools.example.com/mcp",
    token_provider=token_provider,
)
server = await refreshing_server.get_server()
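The token provider itself is left undefined above. As a rough sketch of what such a provider could look like, the class below caches a token and refreshes it shortly before it expires. Everything here is hypothetical: CachingTokenProvider, the fetch_token callable (which stands in for the real call to your token endpoint), and the 60-second refresh margin are assumptions for illustration.

```python
import asyncio
import time
from typing import Awaitable, Callable, Optional, Tuple


class CachingTokenProvider:
    """Hypothetical provider: caches a token and refreshes it shortly before expiry."""

    def __init__(
        self,
        fetch_token: Callable[[], Awaitable[Tuple[str, float]]],
        refresh_margin: float = 60.0,
        clock: Callable[[], float] = time.monotonic,
    ):
        self._fetch_token = fetch_token  # returns (token, lifetime_in_seconds)
        self._refresh_margin = refresh_margin
        self._clock = clock
        self._token: Optional[str] = None
        self._expires_at = 0.0
        self._lock = asyncio.Lock()  # avoids concurrent refreshes

    async def get_valid_token(self) -> str:
        async with self._lock:
            expiring = self._clock() >= self._expires_at - self._refresh_margin
            if self._token is None or expiring:
                token, lifetime = await self._fetch_token()
                self._token = token
                self._expires_at = self._clock() + lifetime
            return self._token


async def demo():
    calls = 0
    now = [0.0]  # fake clock so the example runs instantly

    async def fake_fetch():
        nonlocal calls
        calls += 1
        return f"token-{calls}", 3600.0

    provider = CachingTokenProvider(fake_fetch, clock=lambda: now[0])
    first = await provider.get_valid_token()
    second = await provider.get_valid_token()  # served from the cache
    now[0] = 3590.0                            # inside the refresh margin
    third = await provider.get_valid_token()   # triggers a refresh
    return first, second, third


tokens = asyncio.run(demo())
```

Refreshing before the actual expiry leaves headroom for a request that is already in flight when the old token would otherwise lapse.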
Timeout and Retry Configuration
Remote servers can be slow or temporarily unavailable. Configure timeouts and retries to handle this gracefully:
server = MCPServerStreamableHTTP(
    name="Resilient Remote Tools",
    params={
        "url": "https://tools.example.com/mcp",
        "headers": {
            "Authorization": f"Bearer {os.environ['MCP_API_KEY']}",
        },
        "timeout": 30,            # HTTP request timeout, in seconds
        "sse_read_timeout": 300,  # How long to wait for data on the SSE stream, in seconds
    },
)
The distinction between timeout and sse_read_timeout matters: timeout bounds each HTTP request to the server, while sse_read_timeout controls how long the client waits for data on the streaming response. Long-running tools (like database migrations or file processing) need a generous sse_read_timeout.
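A read timeout of this kind is usually an idle limit between pieces of data rather than a cap on total duration. The sketch below illustrates that idea with plain asyncio; it is not the SDK's implementation, and read_stream and slow_tool are made-up names. A stream that keeps emitting events finishes even if it runs for a long time overall, while one that goes silent for longer than the limit is cut off.

```python
import asyncio


async def read_stream(stream, read_timeout: float) -> list:
    """Collect items from an async iterator, allowing at most
    read_timeout seconds of silence before each item arrives."""
    items = []
    iterator = stream.__aiter__()
    while True:
        try:
            item = await asyncio.wait_for(iterator.__anext__(), timeout=read_timeout)
        except StopAsyncIteration:
            return items
        items.append(item)


async def slow_tool(gap: float):
    """Fake streaming tool that pauses gap seconds before each event."""
    for event in ("started", "progress", "done"):
        await asyncio.sleep(gap)
        yield event


async def demo():
    # Short gaps between events: the whole stream is read.
    ok = await read_stream(slow_tool(0.01), read_timeout=0.5)
    # Gaps longer than the read timeout: the read is abandoned.
    try:
        await read_stream(slow_tool(0.2), read_timeout=0.05)
        timed_out = False
    except asyncio.TimeoutError:
        timed_out = True
    return ok, timed_out


ok, timed_out = asyncio.run(demo())
```

When sizing sse_read_timeout, the longest expected pause between events from your slowest tool is the number to look at.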
Retry with Backoff
For production reliability, first bound how long the client session waits on each request:
import os
from agents.mcp import MCPServerStreamableHTTP

server = MCPServerStreamableHTTP(
    name="Production Tools",
    params={
        "url": "https://tools.example.com/mcp",
        "headers": {"Authorization": f"Bearer {os.environ['MCP_API_KEY']}"},
    },
    # Read timeout for the MCP client session, in seconds.
    # This bounds how long a request may take; it does not enable retries.
    client_session_timeout_seconds=300,
)
To retry failed connection attempts, wrap the server connection with custom logic:
import asyncio

async def connect_with_retry(
    server: MCPServerStreamableHTTP,
    max_attempts: int = 3,
    base_delay: float = 1.0,
) -> bool:
    """Connect to an MCP server with exponential backoff."""
    for attempt in range(max_attempts):
        try:
            await server.connect()
            return True
        except Exception as e:
            # Out of attempts: re-raise the last error to the caller
            if attempt == max_attempts - 1:
                raise
            delay = base_delay * (2 ** attempt)  # 1s, 2s, 4s, ...
            print(f"Connection attempt {attempt + 1} failed: {e}. Retrying in {delay}s...")
            await asyncio.sleep(delay)
    # Only reached when max_attempts is zero or negative
    return False
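Plain exponential backoff has two practical gaps: the delay grows without bound, and many agents retrying at once will all hit the server at the same moments. A common refinement is to cap the delay and add random jitter. The sketch below computes such a schedule; backoff_delays and its parameters are hypothetical names, not SDK options.

```python
import random


def backoff_delays(
    max_attempts: int,
    base_delay: float = 1.0,
    max_delay: float = 30.0,
    jitter: float = 0.0,
    rng=random.random,
) -> list:
    """Return the delay to sleep after each failed attempt except the last.

    jitter is a fraction of the delay: 0.25 spreads each delay by up to +/-25%.
    """
    delays = []
    for attempt in range(max_attempts - 1):
        # Exponential growth, capped at max_delay.
        delay = min(base_delay * (2 ** attempt), max_delay)
        # rng() is in [0, 1), so the factor lies in [1 - jitter, 1 + jitter).
        delay *= 1 + jitter * (2 * rng() - 1)
        delays.append(delay)
    return delays


# Six attempts means five waits; the last one is capped at max_delay.
schedule = backoff_delays(max_attempts=6, base_delay=1.0, max_delay=10.0)

# With jitter, each delay stays within 25% of its nominal value.
jittered = backoff_delays(max_attempts=4, base_delay=1.0, jitter=0.25)
```

To use it, replace the fixed delay computation in the retry loop with the matching entry from the schedule.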
Caching Tool Lists for Performance
Each time an agent runs, it asks the server for its tool list. For remote servers with stable tool sets, that extra round trip is redundant overhead. Enable caching:
server = MCPServerStreamableHTTP(
    name="Cached Tools",
    params={
        "url": "https://tools.example.com/mcp",
        "headers": {"Authorization": f"Bearer {os.environ['MCP_API_KEY']}"},
    },
    cache_tools_list=True,  # Reuse the tool list instead of refetching it
)
With cache_tools_list=True, the tool list is fetched once and reused on subsequent agent runs, which saves a round trip each time. Leave caching off if the server's tools change frequently, or call the server's invalidate_tools_cache() method when you know the list has changed.
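Conceptually, the caching behaviour amounts to "fetch on first use, reuse until invalidated". The sketch below shows that pattern in isolation. It is not the SDK's code; ToolListCache and fetch_tools are hypothetical names, and the fake fetch function stands in for the network round trip.

```python
import asyncio


class ToolListCache:
    """Fetch the tool list once and reuse it until explicitly invalidated."""

    def __init__(self, fetch_tools):
        self._fetch_tools = fetch_tools  # async callable returning a list of tool names
        self._tools = None

    async def list_tools(self) -> list:
        if self._tools is None:
            self._tools = await self._fetch_tools()
        return self._tools

    def invalidate(self) -> None:
        """Drop the cached list so the next call fetches again."""
        self._tools = None


async def demo():
    fetch_count = 0

    async def fake_fetch():
        nonlocal fetch_count
        fetch_count += 1
        return ["search_contacts", "get_pipeline_summary"]

    cache = ToolListCache(fake_fetch)
    await cache.list_tools()
    await cache.list_tools()      # no second fetch
    after_two_calls = fetch_count
    cache.invalidate()
    await cache.list_tools()      # fetches again
    return after_two_calls, fetch_count


after_two_calls, total_fetches = asyncio.run(demo())
```

The trade-off is staleness: a cached list will not show tools added on the server until the cache is invalidated.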
Building an Agent with Remote API Tools
Here is a complete example connecting to a remote CRM tools server:
import asyncio
import os

from agents import Agent, Runner
from agents.mcp import MCPServerStreamableHTTP

async def main():
    # Connect to a remote CRM tool server
    crm_server = MCPServerStreamableHTTP(
        name="CRM Tools",
        params={
            "url": "https://crm-tools.internal.company.com/mcp",
            "headers": {
                "Authorization": f"Bearer {os.environ['CRM_MCP_TOKEN']}",
                "X-Team": "sales",
            },
            "timeout": 15,
            "sse_read_timeout": 120,
        },
        cache_tools_list=True,
    )
    # Connect to a remote analytics server
    analytics_server = MCPServerStreamableHTTP(
        name="Analytics Tools",
        params={
            "url": "https://analytics-tools.internal.company.com/mcp",
            "headers": {
                "Authorization": f"Bearer {os.environ['ANALYTICS_MCP_TOKEN']}",
            },
        },
        cache_tools_list=True,
    )
    # Both connections are opened on entry and closed on exit
    async with crm_server, analytics_server:
        agent = Agent(
            name="Sales Intelligence Agent",
            instructions="""You are a sales intelligence assistant with access
            to CRM data and analytics tools.
            Use CRM tools to look up contacts, deals, and account history.
            Use analytics tools to pull pipeline metrics and forecasts.
            Always cite specific data points when making recommendations.
            Never guess: if you cannot find the data, say so.""",
            mcp_servers=[crm_server, analytics_server],
        )
        result = await Runner.run(
            agent,
            input="What is the current pipeline value for Q2 and which deals are most at risk?",
        )
        print(result.final_output)

asyncio.run(main())
Building a Remote MCP Server
Here is how to build the server side using FastMCP with HTTP transport:
# crm_tools_server.py
import asyncio

import asyncpg
from mcp.server.fastmcp import FastMCP

# With the official mcp Python SDK, host and port are settings on the
# FastMCP instance rather than arguments to run().
mcp = FastMCP("CRM Tools", host="0.0.0.0", port=8080)
db_pool = None  # asyncpg pool, created in main() before the server starts

@mcp.tool()
async def search_contacts(query: str, limit: int = 10) -> str:
    """Search CRM contacts by name, email, or company."""
    rows = await db_pool.fetch(
        """
        SELECT name, email, company, deal_count, total_revenue
        FROM contacts
        WHERE name ILIKE $1 OR email ILIKE $1 OR company ILIKE $1
        ORDER BY total_revenue DESC
        LIMIT $2
        """,
        f"%{query}%",
        limit,
    )
    if not rows:
        return "No contacts found matching the query."
    results = []
    for r in rows:
        results.append(
            f"- {r['name']} ({r['email']}) at {r['company']}: "
            f"{r['deal_count']} deals, ${r['total_revenue']:,.0f} revenue"
        )
    return "\n".join(results)

@mcp.tool()
async def get_pipeline_summary(quarter: str) -> str:
    """Get deal pipeline summary for a given quarter (e.g., 'Q2 2026')."""
    rows = await db_pool.fetch(
        """
        SELECT stage, COUNT(*) as deal_count, SUM(value) as total_value
        FROM deals
        WHERE quarter = $1
        GROUP BY stage
        ORDER BY total_value DESC
        """,
        quarter,
    )
    if not rows:
        return f"No pipeline data found for {quarter}."
    lines = [f"Pipeline for {quarter}:"]
    for r in rows:
        lines.append(
            f"  {r['stage']}: {r['deal_count']} deals, ${r['total_value']:,.0f}"
        )
    return "\n".join(lines)

async def main():
    global db_pool
    # Create the pool first, then serve on the same event loop.
    # mcp.run() starts its own event loop, so it cannot be called from here.
    db_pool = await asyncpg.create_pool(dsn="postgresql://user:pass@db:5432/crm")
    try:
        await mcp.run_streamable_http_async()
    finally:
        await db_pool.close()

if __name__ == "__main__":
    asyncio.run(main())
Production Deployment Patterns
- Health checks: Add a /health endpoint to your MCP server for load balancer probes
- Rate limiting: Implement per-client rate limits to prevent one agent from monopolizing resources
- Request logging: Log every tool invocation with trace IDs for debugging
- Circuit breaker: If the remote server fails repeatedly, stop trying and fall back gracefully
- mTLS: Use mutual TLS for service-to-service authentication in internal networks
- Connection pooling: Reuse HTTP connections across multiple agent runs
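Of these patterns, the circuit breaker is the one most often hand-rolled on the agent side. A minimal sketch, assuming a simple consecutive-failure threshold and a fixed cool-down, might look like the following. CircuitBreaker and its parameters are illustrative names, not part of any SDK.

```python
import time
from typing import Callable, Optional


class CircuitBreaker:
    """Stop calling a failing server until a cool-down period has passed."""

    def __init__(
        self,
        failure_threshold: int = 3,
        reset_after: float = 30.0,
        clock: Callable[[], float] = time.monotonic,
    ):
        self._failure_threshold = failure_threshold
        self._reset_after = reset_after
        self._clock = clock
        self._failures = 0
        self._opened_at: Optional[float] = None  # None means the circuit is closed

    def allow_request(self) -> bool:
        if self._opened_at is None:
            return True
        # After the cool-down, let a trial request through (half-open state).
        return self._clock() - self._opened_at >= self._reset_after

    def record_success(self) -> None:
        self._failures = 0
        self._opened_at = None

    def record_failure(self) -> None:
        self._failures += 1
        if self._failures >= self._failure_threshold:
            self._opened_at = self._clock()  # open, or re-open after a failed trial


now = [0.0]  # fake clock so the example runs instantly
breaker = CircuitBreaker(failure_threshold=2, reset_after=30.0, clock=lambda: now[0])

breaker.record_failure()
allowed_after_one = breaker.allow_request()       # still closed
breaker.record_failure()
allowed_after_two = breaker.allow_request()       # open: skip the call, use a fallback
now[0] = 31.0
allowed_after_cooldown = breaker.allow_request()  # half-open: one trial request
```

Check allow_request() before each tool server call, then report the outcome with record_success() or record_failure(). When the breaker is open, the agent can answer without the tool or tell the user the data source is unavailable.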
MCPServerStreamableHTTP is the production transport for multi-service architectures where tools live on dedicated servers.
Written by
CallSphere Team