Agent-to-Agent Communication Protocols: Technical Deep Dive
Explore messaging patterns, state sharing, and coordination protocols that enable reliable multi-agent collaboration -from simple handoffs to complex negotiations.
Explore messaging patterns, state sharing, and coordination protocols that enable reliable multi-agent collaboration -from simple handoffs to complex negotiations.
TL;DR
Jump to Why protocols matter · Jump to Direct messaging · Jump to Shared state · Jump to Pub/sub · Jump to Negotiation
Two agents need to collaborate. Agent A researches a topic, Agent B writes a summary. Simple, right?
Agent A finishes research. Sends data to Agent B. Agent B crashes mid-processing. Now what?
Traditional function calls don't handle these scenarios. You need communication protocols -rules governing how agents exchange messages, handle failures, and coordinate state.
This guide covers the four communication patterns that handle 95%+ of multi-agent scenarios, with implementation examples and failure handling strategies.
"The hardest part of building multi-agent systems isn't the agents -it's making them talk to each other reliably." – Andrej Karpathy, OpenAI (conference talk, 2024)
Most teams start here:
# Agent A
result_a = agent_a.run(input_data)
# Agent B
result_b = agent_b.run(result_a) # Just pass the data
This works... until it doesn't.
Failures not handled:
In our production systems, naive agent handoffs failed 14.2% of the time before we implemented proper protocols.
1. Acknowledgment: Agent B confirms it received the message 2. Retry logic: Agent A resends if acknowledgment times out 3. Idempotency: Agent B handles duplicate messages gracefully 4. Error propagation: Failures bubble up with context 5. State consistency: Both agents agree on current state
With protocols: failure rate dropped to 1.8% (mostly external API failures outside our control).
Use case: One agent sends data to another specific agent.
Example: Research agent → Analysis agent → Reporting agent
import uuid
from dataclasses import dataclass
from typing import Any, Optional
from enum import Enum
class MessageStatus(Enum):
PENDING = "pending"
ACKNOWLEDGED = "acknowledged"
COMPLETED = "completed"
FAILED = "failed"
@dataclass
class Message:
"""Structured message between agents."""
id: str
from_agent: str
to_agent: str
payload: dict
status: MessageStatus
timestamp: str
retry_count: int = 0
correlation_id: Optional[str] = None # Links related messages
class MessageBroker:
"""Handles message delivery between agents."""
def __init__(self):
self.message_queue = {}
self.delivery_callbacks = {}
def send(self, from_agent: str, to_agent: str, payload: dict) -> Message:
"""Send message from one agent to another."""
message = Message(
id=str(uuid.uuid4()),
from_agent=from_agent,
to_agent=to_agent,
payload=payload,
status=MessageStatus.PENDING,
timestamp=datetime.utcnow().isoformat()
)
# Store message
self.message_queue[message.id] = message
# Attempt delivery
self._deliver(message)
return message
def _deliver(self, message: Message):
"""Deliver message to target agent."""
try:
# Get agent's message handler
handler = self.delivery_callbacks.get(message.to_agent)
if not handler:
raise ValueError(f"No handler registered for {message.to_agent}")
# Deliver message
handler(message)
# Mark as acknowledged
message.status = MessageStatus.ACKNOWLEDGED
except Exception as e:
message.status = MessageStatus.FAILED
logging.error(f"Message delivery failed: {e}")
# Retry logic
if message.retry_count < 3:
message.retry_count += 1
self._schedule_retry(message)
def _schedule_retry(self, message: Message):
"""Retry failed delivery."""
# Exponential backoff
delay = 2 ** message.retry_count # 2s, 4s, 8s
time.sleep(delay)
self._deliver(message)
def register_handler(self, agent_name: str, handler: callable):
"""Register agent's message handler."""
self.delivery_callbacks[agent_name] = handler
# Usage
broker = MessageBroker()
class ResearchAgent:
def __init__(self, broker):
self.name = "research_agent"
self.broker = broker
broker.register_handler(self.name, self.handle_message)
def run(self, query: str):
# Do research
results = self.research(query)
# Send to analysis agent
message = self.broker.send(
from_agent=self.name,
to_agent="analysis_agent",
payload={"research_results": results, "query": query}
)
return message.id
def handle_message(self, message: Message):
"""Handle incoming messages (if any)."""
pass
class AnalysisAgent:
def __init__(self, broker):
self.name = "analysis_agent"
self.broker = broker
broker.register_handler(self.name, self.handle_message)
def handle_message(self, message: Message):
"""Process incoming research results."""
try:
results = message.payload["research_results"]
# Perform analysis
analysis = self.analyze(results)
# Send acknowledgment
self.broker.send(
from_agent=self.name,
to_agent=message.from_agent,
payload={"status": "completed", "analysis": analysis},
correlation_id=message.id # Link to original message
)
except Exception as e:
# Send failure notification
self.broker.send(
from_agent=self.name,
to_agent=message.from_agent,
payload={"status": "failed", "error": str(e)},
correlation_id=message.id
)
1. Message IDs: Every message has unique ID for tracking 2. Status tracking: Know if message was delivered, processed, or failed 3. Retry logic: Automatic retries with exponential backoff 4. Correlation IDs: Link requests and responses 5. Error propagation: Failures notify sender with context
Synchronous: Sender waits for receiver to finish processing
def send_sync(self, to_agent: str, payload: dict, timeout: int = 30):
"""Send message and wait for completion."""
message = self.send(to_agent, payload)
# Wait for response
start_time = time.time()
while time.time() - start_time < timeout:
if message.status == MessageStatus.COMPLETED:
return message
elif message.status == MessageStatus.FAILED:
raise Exception(f"Message failed: {message.error}")
time.sleep(0.1)
raise TimeoutError(f"Message {message.id} timed out")
Asynchronous: Sender continues immediately, receiver processes when ready
def send_async(self, to_agent: str, payload: dict, callback: callable = None):
"""Send message without waiting."""
message = self.send(to_agent, payload)
if callback:
# Register callback for when processing completes
self.completion_callbacks[message.id] = callback
return message.id # Return immediately
Recommendation: Use async for long-running operations (>5 seconds), sync for quick exchanges.
Use case: Multiple agents collaborate on same task, need consistent view of progress.
Example: Three agents writing different sections of a report, one agent compiling final document.
from threading import Lock
from typing import Dict, Any
class SharedState:
"""Thread-safe shared state for agent collaboration."""
def __init__(self):
self.state: Dict[str, Any] = {}
self.locks: Dict[str, Lock] = {}
self.version: int = 0
def get(self, key: str, default=None):
"""Get value from shared state."""
return self.state.get(key, default)
def set(self, key: str, value: Any, agent_id: str):
"""Set value in shared state with optimistic locking."""
# Create lock for this key if doesn't exist
if key not in self.locks:
self.locks[key] = Lock()
with self.locks[key]:
# Check version to detect conflicts
current_version = self.state.get(f"{key}__version", 0)
# Update state
self.state[key] = value
self.state[f"{key}__version"] = current_version + 1
self.state[f"{key}__updated_by"] = agent_id
self.state[f"{key}__updated_at"] = datetime.utcnow().isoformat()
# Increment global version
self.version += 1
logging.info(f"[SharedState] {agent_id} updated {key} (v{current_version + 1})")
def compare_and_swap(self, key: str, expected_value: Any, new_value: Any, agent_id: str) -> bool:
"""Atomic compare-and-swap operation."""
with self.locks.get(key, Lock()):
current = self.state.get(key)
if current == expected_value:
self.set(key, new_value, agent_id)
return True
else:
return False # Value changed, operation failed
def get_all(self) -> dict:
"""Get snapshot of entire state."""
return self.state.copy()
# Usage: Collaborative report writing
class ReportAgent:
def __init__(self, agent_id: str, shared_state: SharedState):
self.agent_id = agent_id
self.shared_state = shared_state
def write_section(self, section_name: str, content: str):
"""Write a section of the report."""
# Check if section already being written
current_writer = self.shared_state.get(f"{section_name}__writer")
if current_writer and current_writer != self.agent_id:
raise Exception(f"Section {section_name} is being written by {current_writer}")
# Claim section
self.shared_state.set(f"{section_name}__writer", self.agent_id, self.agent_id)
# Write content
self.shared_state.set(section_name, content, self.agent_id)
# Mark section complete
self.shared_state.set(f"{section_name}__status", "complete", self.agent_id)
# Release claim
self.shared_state.set(f"{section_name}__writer", None, self.agent_id)
class CompilerAgent:
def __init__(self, shared_state: SharedState):
self.shared_state = shared_state
def compile_report(self, section_names: list):
"""Compile final report from all sections."""
# Wait for all sections to complete
while not self._all_sections_complete(section_names):
time.sleep(1)
# Collect all sections
sections = [self.shared_state.get(name) for name in section_names]
# Compile final report
final_report = "\n\n".join(sections)
# Store final report
self.shared_state.set("final_report", final_report, "compiler_agent")
return final_report
def _all_sections_complete(self, section_names: list) -> bool:
"""Check if all sections are complete."""
for name in section_names:
status = self.shared_state.get(f"{name}__status")
if status != "complete":
return False
return True
When multiple agents update the same state simultaneously:
1. Last-write-wins (simple, lossy)
# No conflict detection, latest update wins
state[key] = value
2. Version-based (detects conflicts)
# Fail if value changed since agent read it
if state[f"{key}__version"] != expected_version:
raise ConflictError("State changed by another agent")
3. Merge-based (combines changes)
# Combine both agents' updates
current_value = state[key]
merged_value = merge_function(current_value, new_value)
state[key] = merged_value
Recommendation: Use version-based for critical data, last-write-wins for non-critical.
Use case: One agent broadcasts information to multiple interested agents.
Example: Health monitoring agent publishes customer status updates; multiple agents (sales, support, CS) subscribe to updates for customers they manage.
from collections import defaultdict
from typing import Callable, List
class PubSubBroker:
"""Publish-subscribe message broker."""
def __init__(self):
self.subscribers: Dict[str, List[Callable]] = defaultdict(list)
self.message_log = []
def subscribe(self, topic: str, subscriber: Callable):
"""Subscribe to a topic."""
self.subscribers[topic].append(subscriber)
logging.info(f"[PubSub] New subscription to '{topic}'")
def unsubscribe(self, topic: str, subscriber: Callable):
"""Unsubscribe from a topic."""
if subscriber in self.subscribers[topic]:
self.subscribers[topic].remove(subscriber)
def publish(self, topic: str, message: dict, publisher_id: str):
"""Publish message to all subscribers."""
message_with_metadata = {
**message,
"topic": topic,
"publisher": publisher_id,
"timestamp": datetime.utcnow().isoformat(),
"message_id": str(uuid.uuid4())
}
# Log message
self.message_log.append(message_with_metadata)
# Deliver to all subscribers
subscribers = self.subscribers.get(topic, [])
logging.info(f"[PubSub] Publishing to '{topic}' ({len(subscribers)} subscribers)")
for subscriber in subscribers:
try:
subscriber(message_with_metadata)
except Exception as e:
logging.error(f"[PubSub] Subscriber failed: {e}")
# Continue delivering to other subscribers
# Usage: Customer health updates
class HealthMonitorAgent:
def __init__(self, pubsub: PubSubBroker):
self.pubsub = pubsub
def publish_health_update(self, customer_id: str, health_score: int, risk_flags: list):
"""Publish customer health update."""
self.pubsub.publish(
topic=f"customer.{customer_id}.health_update",
message={
"customer_id": customer_id,
"health_score": health_score,
"risk_flags": risk_flags
},
publisher_id="health_monitor_agent"
)
# Also publish to general topic
if health_score < 50:
self.pubsub.publish(
topic="customer.at_risk",
message={"customer_id": customer_id, "health_score": health_score},
publisher_id="health_monitor_agent"
)
class SalesAgent:
def __init__(self, pubsub: PubSubBroker, managed_customers: list):
self.pubsub = pubsub
self.managed_customers = managed_customers
# Subscribe to updates for managed customers
for customer_id in managed_customers:
self.pubsub.subscribe(
topic=f"customer.{customer_id}.health_update",
subscriber=self.handle_health_update
)
def handle_health_update(self, message: dict):
"""React to customer health changes."""
customer_id = message["customer_id"]
health_score = message["health_score"]
if health_score < 60:
logging.info(f"[SalesAgent] Customer {customer_id} at risk (score: {health_score})")
self.schedule_check_in_call(customer_id)
class CSAgent:
def __init__(self, pubsub: PubSubBroker):
self.pubsub = pubsub
# Subscribe to all at-risk customers
self.pubsub.subscribe(
topic="customer.at_risk",
subscriber=self.handle_at_risk_customer
)
def handle_at_risk_customer(self, message: dict):
"""Proactive outreach to at-risk customers."""
customer_id = message["customer_id"]
self.draft_outreach_email(customer_id)
Use hierarchical topics for flexible subscriptions:
customer.{customer_id}.health_update # Specific customer
customer.*.health_update # All customers (wildcard)
customer.at_risk # Filtered group
system.errors # System-wide events
agent.{agent_id}.status # Agent status updates
Allow subscribers to filter messages:
def subscribe_with_filter(self, topic: str, subscriber: Callable, filter_func: Callable):
"""Subscribe with message filter."""
def filtered_subscriber(message):
if filter_func(message):
subscriber(message)
self.subscribers[topic].append(filtered_subscriber)
# Usage
cs_agent.subscribe_with_filter(
topic="customer.*.health_update",
subscriber=cs_agent.handle_update,
filter_func=lambda msg: msg["health_score"] < 70 # Only low scores
)
Use case: Agents need to reach agreement when they have conflicting goals or information.
Example: Pricing agent wants to maximize revenue, customer success agent wants to reduce churn risk. They negotiate on renewal price.
from enum import Enum
class NegotiationStatus(Enum):
PROPOSING = "proposing"
COUNTERPROPOSING = "counterproposing"
AGREED = "agreed"
FAILED = "failed"
class Negotiation:
"""Manage negotiation between agents."""
def __init__(self, negotiation_id: str, participants: list, max_rounds: int = 5):
self.id = negotiation_id
self.participants = participants
self.max_rounds = max_rounds
self.current_round = 0
self.proposals = []
self.status = NegotiationStatus.PROPOSING
def propose(self, agent_id: str, proposal: dict):
"""Agent makes a proposal."""
if agent_id not in self.participants:
raise ValueError(f"Agent {agent_id} not part of negotiation")
self.proposals.append({
"agent_id": agent_id,
"proposal": proposal,
"round": self.current_round,
"timestamp": datetime.utcnow().isoformat()
})
# Check if all agents have proposed this round
round_proposals = [p for p in self.proposals if p["round"] == self.current_round]
if len(round_proposals) == len(self.participants):
# All agents proposed, check for agreement
self._check_agreement()
def _check_agreement(self):
"""Check if agents reached agreement."""
round_proposals = [p for p in self.proposals if p["round"] == self.current_round]
# Simple agreement: all proposals must be identical
if self._proposals_match(round_proposals):
self.status = NegotiationStatus.AGREED
return
# No agreement, start next round
self.current_round += 1
if self.current_round >= self.max_rounds:
# Failed to reach agreement
self.status = NegotiationStatus.FAILED
else:
self.status = NegotiationStatus.COUNTERPROPOSING
def _proposals_match(self, proposals: list) -> bool:
"""Check if all proposals are identical."""
if not proposals:
return False
first = proposals[0]["proposal"]
return all(p["proposal"] == first for p in proposals)
# Usage: Renewal pricing negotiation
class PricingAgent:
def negotiate_renewal_price(self, customer_id: str, current_price: int):
"""Determine renewal price."""
# Start negotiation
negotiation = Negotiation(
negotiation_id=f"renewal_{customer_id}",
participants=["pricing_agent", "cs_agent"]
)
# Initial proposal: 10% price increase
proposed_price = int(current_price * 1.10)
negotiation.propose("pricing_agent", {"price": proposed_price, "reasoning": "Market rate adjustment"})
return negotiation
class CSAgent:
def negotiate_renewal_price(self, negotiation: Negotiation, customer_health: int):
"""Counter-propose based on customer health."""
latest_pricing_proposal = negotiation.proposals[-1]["proposal"]
proposed_price = latest_pricing_proposal["price"]
if customer_health < 50:
# At-risk customer, propose discount
counter_price = int(proposed_price * 0.90)
reasoning = "Customer at risk, discount to retain"
elif customer_health < 70:
# Keep price flat
counter_price = proposed_price // 1.10 # Remove increase
reasoning = "Customer neutral, maintain current price"
else:
# Healthy customer, accept increase
counter_price = proposed_price
reasoning = "Healthy customer, accept pricing proposal"
negotiation.propose("cs_agent", {"price": counter_price, "reasoning": reasoning})
return negotiation
1. Fixed rules (deterministic)
def calculate_counteroffer(self, their_offer: int, our_target: int, round_num: int):
"""Move towards target price gradually."""
gap = our_target - their_offer
concession = gap * (0.5 ** round_num) # Decrease concession each round
return their_offer + concession
2. LLM-based (flexible)
def generate_counteroffer(self, negotiation_history: list, constraints: dict):
"""Use LLM to generate strategic counteroffer."""
prompt = f"""
You are a pricing negotiation agent.
History: {negotiation_history}
Constraints: Min price = ${constraints['min_price']}, Max discount = {constraints['max_discount']}%
Generate a counteroffer that:
1. Moves towards agreement
2. Respects constraints
3. Considers customer value
Return JSON: {{"price": <amount>, "reasoning": "<justification>"}}
"""
response = llm.generate(prompt)
return parse_json(response)
3. Voting (multi-agent consensus)
def reach_consensus(self, agent_proposals: list):
"""All agents vote, majority wins."""
votes = {}
for proposal in agent_proposals:
price = proposal["price"]
votes[price] = votes.get(price, 0) + 1
# Majority price wins
consensus_price = max(votes, key=votes.get)
return consensus_price
Robust agent communication requires handling failures gracefully.
Prevent cascading failures when an agent is consistently unreachable:
class CircuitBreaker:
"""Prevent sending to failing agents."""
def __init__(self, failure_threshold: int = 5, timeout: int = 60):
self.failure_count = 0
self.failure_threshold = failure_threshold
self.timeout = timeout
self.opened_at = None
def call(self, func, *args, **kwargs):
"""Execute function with circuit breaker."""
if self.is_open():
raise Exception("Circuit breaker open")
try:
result = func(*args, **kwargs)
self.on_success()
return result
except Exception as e:
self.on_failure()
raise
def is_open(self) -> bool:
"""Check if circuit is open."""
if not self.opened_at:
return False
# Check if timeout expired
if time.time() - self.opened_at > self.timeout:
self.reset()
return False
return True
def on_failure(self):
"""Record failure."""
self.failure_count += 1
if self.failure_count >= self.failure_threshold:
self.opened_at = time.time()
def on_success(self):
"""Reset on success."""
self.failure_count = 0
def reset(self):
"""Reset circuit breaker."""
self.failure_count = 0
self.opened_at = None
Store failed messages for later inspection:
class DeadLetterQueue:
"""Store messages that failed to deliver."""
def __init__(self):
self.failed_messages = []
def add(self, message: Message, error: Exception):
"""Add failed message."""
self.failed_messages.append({
"message": message,
"error": str(error),
"failed_at": datetime.utcnow().isoformat()
})
def retry_all(self, broker: MessageBroker):
"""Retry all failed messages."""
for entry in self.failed_messages:
broker.send(
from_agent=entry["message"].from_agent,
to_agent=entry["message"].to_agent,
payload=entry["message"].payload
)
self.failed_messages = []
Direct messaging handles 89% of agent communication needs -start here, add complexity only when needed.
Shared state enables collaboration but requires careful concurrency control -use locks and versioning to prevent conflicts.
Pub/sub decouples agents -subscribers don't know about publishers, making systems flexible and scalable.
Negotiation resolves conflicts between agents with competing objectives -especially useful for pricing, resource allocation, and scheduling.
Explicit error handling is mandatory -retries, circuit breakers, and dead letter queues prevent cascading failures.
Agent-to-agent communication is where multi-agent systems succeed or fail. Simple message passing works for prototypes, but production systems need structured protocols with acknowledgments, retries, state management, and error handling. Choose the simplest pattern that solves your problem -don't over-engineer with pub/sub if direct messaging suffices.
Q: Should I build custom protocols or use existing message brokers (RabbitMQ, Kafka)? A: For <1000 messages/second, custom is simpler and sufficient. Above that, use existing brokers for reliability and scaling. RabbitMQ for complex routing, Kafka for high throughput.
Q: How do I debug communication failures? A: Log every message send/receive with correlation IDs. Use distributed tracing (Jaeger, Honeycomb) to visualize message flows across agents.
Q: Can agents communicate across different machines/servers? A: Yes, replace in-memory message passing with network protocols (HTTP, gRPC, WebSockets). Use message brokers (RabbitMQ, Redis Pub/Sub) for distributed pub/sub.
Q: What happens if an agent crashes mid-processing? A: Implement idempotency (handling duplicate messages) and checkpointing (saving progress). On restart, agent resumes from last checkpoint.
Further reading:
External references: