# Building Agentic Workflows: From Simple Chains to Complex Automations
Architect agentic workflows: sequential chains, parallel execution, conditional branching, loops, and error recovery patterns for production AI automation.

## TL;DR

Traditional automation (RPA, scripts):

```
IF condition THEN action ELSE other_action
```

Fixed logic. Breaks when inputs change.

Agentic workflow:

1. Analyze situation
2. Decide best approach
3. Execute actions
4. Evaluate results
5. Adjust if needed

Adaptive. Handles variability.

Key difference: the agent makes decisions at each step based on context, not on predetermined rules.
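The five-step loop above can be sketched as a plain function; `analyze`, `decide`, `execute`, and `evaluate` are toy stand-ins for what would be LLM-backed calls in a real agent:

```python
def agentic_loop(situation, analyze, decide, execute, evaluate, max_steps=5):
    """Run analyze → decide → execute → evaluate, adjusting until accepted."""
    result = None
    for _ in range(max_steps):
        context = analyze(situation)   # 1. Analyze situation
        plan = decide(context)         # 2. Decide best approach
        result = execute(plan)         # 3. Execute actions
        if evaluate(result):           # 4. Evaluate results
            return result
        situation = result             # 5. Adjust and try again
    return result                      # best effort after max_steps

# Toy run: each pass increments a counter until evaluate() accepts it.
result = agentic_loop(0, analyze=lambda s: s, decide=lambda c: c + 1,
                      execute=lambda p: p, evaluate=lambda r: r >= 3)
# result == 3
```

A fixed `IF/THEN` script has no equivalent of step 4; the evaluation-and-retry loop is what absorbs input variability.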
## Sequential Chains

When: Steps must run in order; each step depends on the previous output.
Example: Blog post generation
1. Research topic → 2. Generate outline → 3. Write sections → 4. Add citations → 5. Proofread
Implementation:

```python
async def sequential_workflow(topic):
    # Step 1: Research
    research_agent = Agent("Researcher")
    research = await research_agent.run(f"Research {topic}")

    # Step 2: Outline (uses research output)
    outline_agent = Agent("Outliner")
    outline = await outline_agent.run(f"Create outline based on: {research}")

    # Step 3: Write (uses outline)
    writer_agent = Agent("Writer")
    draft = await writer_agent.run(f"Write blog post following: {outline}")

    # Step 4: Citations (uses draft)
    citation_agent = Agent("Citation Expert")
    cited = await citation_agent.run(f"Add citations to: {draft}")

    # Step 5: Proofread
    editor_agent = Agent("Editor")
    final = await editor_agent.run(f"Proofread and polish: {cited}")

    return final
```
Pros: Simple, predictable, easy to debug.
Cons: Slow (each step waits for previous), single point of failure.
Use when: Steps inherently sequential, outputs directly feed into next step.
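Since every step follows the same call-the-next-agent shape, the chain can be generalized into a small runner; the toy async steps below stand in for real agent calls:

```python
import asyncio

async def run_chain(steps, initial_input):
    """Run async steps in order, feeding each step the previous output."""
    output = initial_input
    for step in steps:
        output = await step(output)  # each step blocks on the one before it
    return output

# Toy steps standing in for agent calls:
async def research(topic):
    return f"research({topic})"

async def outline(notes):
    return f"outline({notes})"

async def write(plan):
    return f"draft({plan})"

result = asyncio.run(run_chain([research, outline, write], "AI agents"))
# result == "draft(outline(research(AI agents)))"
```

A runner like this also gives you one place to add logging, retries, or checkpointing for every step instead of five.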
## Parallel Execution

When: Multiple independent tasks can run simultaneously.
Example: Market research
Simultaneously:
- Agent A: Analyze competitor pricing
- Agent B: Scan industry news
- Agent C: Survey customer reviews
- Agent D: Check social media sentiment
Combine results → Generate report
Implementation:

```python
import asyncio

async def parallel_workflow(company):
    # Launch all tasks at once
    tasks = [
        analyze_pricing(company),
        scan_news(company),
        analyze_reviews(company),
        check_sentiment(company),
    ]

    # Wait for all to complete
    results = await asyncio.gather(*tasks)
    pricing, news, reviews, sentiment = results

    # Generate final report
    report_agent = Agent("Report Generator")
    report = await report_agent.run(f"""
    Generate market research report:
    Pricing: {pricing}
    News: {news}
    Reviews: {reviews}
    Sentiment: {sentiment}
    """)
    return report

async def analyze_pricing(company):
    agent = Agent("Pricing Analyst")
    return await agent.run(f"Analyze {company} pricing vs competitors")

async def scan_news(company):
    agent = Agent("News Scanner")
    return await agent.run(f"Find latest news about {company}")

# ... other agents (analyze_reviews, check_sentiment) follow the same shape
```
Pros: Fast, efficient resource use.
Cons: Requires independent tasks, harder to debug, need error handling for partial failures.
Use when: Tasks don't depend on each other, speed is important.
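The partial-failure concern can be handled with `asyncio.gather`'s `return_exceptions=True` flag, which hands back exception objects in place of results instead of failing the whole batch:

```python
import asyncio

async def fetch_ok():
    return "data"

async def fetch_fail():
    raise RuntimeError("API timeout")

async def market_research():
    # return_exceptions=True puts exception objects into the results list
    # instead of raising, so one failed task keeps the others' output.
    results = await asyncio.gather(fetch_ok(), fetch_fail(), return_exceptions=True)
    ok = [r for r in results if not isinstance(r, Exception)]
    failed = [r for r in results if isinstance(r, Exception)]
    return ok, failed

ok, failed = asyncio.run(market_research())
# ok == ["data"]; failed holds the RuntimeError
```

From there the workflow can decide whether the surviving results are enough to build the report, or whether the failed branches should be retried.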
## Conditional Branching

When: The next step depends on intermediate results.
Example: Customer support triage
1. Classify ticket (bug, feature request, question)
2. Route based on classification:
   - Bug → Engineering team
   - Feature request → Product team
   - Question → Auto-reply with FAQ
Implementation:

```python
async def conditional_workflow(ticket):
    # Step 1: Classify
    classifier = Agent("Classifier")
    classification = await classifier.run(f"Classify this ticket: {ticket}")

    # Step 2: Route based on classification
    if "bug" in classification.lower():
        # Bug path
        engineer = Agent("Engineer")
        response = await engineer.run(f"Diagnose bug: {ticket}")
        priority = "high" if "critical" in response.lower() else "medium"
        return await create_jira_ticket(ticket, response, priority)
    elif "feature" in classification.lower():
        # Feature request path
        product_manager = Agent("Product Manager")
        analysis = await product_manager.run(f"Evaluate feature request: {ticket}")
        return await add_to_roadmap(analysis)
    else:
        # Question path
        kb_search = Agent("Knowledge Base Search")
        answer = await kb_search.run(f"Find answer for: {ticket}")
        return await send_auto_reply(ticket, answer)
```
Decision Tree Example:

```python
async def sales_lead_workflow(lead):
    # Qualify lead; agent output is text, so parse it before comparing
    qualifier = Agent("Lead Qualifier")
    score = int(await qualifier.run(f"Score lead 1-10, reply with the number only: {lead}"))

    if score >= 8:
        # Hot lead → immediate outreach
        sales_rep = Agent("Sales Rep")
        await sales_rep.run(f"Call {lead['name']} immediately")
        await sales_rep.run("Send personalized demo video")
        return "hot_lead_workflow_complete"
    elif score >= 5:
        # Warm lead → nurture campaign
        marketer = Agent("Marketing Automation")
        await marketer.run(f"Enroll {lead['email']} in 5-day email course")
        return "warm_lead_workflow_complete"
    else:
        # Cold lead → add to general newsletter
        await add_to_newsletter(lead["email"])
        return "cold_lead_workflow_complete"
```
Pros: Flexible, handles different scenarios, optimizes resources.
Cons: More complex, harder to predict execution path.
Use when: Different inputs require different handling, want to optimize for specific cases.
## Loops and Iteration

When: The task requires repeated attempts or incremental refinement.
Example: Web research with verification
```
WHILE not_enough_sources AND attempts < max_attempts:
    1. Search for sources
    2. Verify credibility
    3. If insufficient, refine the search query
    4. Repeat
```
Implementation:

```python
async def loop_workflow(research_topic, min_sources=5, max_attempts=10):
    sources = []
    attempt = 0

    while len(sources) < min_sources and attempt < max_attempts:
        attempt += 1

        # Search for sources (ask for one per line so the reply is parseable)
        researcher = Agent("Researcher")
        reply = await researcher.run(f"""
        Find credible sources on: {research_topic}
        Already found: {sources}
        Looking for {min_sources - len(sources)} more.
        List one source per line.
        """)
        new_sources = [line.strip() for line in reply.splitlines() if line.strip()]

        # Verify each source
        verifier = Agent("Fact Checker")
        for source in new_sources:
            credibility = await verifier.run(f"Verify credibility of: {source}")
            if "credible" in credibility.lower():
                sources.append(source)

        # If still not enough, refine the search query
        if len(sources) < min_sources:
            refiner = Agent("Query Refiner")
            research_topic = await refiner.run(f"""
            Original topic: {research_topic}
            Found so far: {len(sources)} sources
            Suggest a refined search query
            """)

    return sources
```
With Early Exit:

```python
async def content_generation_loop(topic, quality_threshold=0.8):
    max_iterations = 5
    content = ""

    for iteration in range(max_iterations):
        # Generate content
        writer = Agent("Writer")
        content = await writer.run(f"Write about {topic}")

        # Evaluate quality (ask for a bare number so float() can parse it)
        evaluator = Agent("Quality Checker")
        score = await evaluator.run(f"Score quality 0-1, reply with the number only: {content}")

        if float(score) >= quality_threshold:
            return content  # good enough, exit early

        # Not good enough: fold the weak draft into the next prompt
        topic = f"{topic} (improve on: {content[:200]}...)"

    return content  # return the last attempt after max iterations
```
Pros: Handles uncertain outcomes, keeps trying until success.
Cons: Unpredictable execution time, risk of infinite loops (must have max iterations).
Use when: Task success not guaranteed on first try, iterative refinement improves results.
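Loops that gate on a model-produced score (like the quality check above) break if `float()` sees extra words. A small defensive parser, offered here as an assumption rather than part of any framework, degrades gracefully instead:

```python
import re

def parse_score(text, default=0.0):
    """Pull the first number out of free-form model output.

    Returns `default` when no number is present, so the loop can treat
    an unparseable reply as a failed quality check rather than crash.
    """
    match = re.search(r"-?\d+(?:\.\d+)?", text)
    return float(match.group()) if match else default
```

With this, `float(score)` in the loop becomes `parse_score(score)`, which survives chatty replies like "I'd score this 0.85 overall".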
## Human-in-the-Loop

When: You need human approval before proceeding, or human input to resolve ambiguity.
Example: Content approval workflow
1. Agent drafts email
2. Send to human for review
3. Wait for approval/edits
4. If approved → send
5. If edits → incorporate → go to step 2
Implementation:

```python
async def human_in_loop_workflow(task_description):
    max_iterations = 3

    for iteration in range(max_iterations):
        # Agent generates output
        agent = Agent("Content Creator")
        output = await agent.run(task_description)

        # Request human review
        review = await request_human_review(output)

        if review["status"] == "approved":
            return await execute_final_action(output)
        elif review["status"] == "rejected":
            return {"status": "cancelled", "reason": review["feedback"]}
        elif review["status"] == "needs_edits":
            # Incorporate feedback and retry
            task_description = f"""
            {task_description}
            Previous attempt: {output}
            Feedback: {review['feedback']}
            Incorporate this feedback in the next version
            """

    return {"status": "max_iterations_reached"}

async def request_human_review(output):
    # Store in database, notify user
    approval_id = db.create_approval_request(output)
    await notify_user(approval_id)

    # Wait for response (poll or webhook)
    return await wait_for_approval(approval_id, timeout_minutes=60)
```
Pros: Safety, quality control, handles edge cases humans are better at.
Cons: Slow (waits for human), requires notification infrastructure.
Use when: High-stakes decisions, compliance requirements, quality critical.
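`wait_for_approval` in the sketch above is left undefined. A minimal polling version might look like the following, where `check_status` is any async callable (for example, a hypothetical database lookup) that returns None while the review is still pending:

```python
import asyncio

async def wait_for_approval(check_status, timeout_seconds=3600, poll_seconds=30):
    """Poll check_status() until it returns a decision or the timeout passes.

    check_status returns None while the review is pending and a decision
    dict (e.g. {"status": "approved"}) once the human responds.
    """
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout_seconds
    while loop.time() < deadline:
        decision = await check_status()
        if decision is not None:
            return decision
        await asyncio.sleep(poll_seconds)  # back off between checks
    return {"status": "timed_out"}
```

A webhook that resolves a Future is the lower-latency alternative; polling is simply the least infrastructure that works.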
> "The companies winning with AI agents aren't the ones with the most sophisticated models. They're the ones who've figured out the governance and handoff patterns between human and machine." - Dr. Elena Rodriguez, VP of Applied AI at Google DeepMind
## Real-World Example: Automated Customer Onboarding
Workflow:

1. Receive signup (trigger)
2. PARALLEL:
   - Verify email
   - Check if the company already exists in the CRM
   - Validate payment method
3. CONDITIONAL: if the company exists, add the user to the existing account; else create a new company account
4. SEQUENTIAL:
   - Send welcome email
   - Create onboarding tasks
   - Schedule kickoff call
5. LOOP (until complete):
   - Has the user completed their profile? (check every 6 hours)
   - If not, send a reminder
   - If complete, trigger the success workflow
   - After 7 days without completion, escalate to a human
Code:

```python
async def customer_onboarding_workflow(signup_data):
    # 1. Trigger
    user_email = signup_data["email"]
    company_name = signup_data["company"]

    # 2. PARALLEL validation
    validation_tasks = [
        verify_email(user_email),
        check_existing_company(company_name),
        validate_payment(signup_data["payment_method"]),
    ]
    email_valid, existing_company, payment_valid = await asyncio.gather(*validation_tasks)

    if not (email_valid and payment_valid):
        return {"status": "validation_failed"}

    # 3. CONDITIONAL: create or join account
    if existing_company:
        account_id = await add_user_to_account(existing_company["id"], user_email)
    else:
        account_id = await create_new_account(company_name, user_email)

    # 4. SEQUENTIAL onboarding steps
    await send_welcome_email(user_email, company_name)
    await create_onboarding_tasks(account_id)
    await schedule_kickoff_call(account_id)

    # 5. LOOP: check every 6 hours for 7 days (28 checks × 6h = 7 days)
    for check in range(28):
        await asyncio.sleep(6 * 3600)
        profile_complete = await check_profile_completion(account_id)
        if profile_complete:
            await trigger_success_workflow(account_id)
            return {"status": "onboarding_complete"}
        if check < 27:  # don't send a reminder on the last check
            await send_reminder_email(user_email)

    # After 7 days, escalate to human
    await escalate_to_customer_success(account_id)
    return {"status": "needs_human_intervention"}
```
Results: onboarding time dropped from roughly 4 hours of manual work to about 8 minutes end to end.

## Error Recovery Patterns

Retry with exponential backoff:
```python
async def workflow_with_error_recovery(task):
    for attempt in range(3):  # retry up to 3 times
        try:
            return await execute_step(task)
        except TemporaryError:
            # Transient failure (API timeout, rate limit)
            await asyncio.sleep(2 ** attempt)  # exponential backoff
        except PermanentError as e:
            # Fatal error (invalid input, auth failure)
            await log_error(e)
            await notify_admin(e)
            raise

    # All retries failed
    await escalate_to_human(task)
    raise MaxRetriesExceeded()
```
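The same retry pattern can be packaged as a reusable helper. Making the sleep function injectable lets unit tests run without real delays (`execute_step`, `TemporaryError`, and friends above are assumed helpers; this version is self-contained):

```python
import asyncio

async def retry_with_backoff(step, max_attempts=3, base_delay=1.0, sleep=asyncio.sleep):
    """Retry an async step on any exception, doubling the delay each attempt."""
    for attempt in range(max_attempts):
        try:
            return await step()
        except Exception:
            if attempt == max_attempts - 1:
                raise  # out of retries; surface the error
            await sleep(base_delay * (2 ** attempt))  # 1s, 2s, 4s, ...

# A step that fails twice, then succeeds:
attempts = {"n": 0}

async def flaky():
    attempts["n"] += 1
    if attempts["n"] < 3:
        raise RuntimeError("transient failure")
    return "ok"

result = asyncio.run(retry_with_backoff(flaky, sleep=lambda d: asyncio.sleep(0)))
# result == "ok" after three attempts
```

In production you would catch only transient exception types, as in the block above; retrying a permanent error just wastes three API calls.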
State persistence with checkpoints:

```python
class WorkflowState:
    def __init__(self, workflow_id):
        self.workflow_id = workflow_id
        self.state = self.load_from_db()

    def save_checkpoint(self, step_name, data):
        """Save workflow state after each step."""
        self.state[step_name] = {
            "data": data,
            "completed_at": datetime.now(),
            "status": "completed",
        }
        db.update("workflow_states", self.workflow_id, self.state)

    async def resume_from_last_checkpoint(self):
        """Resume the workflow from the last saved state."""
        completed_steps = [
            step for step, status in self.state.items()
            if status.get("status") == "completed"
        ]
        # Skip completed steps, start from the next one
        return await self.execute_remaining_steps(completed_steps)
```
Per-step timeouts:

```python
async def step_with_timeout(agent_task, timeout_seconds=60):
    try:
        return await asyncio.wait_for(agent_task, timeout=timeout_seconds)
    except asyncio.TimeoutError:
        # Timeout exceeded
        await log_timeout(agent_task)
        # Decide: retry, skip, or fail the workflow
        return await handle_timeout_policy(agent_task)
```
## Orchestration Tools

| Tool | Best For | Learning Curve | Cost |
|---|---|---|---|
| LangChain | Quick prototypes, research | Low | Free (OSS) |
| LangGraph | Complex state machines | Medium | Free + Cloud ($) |
| Temporal | Mission-critical workflows | High | Free + Cloud ($$) |
| Prefect | Data pipelines | Medium | Free + Cloud ($) |
| Custom (FastAPI + Celery) | Full control | High | Infrastructure only |
Recommendation: Start with LangChain for prototypes, migrate to LangGraph or Temporal for production.
## FAQ

How do I decide between sequential and parallel?
Ask: "Does Step B need Step A's output?" If yes, run them sequentially; if no, run them in parallel.
What's a good max_iterations for loops?
Depends on the task: a few iterations is usually enough for refinement loops, while open-ended research loops may need more. Always set a limit to prevent infinite loops.
Should every workflow have human-in-the-loop?
No. Only for high-stakes decisions, compliance requirements, and quality-critical output. Routine tasks should be fully automated.
How do I test workflows?
Replace live agent calls with deterministic stubs, unit test each branch of the routing logic, and integration test the full path against recorded outputs.
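One practical approach, sketched below: swap the agent for a test double and assert on the routing. `StubAgent` and the simplified `triage` are illustrative, not classes from any real framework:

```python
import asyncio

class StubAgent:
    """Test double that records prompts and returns canned replies."""
    def __init__(self, replies):
        self.replies = list(replies)
        self.prompts = []

    async def run(self, prompt):
        self.prompts.append(prompt)
        return self.replies.pop(0)

async def triage(ticket, classifier):
    """Simplified version of the conditional routing workflow."""
    classification = await classifier.run(f"Classify this ticket: {ticket}")
    if "bug" in classification.lower():
        return "engineering"
    elif "feature" in classification.lower():
        return "product"
    return "auto_reply"

# Exercise the bug branch with no LLM in sight:
agent = StubAgent(["This is a BUG report"])
route = asyncio.run(triage("App crashes on login", agent))
assert route == "engineering"
assert "App crashes on login" in agent.prompts[0]
```

Because the stub records prompts, the same test can also verify what the workflow actually sent to each agent, not just where it routed.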
Bottom line: Agentic workflows combine sequential, parallel, conditional, loop, and human-in-the-loop patterns. Start simple (sequential chains), add complexity as needed. Production workflows require error recovery, state persistence, and observability. Real-world customer onboarding workflow: 4 hours → 8 minutes.
Next: Read our Error Handling guide for production-grade failure management.