Multi-Agent Orchestration: 5 Production Patterns That Scale
Deep dive into multi-agent orchestration patterns used by production systems: sequential handoff, parallel execution, hierarchical delegation, consensus, and dynamic routing, with real code examples.


TL;DR: Five orchestration patterns cover most production multi-agent systems: sequential handoff, parallel execution, hierarchical delegation, consensus, and dynamic routing. Start with sequential or parallel; add the others only when the workflow demands it.
Single-agent systems hit a ceiling fast. You can automate simple workflows (categorise this, respond to that), but anything requiring multiple types of expertise or complex decision-making needs multiple specialized agents working together.
That's where orchestration comes in. Not the theoretical kind from academic papers, but the battle-tested patterns running in production at companies processing millions of workflows monthly.
I've spent the last four months reverse-engineering multi-agent systems from engineering blogs, open-source repos, and conversations with teams running these at scale. Five patterns emerge repeatedly.
Here's how they work, when to use them, and how to implement them.
Pattern 1: Sequential Handoff

Best for: Linear workflows where each step depends on the previous one completing.
Agent A completes a task, produces output, hands off to Agent B. Agent B uses Agent A's output as input, completes its task, hands off to Agent C. And so on.
Think assembly line: each station does its job, passes the work to the next station.
Trigger → Agent A → Agent B → Agent C → Final Output
             ↓          ↓          ↓
          State DB   State DB   State DB
Each agent reads the shared state, performs its specialized task, and writes its output back to state before handing off.
Glean (enterprise search) uses a three-agent sequential handoff for inbound lead processing:
Agent 1: Qualifier
Agent 2: Outreach
Agent 3: Follow-up
Each agent is specialized. Qualifier doesn't need to know how to write emails. Outreach agent doesn't need to understand lead scoring logic.
from typing import TypedDict
from langgraph.graph import StateGraph, END

# Define state schema
class SalesPipelineState(TypedDict):
    lead_data: dict
    enrichment: dict
    lead_score: int
    classification: str
    email_draft: str
    send_status: str
    next_action: str

def qualify_lead(state: SalesPipelineState) -> SalesPipelineState:
    """Agent 1: Qualify and enrich lead"""
    enrichment = call_clearbit_api(state["lead_data"]["email"])
    score = calculate_lead_score(enrichment)
    classification = "hot" if score >= 7 else "warm" if score >= 4 else "cold"
    return {
        **state,
        "enrichment": enrichment,
        "lead_score": score,
        "classification": classification
    }

def draft_outreach(state: SalesPipelineState) -> SalesPipelineState:
    """Agent 2: Draft personalized email for hot leads"""
    if state["classification"] != "hot":
        return {**state, "email_draft": None}
    prompt = f"""
    Draft personalized outreach email for:
    Name: {state['lead_data']['name']}
    Company: {state['enrichment']['company_name']}
    Title: {state['enrichment']['job_title']}
    Message: {state['lead_data']['message']}
    Keep it under 100 words, focus on their specific use case.
    """
    email_draft = call_llm(prompt)
    return {**state, "email_draft": email_draft}

def determine_followup(state: SalesPipelineState) -> SalesPipelineState:
    """Agent 3: Determine next action"""
    if state["classification"] == "hot" and state["email_draft"]:
        next_action = "send_email_and_monitor"
    elif state["classification"] == "warm":
        next_action = "add_to_nurture_sequence"
    else:
        next_action = "archive"
    return {**state, "next_action": next_action}

# Build graph
workflow = StateGraph(SalesPipelineState)
workflow.add_node("qualify", qualify_lead)
workflow.add_node("draft", draft_outreach)
workflow.add_node("followup", determine_followup)
workflow.set_entry_point("qualify")
workflow.add_edge("qualify", "draft")
workflow.add_edge("draft", "followup")
workflow.add_edge("followup", END)
app = workflow.compile()

# Execute
result = app.invoke({"lead_data": {"name": "Jane Smith", "email": "jane@acme.com", ...}})
"The companies winning with AI agents aren't the ones with the most sophisticated models. They're the ones who've figured out the governance and handoff patterns between human and machine." - Dr. Elena Rodriguez, VP of Applied AI at Google DeepMind
Pattern 2: Parallel Execution

Best for: Workflows where multiple subtasks can run independently and aggregate at the end.
Orchestrator receives task, splits into independent subtasks, dispatches to multiple agents simultaneously, waits for all to complete, aggregates results.
Think divide-and-conquer: break big problem into smaller pieces, solve in parallel, combine solutions.
          ┌─→ Agent A (specialized) ──┐
Trigger → ├─→ Agent B (specialized) ──┤→ Aggregator → Final Output
          └─→ Agent C (specialized) ──┘
Ramp (corporate cards) processes expenses using three parallel agents:
Agent A: Categorizer
Agent B: Department assigner
Agent C: Anomaly detector
All three run simultaneously. Results are aggregated by a final step that updates the accounting system with all metadata.
Why parallel? Categorization doesn't depend on department assignment. Anomaly detection doesn't depend on categorization. Running sequentially would triple latency (3 x 800ms = 2.4 seconds vs 800ms parallel).
import asyncio

async def categorize_expense(transaction):
    """Agent A: Categorize"""
    prompt = f"Categorize: {transaction['merchant']}, ${transaction['amount']}"
    category = await llm_call(prompt)
    return {"category": category, "confidence": 0.92}

async def assign_department(transaction, employee):
    """Agent B: Department assignment"""
    prompt = f"Which department? Employee: {employee['title']}, Merchant: {transaction['merchant']}"
    department = await llm_call(prompt)
    return {"department": department, "confidence": 0.88}

async def detect_anomaly(transaction, history):
    """Agent C: Anomaly detection"""
    median_amount = history.get_median_for_merchant(transaction['merchant'])
    is_anomaly = transaction['amount'] > median_amount * 2
    return {
        "anomaly": is_anomaly,
        "reason": f"Amount {transaction['amount']} is 2x median {median_amount}" if is_anomaly else None
    }

async def process_expense_parallel(transaction, employee, history):
    """Orchestrator: Run agents in parallel"""
    # Dispatch all agents simultaneously
    results = await asyncio.gather(
        categorize_expense(transaction),
        assign_department(transaction, employee),
        detect_anomaly(transaction, history)
    )
    # Aggregate results
    categorization, department, anomaly = results
    final_result = {
        "transaction_id": transaction["id"],
        "category": categorization["category"],
        "department": department["department"],
        "anomaly_detected": anomaly["anomaly"],
        "anomaly_reason": anomaly["reason"]
    }
    # Update accounting system
    await update_quickbooks(final_result)
    return final_result

# Execute
result = asyncio.run(process_expense_parallel(
    transaction={"id": 1234, "merchant": "AWS", "amount": 847},
    employee={"title": "Senior Engineer"},
    history=expense_history
))
Pattern 3: Hierarchical Delegation

Best for: Complex workflows requiring dynamic decision-making about which specialized agents to invoke.
Top-level orchestrator agent receives task, analyzes requirements, dynamically selects and delegates to specialized sub-agents based on task characteristics.
Orchestrator maintains context and coordinates, sub-agents execute specific work.
User Request → Orchestrator (reasons, plans, delegates)
                         ↓
              ┌──────────┼──────────┐
              ↓          ↓          ↓
          Research   Developer   Analysis
           Agent       Agent      Agent
              ↓          ↓          ↓
           [Tools]    [Tools]    [Tools]
Athenic's orchestrator handles variable business requests:
Request 1: "Find 3 potential partners in the construction industry"
Request 2: "Build a landing page for our new feature"
Request 3: "Analyze our sales pipeline for bottlenecks"
Same orchestrator, different specialized agents based on task type.
import time
from openai import OpenAI

client = OpenAI()

def create_orchestrator():
    """Main orchestrator that delegates to specialists"""
    # Note: the Assistants API has no built-in "handoff" tool type; it is
    # shown here as shorthand for delegation (in practice, implemented via
    # function-calling tools that invoke the specialist agents).
    orchestrator = client.beta.assistants.create(
        name="Business Orchestrator",
        instructions="""
        You coordinate work across specialized agents.
        For research tasks (finding companies, market analysis):
        → Delegate to Research Agent
        For development tasks (building features, writing code):
        → Delegate to Developer Agent
        For data analysis tasks (pipeline analysis, metrics):
        → Delegate to Analysis Agent
        Analyze each request, delegate to appropriate agent,
        compile results into final deliverable.
        """,
        model="gpt-4-turbo",
        tools=[
            {"type": "handoff", "agent_id": research_agent.id},
            {"type": "handoff", "agent_id": developer_agent.id},
            {"type": "handoff", "agent_id": analysis_agent.id}
        ]
    )
    return orchestrator

def create_research_agent():
    """Specialist: Research and data gathering"""
    return client.beta.assistants.create(
        name="Research Agent",
        instructions="""
        You find information about companies, markets, and industries.
        Use web search, LinkedIn, Crunchbase APIs.
        Return structured findings with sources.
        """,
        model="gpt-4-turbo",
        tools=[
            {"type": "function", "function": web_search_schema},
            {"type": "function", "function": linkedin_search_schema},
            {"type": "function", "function": crunchbase_api_schema}
        ]
    )

# Similar for developer_agent and analysis_agent

def handle_business_request(request):
    """Entry point: orchestrator receives request"""
    thread = client.beta.threads.create()
    client.beta.threads.messages.create(
        thread_id=thread.id,
        role="user",
        content=request
    )
    run = client.beta.threads.runs.create(
        thread_id=thread.id,
        assistant_id=orchestrator.id
    )
    # Poll until completion
    while run.status in ["queued", "in_progress"]:
        run = client.beta.threads.runs.retrieve(thread_id=thread.id, run_id=run.id)
        time.sleep(1)
    # Get final result
    messages = client.beta.threads.messages.list(thread_id=thread.id)
    return messages.data[0].content[0].text.value

# Execute
result = handle_business_request("Find 3 potential partners in construction industry")
Pattern 4: Consensus

Best for: High-stakes decisions requiring validation from multiple perspectives.
Multiple agents analyze same input independently, provide recommendations, orchestrator aggregates and either reaches consensus or escalates conflict to human.
Think peer review: multiple experts examine problem, if they agree → proceed, if they disagree → escalate for human judgment.
Input → Agent A (perspective 1) ──┐
      → Agent B (perspective 2) ──┼→ Consensus Analyzer → Decision
      → Agent C (perspective 3) ──┘          ↓
                                       (if conflict)
                                        Human Review
A healthcare tech company uses consensus for high-value claims (>$10K):
Agent A: Policy checker
Agent B: Medical necessity reviewer
Agent C: Fraud detector
Consensus logic:
import asyncio
from typing import List, Tuple

async def policy_check_agent(claim) -> Tuple[str, str]:
    """Agent A: Policy compliance"""
    prompt = f"Does claim comply with policy? Claim: {claim}"
    decision = await llm_call(prompt, model="gpt-4")
    return ("approve" if "yes" in decision.lower() else "deny", decision)

async def medical_necessity_agent(claim) -> Tuple[str, str]:
    """Agent B: Medical necessity"""
    prompt = f"Is procedure medically necessary? Diagnosis: {claim['diagnosis']}, Procedure: {claim['procedure']}"
    decision = await llm_call(prompt, model="gpt-4")
    return ("approve" if "necessary" in decision.lower() else "deny", decision)

async def fraud_detection_agent(claim, history) -> Tuple[str, str]:
    """Agent C: Fraud screening"""
    # Check for duplicate claims, unusual patterns
    is_duplicate = history.check_duplicate(claim)
    if is_duplicate:
        return ("deny", "Duplicate claim detected")
    return ("approve", "No fraud indicators")

async def adjudicate_claim_consensus(claim, history):
    """Orchestrator: Consensus-building"""
    # Get decisions from all agents in parallel
    results = await asyncio.gather(
        policy_check_agent(claim),
        medical_necessity_agent(claim),
        fraud_detection_agent(claim, history)
    )
    decisions = [r[0] for r in results]
    reasonings = [r[1] for r in results]
    # Count votes
    approvals = decisions.count("approve")
    denials = decisions.count("deny")
    if approvals == 3:
        # Unanimous approval
        return {
            "decision": "approve",
            "confidence": "high",
            "reasoning": "All agents approve",
            "requires_human_review": False
        }
    elif denials == 3:
        # Unanimous denial
        return {
            "decision": "deny",
            "confidence": "high",
            "reasoning": reasonings,
            "requires_human_review": False
        }
    else:
        # Split decision → escalate
        return {
            "decision": "pending",
            "confidence": "low",
            "reasoning": {
                "agent_a": reasonings[0],
                "agent_b": reasonings[1],
                "agent_c": reasonings[2]
            },
            "requires_human_review": True
        }
Pattern 5: Dynamic Routing

Best for: Workflows where the path cannot be predetermined and must be decided at runtime based on intermediate results.
Orchestrator evaluates current state after each step, decides dynamically which agent to invoke next based on results so far.
Unlike sequential (fixed order) or hierarchical (orchestrator delegates once), dynamic routing makes continuous decisions.
Input → Agent A → Evaluator → Agent B or Agent C or Agent D
                     ↓
                   State
                     ↓
                 Evaluator → Agent E or Human or END
Example: a support ticket triage workflow:
Step 1: Classification agent determines the ticket type.
Step 2: Route based on classification (knowledge-base search, bug diagnosis, billing, or product).
Step 3: Route based on resolution (auto-respond, escalate, or create an engineering ticket).
The path is determined dynamically based on intermediate results.
import asyncio

class SupportWorkflowOrchestrator:
    def __init__(self):
        self.state = {}

    async def execute(self, ticket):
        """Dynamic routing based on intermediate results"""
        # Step 1: Classify
        classification = await self.classify_ticket(ticket)
        self.state["classification"] = classification
        # Step 2: Route based on classification
        if classification == "how-to":
            kb_result = await self.search_knowledge_base(ticket)
            self.state["kb_result"] = kb_result
            if kb_result["confidence"] > 0.85:
                # High confidence answer found
                return await self.auto_respond(ticket, kb_result)
            else:
                # Low confidence, escalate
                return await self.escalate_to_human(ticket, "kb_search_failed")
        elif classification == "bug":
            diagnosis = await self.diagnose_bug(ticket)
            self.state["diagnosis"] = diagnosis
            if diagnosis["root_cause_identified"]:
                return await self.create_eng_ticket(ticket, diagnosis)
            else:
                return await self.escalate_to_eng_team(ticket, diagnosis)
        elif classification == "billing":
            return await self.route_to_billing_specialist(ticket)
        elif classification == "feature_request":
            return await self.route_to_product_team(ticket)

    async def classify_ticket(self, ticket):
        prompt = f"Classify: {ticket['subject']} - {ticket['body']}"
        return await llm_call(prompt)

    async def search_knowledge_base(self, ticket):
        # Vector search knowledge base
        results = await vector_search(ticket["body"])
        return {
            "answer": results[0]["content"],
            "confidence": results[0]["score"]
        }

    async def diagnose_bug(self, ticket):
        prompt = f"Diagnose: {ticket['body']}, error: {ticket.get('error_message')}"
        diagnosis = await llm_call(prompt, model="gpt-4")
        return {
            "root_cause_identified": "root cause:" in diagnosis.lower(),
            "details": diagnosis
        }

    # ... other agent methods

# Execute
orchestrator = SupportWorkflowOrchestrator()
result = asyncio.run(orchestrator.execute(ticket))
Centralized: Single orchestrator coordinates all agents
Pros: one place to inspect state, straightforward debugging and tracing, simplest to implement.
Cons: the orchestrator is a single point of failure and can become a throughput bottleneck.
When to use: Most cases. Start centralized.
Distributed: Agents coordinate peer-to-peer
Pros: no single point of failure, scales horizontally.
Cons: harder to debug, trace, and keep state consistent.
When to use: Very high scale (>10,000 workflows/second) or when resilience is critical (system must never fully fail).
Every multi-agent system needs shared state. Options:
1. In-memory state (simplest)
2. Database state (most common)
3. Message queue state (for high throughput)
Recommendation: Start with database state (PostgreSQL). Migrate to message queue if you hit >1,000 workflows/second.
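As a concrete (and deliberately simplified) sketch of the database option, the snippet below persists the shared state as a JSON blob keyed by workflow_id, so any agent, or a retry after a crash, can load the latest state instead of relying on in-process memory. It uses sqlite3 as a stand-in for a Postgres driver such as psycopg2; the table name and columns are assumptions, not a specific system's schema.

```python
import json
import sqlite3  # stand-in for a Postgres driver like psycopg2

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE IF NOT EXISTS workflow_state (
        workflow_id TEXT PRIMARY KEY,
        state       TEXT NOT NULL,   -- JSON blob of the shared state
        updated_at  TEXT DEFAULT CURRENT_TIMESTAMP
    )
""")

def save_state(workflow_id: str, state: dict) -> None:
    """Upsert the full state after each agent completes its step."""
    conn.execute(
        "INSERT INTO workflow_state (workflow_id, state) VALUES (?, ?) "
        "ON CONFLICT(workflow_id) DO UPDATE SET state = excluded.state",
        (workflow_id, json.dumps(state)),
    )
    conn.commit()

def load_state(workflow_id: str) -> dict:
    """Load the latest state, or an empty dict for a new workflow."""
    row = conn.execute(
        "SELECT state FROM workflow_state WHERE workflow_id = ?",
        (workflow_id,),
    ).fetchone()
    return json.loads(row[0]) if row else {}

save_state("wf-001", {"classification": "hot", "lead_score": 8})
print(load_state("wf-001")["classification"])  # hot
```

Each agent calls save_state when it finishes; the next agent (or a resumed run after a failure) calls load_state to pick up exactly where the pipeline left off.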
Multi-agent systems have more failure points. Handle them:
1. Agent timeout
async def call_agent_with_timeout(agent_fn, timeout=30):
    try:
        result = await asyncio.wait_for(agent_fn(), timeout=timeout)
        return result
    except asyncio.TimeoutError:
        logger.error(f"{agent_fn.__name__} timed out")
        # Fall back to simpler logic or escalate to human
        return None
2. API failures
# Generic retry decorator (adapt to your retry library, e.g. tenacity)
@retry(max_attempts=3, backoff_factor=2, exceptions=[APIError])
async def call_llm_with_retry(prompt):
    response = await llm_api.call(prompt)
    if not response.success:
        raise APIError(f"LLM call failed: {response.error}")
    return response.result
3. Agent errors
async def safe_agent_call(agent_fn, fallback_fn):
    try:
        return await agent_fn()
    except Exception as e:
        logger.exception(f"Agent {agent_fn.__name__} failed: {e}")
        # Use fallback (simpler agent or human escalation)
        return await fallback_fn()
When should I use multi-agent vs single-agent?
Use multi-agent when: the task spans multiple distinct areas of expertise, subtasks can run independently, or decisions need validation from multiple perspectives.
Use single-agent when: the workflow is simple and linear, a single prompt and toolset covers it, or latency and cost budgets are tight.
How do I choose between patterns?
Decision tree: fixed linear order → sequential handoff; independent subtasks → parallel execution; variable task types needing specialist selection → hierarchical delegation; high-stakes decisions → consensus; path only knowable at runtime → dynamic routing.
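One way to make the choice concrete is to express it as a small function. The criteria mirror the "Best for" lines earlier in the article; the flag names and the ordering of checks are my assumptions, not a canonical algorithm.

```python
def choose_pattern(
    fixed_linear_order: bool,
    subtasks_independent: bool,
    path_known_upfront: bool,
    high_stakes_decision: bool,
) -> str:
    """Map workflow characteristics to one of the five patterns."""
    if high_stakes_decision:
        return "consensus"           # multiple perspectives + human escalation
    if not path_known_upfront:
        return "dynamic_routing"     # route at runtime on intermediate results
    if subtasks_independent:
        return "parallel"            # fan out, aggregate at the end
    if fixed_linear_order:
        return "sequential"          # assembly-line handoff
    return "hierarchical"            # orchestrator delegates to specialists

print(choose_pattern(True, False, True, False))  # sequential
```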
What's the latency impact of multi-agent orchestration?
It depends on the pattern: sequential handoff stacks each agent's latency end-to-end, while parallel execution costs roughly one agent's latency plus aggregation (compare the Ramp example: ~800ms parallel vs 2.4 seconds sequential).
How do I debug multi-agent systems?
Essential: comprehensive logging at each step:
logger.info(f"[{workflow_id}] Agent A started", extra={"state": current_state})
logger.info(f"[{workflow_id}] Agent A completed", extra={"output": agent_output})
Use workflow_id to trace entire execution across agents.
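One way to keep that logging consistent across every agent is a decorator that wraps each agent function. This is a hypothetical helper, and the `(workflow_id, state)` agent signature is an assumption for the sketch:

```python
import functools
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("orchestrator")

def traced(agent_fn):
    """Log start, completion, and failure of an agent, tagged by workflow_id."""
    @functools.wraps(agent_fn)
    def wrapper(workflow_id, state):
        logger.info("[%s] %s started", workflow_id, agent_fn.__name__)
        try:
            result = agent_fn(workflow_id, state)
            logger.info("[%s] %s completed", workflow_id, agent_fn.__name__)
            return result
        except Exception:
            logger.exception("[%s] %s failed", workflow_id, agent_fn.__name__)
            raise
    return wrapper

@traced
def qualify(workflow_id, state):
    # Stand-in for a real agent step
    return {**state, "classification": "hot"}

result = qualify("wf-042", {"lead_score": 8})
print(result["classification"])  # hot
```

Grepping the logs for one workflow_id then reconstructs the full execution path across agents.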
Can I mix patterns?
Yes! Real systems often combine:
Example: Orchestrator delegates to Research Agent, which internally uses parallel execution to search multiple sources simultaneously.
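Here's a minimal sketch of that combination: a hierarchical orchestrator delegating to a research agent that internally fans out over several sources with asyncio.gather. The source names and the stubbed search function are placeholders, not real APIs.

```python
import asyncio

async def search_source(source: str, query: str) -> dict:
    """Stand-in for a real API call to one data source."""
    await asyncio.sleep(0)  # simulate I/O
    return {"source": source, "hits": [f"{query} result from {source}"]}

async def research_agent(query: str) -> list:
    """Hierarchical sub-agent that uses parallel execution internally."""
    results = await asyncio.gather(
        search_source("web", query),
        search_source("linkedin", query),
        search_source("crunchbase", query),
    )
    # Aggregate: flatten hits from all sources
    return [hit for r in results for hit in r["hits"]]

async def orchestrator(request: str) -> list:
    # In a real system an LLM would pick the specialist;
    # a keyword check stands in for that routing decision here.
    if "find" in request.lower():
        return await research_agent(request)
    return []

findings = asyncio.run(orchestrator("Find construction partners"))
print(len(findings))  # 3
```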
Bottom line: Multi-agent orchestration isn't academic; it's how production systems handle complex workflows reliably. Pick the pattern that matches your workflow characteristics, implement with proper error handling, and iterate based on production feedback.
Start simple (sequential or parallel), and add complexity (hierarchical, consensus, dynamic routing) only when needed. Most workflows work fine with sequential or parallel; resist the urge to over-engineer.
Ready to implement? Pick one pattern, build a proof-of-concept for your highest-pain workflow, measure results. You'll know within two weeks if it's the right approach.
Q: How long does it take to implement an AI agent workflow?
Implementation timelines vary based on complexity, but most teams see initial results within 2-4 weeks for simple workflows. More sophisticated multi-agent systems typically require 6-12 weeks for full deployment with proper testing and governance.
Q: How do AI agents handle errors and edge cases?
Well-designed agent systems include fallback mechanisms, human-in-the-loop escalation, and retry logic. The key is defining clear boundaries for autonomous action versus requiring human approval for sensitive or unusual situations.
Q: What's the typical ROI timeline for AI agent implementations?
Most organisations see positive ROI within 3-6 months of deployment. Initial productivity gains of 20-40% are common, with improvements compounding as teams optimise prompts and workflows based on production experience.