Agent-to-Agent Protocol Design: Building Interoperable Multi-Agent Communication
Design robust communication protocols for multi-agent systems including message schemas, capability advertisement, negotiation protocols, and service discovery mechanisms with practical Python implementations.
The Interoperability Problem in Multi-Agent Systems
When you build a single agent, communication is straightforward — the agent calls tools and returns results. When you build a team of agents, you face a fundamental question: how do agents talk to each other? Without a well-designed protocol, you end up with tightly coupled agents that can only work with the specific partners they were built for.
Agent-to-agent (A2A) protocol design solves this by establishing standard message formats, capability discovery, and negotiation patterns that allow any agent to communicate with any other agent — even agents built by different teams or frameworks.
Designing the Message Schema
Every inter-agent message needs a consistent structure. The protocol envelope below covers request-response and event messages, with dedicated message types for capability queries, capability advertisements, and negotiation.
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Dict, Optional
import uuid
import time


class MessageType(Enum):
    REQUEST = "request"
    RESPONSE = "response"
    EVENT = "event"
    CAPABILITY_QUERY = "capability_query"
    CAPABILITY_ADVERTISEMENT = "capability_advertisement"
    NEGOTIATION = "negotiation"


@dataclass
class AgentMessage:
    msg_type: MessageType
    sender_id: str
    receiver_id: str
    payload: Dict[str, Any]
    correlation_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    reply_to: Optional[str] = None
    timestamp: float = field(default_factory=time.time)
    ttl_seconds: float = 30.0
    protocol_version: str = "1.0"

    @property
    def is_expired(self) -> bool:
        return (time.time() - self.timestamp) > self.ttl_seconds

    def create_reply(self, payload: Dict[str, Any]) -> "AgentMessage":
        # The reply swaps sender and receiver and points back at this message.
        return AgentMessage(
            msg_type=MessageType.RESPONSE,
            sender_id=self.receiver_id,
            receiver_id=self.sender_id,
            payload=payload,
            reply_to=self.correlation_id,
        )
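Each message carries its own unique correlation_id. A reply copies the original request's correlation_id into reply_to, which is how a requester matches responses to requests and how longer exchanges are threaded. The ttl_seconds field, exposed through is_expired, lets a bus or receiver discard stale messages before they clog the system.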
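Once agents run in separate processes, the envelope has to cross a wire. The following is a minimal sketch of one way to serialize it as JSON, assuming a trimmed copy of the envelope so the block runs on its own. The helpers to_json and from_json are hypothetical names and are not part of the protocol above.

```python
import json
import time
import uuid
from dataclasses import asdict, dataclass, field
from enum import Enum
from typing import Any, Dict, Optional


class MessageType(Enum):
    REQUEST = "request"
    RESPONSE = "response"


@dataclass
class AgentMessage:
    # Trimmed copy of the envelope above so this block is self-contained.
    msg_type: MessageType
    sender_id: str
    receiver_id: str
    payload: Dict[str, Any]
    correlation_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    reply_to: Optional[str] = None
    timestamp: float = field(default_factory=time.time)
    ttl_seconds: float = 30.0
    protocol_version: str = "1.0"


def to_json(message: AgentMessage) -> str:
    """Hypothetical helper: encode the envelope as a JSON string."""
    data = asdict(message)
    data["msg_type"] = message.msg_type.value  # Enum member -> plain string
    return json.dumps(data)


def from_json(raw: str) -> AgentMessage:
    """Hypothetical helper: decode a JSON string back into an envelope."""
    data = json.loads(raw)
    data["msg_type"] = MessageType(data["msg_type"])  # plain string -> Enum member
    return AgentMessage(**data)


request = AgentMessage(
    msg_type=MessageType.REQUEST,
    sender_id="planner",
    receiver_id="summarizer",
    payload={"text": "hello"},
)
restored = from_json(to_json(request))
print(restored == request)  # True: every field survives the round trip
```

Sending protocol_version with every message gives a receiver the chance to reject or adapt envelopes written for a version it does not understand.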
Capability Advertisement and Discovery
Agents need to discover what other agents can do. A capability registry lets agents advertise their skills and query for agents that match specific needs.
from dataclasses import dataclass
from typing import Dict, List


@dataclass
class AgentCapability:
    name: str
    description: str
    input_schema: Dict    # JSON Schema for expected input
    output_schema: Dict   # JSON Schema for output
    cost_estimate: float  # Relative cost (0.0 to 1.0)
    latency_ms: int       # Expected latency


class CapabilityRegistry:
    def __init__(self):
        self._capabilities: Dict[str, List[AgentCapability]] = {}

    def register(self, agent_id: str, capabilities: List[AgentCapability]):
        self._capabilities[agent_id] = capabilities

    def unregister(self, agent_id: str):
        self._capabilities.pop(agent_id, None)

    def find_agents(
        self, capability_name: str, max_cost: float = 1.0
    ) -> List[str]:
        results = []
        for agent_id, caps in self._capabilities.items():
            for cap in caps:
                if (
                    cap.name == capability_name
                    and cap.cost_estimate <= max_cost
                ):
                    results.append(agent_id)
                    break  # list each agent once, even if several entries match
        return results

    def get_capabilities(self, agent_id: str) -> List[AgentCapability]:
        return self._capabilities.get(agent_id, [])
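The registry returns matches in registration order. A requester that cares about cost and latency may want them ranked instead. A minimal sketch follows, assuming a reduced stand-in for AgentCapability and a hypothetical rank_agents helper that is not part of the registry above.

```python
from dataclasses import dataclass
from typing import Dict, List, Tuple


@dataclass
class Capability:
    # Reduced stand-in for AgentCapability (schemas omitted for brevity).
    name: str
    cost_estimate: float
    latency_ms: int


def rank_agents(
    capabilities: Dict[str, List[Capability]],
    capability_name: str,
    max_latency_ms: int,
) -> List[str]:
    """Hypothetical helper: matching agents, cheapest first, ties broken by latency."""
    matches: List[Tuple[float, int, str]] = []
    for agent_id, caps in capabilities.items():
        for cap in caps:
            if cap.name == capability_name and cap.latency_ms <= max_latency_ms:
                matches.append((cap.cost_estimate, cap.latency_ms, agent_id))
                break  # one entry per agent
    return [agent_id for _, _, agent_id in sorted(matches)]


registered = {
    "fast-summarizer": [Capability("summarize", 0.6, 500)],
    "cheap-summarizer": [Capability("summarize", 0.2, 2500)],
    "slow-summarizer": [Capability("summarize", 0.1, 9000)],
    "translator": [Capability("translate", 0.5, 3000)],
}
ranked = rank_agents(registered, "summarize", max_latency_ms=3000)
print(ranked)  # ['cheap-summarizer', 'fast-summarizer']
```

The slowest agent is the cheapest here, but it is filtered out by the latency bound before ranking, which is the kind of trade-off the cost_estimate and latency_ms fields are meant to expose.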
Negotiation Protocol
When multiple agents can handle a task, the requesting agent negotiates to find the best match. This implements a simple contract-net protocol.
import asyncio
from typing import Dict, List, Optional


class NegotiationProtocol:
    def __init__(self, registry: CapabilityRegistry, bus: "MessageBus"):
        self.registry = registry
        self.bus = bus

    async def request_bids(
        self,
        requester_id: str,
        capability_name: str,
        task_payload: Dict,
        timeout: float = 5.0,
    ) -> List[Dict]:
        candidates = self.registry.find_agents(capability_name)
        if not candidates:
            return []

        # Build one bid request per candidate agent.
        bid_requests = []
        for agent_id in candidates:
            msg = AgentMessage(
                msg_type=MessageType.NEGOTIATION,
                sender_id=requester_id,
                receiver_id=agent_id,
                payload={
                    "action": "request_bid",
                    "capability": capability_name,
                    "task": task_payload,
                },
            )
            bid_requests.append(self.bus.send_and_wait(msg, timeout))

        # Send all requests concurrently; timeouts and errors are skipped below.
        results = await asyncio.gather(
            *bid_requests, return_exceptions=True
        )
        bids = []
        for result in results:
            if isinstance(result, AgentMessage):
                bids.append(result.payload)
        return bids

    def select_winner(self, bids: List[Dict]) -> Optional[Dict]:
        # Lowest-cost accepted bid wins; a bid without a cost sorts last.
        valid = [b for b in bids if b.get("accepted")]
        if not valid:
            return None
        return min(valid, key=lambda b: b.get("cost", float("inf")))
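The selection step only looks at the "accepted" and "cost" keys of each bid. The article does not show how a bidder fills them in, so the sketch below assumes one plausible policy: decline when at capacity, otherwise quote a cost that rises with current load. The make_bid function, its parameters, and the "agent_id" key are hypothetical. select_winner is copied from the class above as a plain function so the block runs on its own.

```python
from typing import Any, Dict, List, Optional


def make_bid(
    agent_id: str, current_load: int, capacity: int, base_cost: float
) -> Dict[str, Any]:
    """Hypothetical bidder policy: decline at capacity, else quote a load-adjusted cost."""
    if current_load >= capacity:
        return {"agent_id": agent_id, "accepted": False}
    load_factor = 1.0 + current_load / capacity
    return {
        "agent_id": agent_id,
        "accepted": True,
        "cost": round(base_cost * load_factor, 3),
    }


def select_winner(bids: List[Dict[str, Any]]) -> Optional[Dict[str, Any]]:
    """Same rule as NegotiationProtocol.select_winner: cheapest accepted bid."""
    valid = [b for b in bids if b.get("accepted")]
    if not valid:
        return None
    return min(valid, key=lambda b: b.get("cost", float("inf")))


bids = [
    make_bid("summarizer-a", current_load=3, capacity=4, base_cost=0.2),  # busy
    make_bid("summarizer-b", current_load=0, capacity=4, base_cost=0.3),  # idle
    make_bid("summarizer-c", current_load=4, capacity=4, base_cost=0.1),  # full, declines
]
winner = select_winner(bids)
print(winner["agent_id"])  # summarizer-b
```

The agent with the lowest base cost never wins here because it declines, and the busy agent's quote rises above the idle one's. Letting bidders price in their own load is what lets a contract-net exchange spread work without a central scheduler.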
Message Bus Implementation
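The message bus routes messages between agents. The version below implements direct addressing with one inbox queue per agent; publish-subscribe can be layered on the same idea by keeping queues per topic instead of per agent.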
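class MessageBus:
    def __init__(self):
        self._handlers: Dict[str, asyncio.Queue] = {}
        # correlation_id -> future awaited by an in-flight send_and_wait call
        self._pending: Dict[str, asyncio.Future] = {}

    def register_agent(self, agent_id: str):
        self._handlers[agent_id] = asyncio.Queue()

    async def send(self, message: AgentMessage):
        # A reply to an in-flight send_and_wait resolves its future directly,
        # so concurrent waiters never consume or drop each other's messages.
        waiter = self._pending.get(message.reply_to) if message.reply_to else None
        if waiter is not None and not waiter.done():
            waiter.set_result(message)
            return
        queue = self._handlers.get(message.receiver_id)
        if queue:
            await queue.put(message)

    async def send_and_wait(
        self, message: AgentMessage, timeout: float = 10.0
    ) -> Optional[AgentMessage]:
        if message.receiver_id not in self._handlers:
            return None  # unknown receiver: nobody can reply
        waiter = asyncio.get_running_loop().create_future()
        self._pending[message.correlation_id] = waiter
        try:
            await self.send(message)
            return await asyncio.wait_for(waiter, timeout)
        except asyncio.TimeoutError:
            return None
        finally:
            self._pending.pop(message.correlation_id, None)

    async def receive(
        self, agent_id: str, timeout: float = 5.0
    ) -> Optional[AgentMessage]:
        queue = self._handlers.get(agent_id)
        if not queue:
            return None
        try:
            message = await asyncio.wait_for(queue.get(), timeout)
        except asyncio.TimeoutError:
            return None
        # Enforce the TTL at delivery time: stale messages are discarded.
        return None if message.is_expired else message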
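Publish-subscribe is not implemented in the bus above. As a rough sketch of how it could look, the block below keeps a list of subscriber queues per topic and fans each event out to all of them. The TopicBus class, its method names, and the "task.completed" topic are assumptions made for illustration.

```python
import asyncio
from collections import defaultdict
from typing import Any, DefaultDict, Dict, List


class TopicBus:
    """Hypothetical pub-sub layer: each subscriber gets its own queue per topic."""

    def __init__(self) -> None:
        self._subscribers: DefaultDict[str, List[asyncio.Queue]] = defaultdict(list)

    def subscribe(self, topic: str) -> asyncio.Queue:
        # Return a private queue so a slow subscriber cannot steal events.
        queue: asyncio.Queue = asyncio.Queue()
        self._subscribers[topic].append(queue)
        return queue

    async def publish(self, topic: str, event: Dict[str, Any]) -> int:
        """Fan the event out to every subscriber; return how many received it."""
        queues = self._subscribers.get(topic, [])
        for queue in queues:
            await queue.put(event)
        return len(queues)


async def main() -> List[Dict[str, Any]]:
    bus = TopicBus()
    logger_inbox = bus.subscribe("task.completed")
    billing_inbox = bus.subscribe("task.completed")
    await bus.publish("task.completed", {"task_id": "t-1"})
    # Both subscribers see the same event independently.
    return [await logger_inbox.get(), await billing_inbox.get()]


events = asyncio.run(main())
print(events)  # [{'task_id': 't-1'}, {'task_id': 't-1'}]
```

In the article's protocol the event payload would more naturally be an AgentMessage with msg_type EVENT; plain dictionaries are used here only to keep the sketch short.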
Putting It Together
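async def demo():
    bus = MessageBus()
    registry = CapabilityRegistry()

    # Register agents and their capabilities
    bus.register_agent("coordinator")  # the requester needs an inbox too
    bus.register_agent("summarizer")
    registry.register("summarizer", [
        AgentCapability("summarize", "Summarize text", {}, {}, 0.3, 2000)
    ])
    bus.register_agent("translator")
    registry.register("translator", [
        AgentCapability("translate", "Translate text", {}, {}, 0.5, 3000)
    ])

    # Discover
    agents = registry.find_agents("summarize")
    print(f"Agents with summarize capability: {agents}")

    # Stand-in bidder: answers one bid request on behalf of the summarizer
    async def summarizer_bidder():
        request = await bus.receive("summarizer")
        if request is not None:
            await bus.send(request.create_reply({"accepted": True, "cost": 0.3}))

    bidder = asyncio.create_task(summarizer_bidder())

    # Negotiate
    negotiator = NegotiationProtocol(registry, bus)
    bids = await negotiator.request_bids(
        "coordinator", "summarize", {"text": "Long text to summarize"}
    )
    await bidder
    print(f"Winning bid: {negotiator.select_winner(bids)}")


asyncio.run(demo())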
FAQ
Why not just use HTTP REST between agents?
HTTP REST works for simple request-response patterns but lacks built-in support for capability discovery, negotiation, and message correlation. A dedicated agent protocol gives you these features plus TTL-based expiration and structured negotiation — reducing the boilerplate each agent must implement.
How does this compare to the A2A protocol from Google?
Google's Agent-to-Agent protocol focuses on web-standard interoperability using JSON-RPC over HTTP with agent cards for discovery. The patterns in this article follow similar principles but are designed for in-process or single-cluster deployments. For cross-organization interoperability, adopt the A2A standard; for internal agent teams, a lightweight custom protocol often performs better.
Written by
CallSphere Team
Expert insights on AI voice agents and customer communication automation.