Building Agentic Workflows: From Simple Chains to Complex Automations
Architect agentic workflows: sequential chains, parallel execution, conditional branching, loops, and error recovery patterns for production AI automation.
TL;DR
Traditional automation (RPA, scripts):
IF condition THEN action ELSE other_action
Fixed logic. Breaks when inputs change.
Agentic workflow:
1. Analyze situation
2. Decide best approach
3. Execute actions
4. Evaluate results
5. Adjust if needed
Adaptive. Handles variability.
Key difference: Agent makes decisions at each step based on context, not predetermined rules.
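That analyze-decide-execute-evaluate-adjust cycle can be sketched as a minimal skeleton. Everything here is hypothetical scaffolding (`agent.decide`, `agent.execute`, `agent.evaluate` stand in for real model calls); the point is the shape of the loop, not an implementation.

```python
def agentic_loop(goal, agent, max_steps=5):
    """Minimal adaptive loop: analyze, decide, execute, evaluate, adjust."""
    context = {"goal": goal, "feedback": None}
    result = None
    for _ in range(max_steps):
        plan = agent.decide(context)           # 1-2: analyze situation, pick approach
        result = agent.execute(plan)           # 3: act on the chosen plan
        ok, feedback = agent.evaluate(result)  # 4: check the outcome
        if ok:
            return result                      # goal reached
        context["feedback"] = feedback         # 5: adjust and try again
    return result  # best effort after max_steps
```

Compare this with the fixed IF/THEN script above: the branch points live inside the agent's decisions, not in the code.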
Pattern 1: Sequential Chains
When: Steps must run in order, and each depends on the previous step's output.
Example: Blog post generation
1. Research topic → 2. Generate outline → 3. Write sections → 4. Add citations → 5. Proofread
Implementation:
```python
async def sequential_workflow(topic):
    # Step 1: Research
    research_agent = Agent("Researcher")
    research = await research_agent.run(f"Research {topic}")

    # Step 2: Outline (uses research output)
    outline_agent = Agent("Outliner")
    outline = await outline_agent.run(f"Create outline based on: {research}")

    # Step 3: Write (uses outline)
    writer_agent = Agent("Writer")
    draft = await writer_agent.run(f"Write blog post following: {outline}")

    # Step 4: Citations (uses draft)
    citation_agent = Agent("Citation Expert")
    cited = await citation_agent.run(f"Add citations to: {draft}")

    # Step 5: Proofread
    editor_agent = Agent("Editor")
    final = await editor_agent.run(f"Proofread and polish: {cited}")

    return final
```
Pros: Simple, predictable, easy to debug.
Cons: Slow (each step waits for previous), single point of failure.
Use when: Steps inherently sequential, outputs directly feed into next step.
Pattern 2: Parallel Execution
When: Multiple independent tasks can run simultaneously.
Example: Market research
Simultaneously:
- Agent A: Analyze competitor pricing
- Agent B: Scan industry news
- Agent C: Survey customer reviews
- Agent D: Check social media sentiment
Combine results → Generate report
Implementation:
```python
import asyncio

async def parallel_workflow(company):
    # Launch all tasks at once
    tasks = [
        analyze_pricing(company),
        scan_news(company),
        analyze_reviews(company),
        check_sentiment(company),
    ]

    # Wait for all to complete
    results = await asyncio.gather(*tasks)
    pricing, news, reviews, sentiment = results

    # Generate final report
    report_agent = Agent("Report Generator")
    report = await report_agent.run(f"""
    Generate market research report:
    Pricing: {pricing}
    News: {news}
    Reviews: {reviews}
    Sentiment: {sentiment}
    """)
    return report

async def analyze_pricing(company):
    agent = Agent("Pricing Analyst")
    return await agent.run(f"Analyze {company} pricing vs competitors")

async def scan_news(company):
    agent = Agent("News Scanner")
    return await agent.run(f"Find latest news about {company}")

# ... the other agents follow the same shape
```
Pros: Fast, efficient resource use.
Cons: Requires independent tasks, harder to debug, need error handling for partial failures.
Use when: Tasks don't depend on each other, speed is important.
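The "partial failures" caveat deserves a concrete sketch. With `asyncio.gather(..., return_exceptions=True)`, a failed branch is returned as an exception object instead of cancelling the whole batch, so the workflow can keep whatever succeeded. The fetcher names below are illustrative stand-ins, not real APIs.

```python
import asyncio

async def fetch_pricing():
    return "pricing data"

async def fetch_news():
    # Simulated branch failure (e.g. an API timeout)
    raise RuntimeError("news API timed out")

async def resilient_parallel():
    # return_exceptions=True delivers exceptions as values instead of
    # raising on the first failure
    results = await asyncio.gather(
        fetch_pricing(), fetch_news(), return_exceptions=True
    )
    succeeded = [r for r in results if not isinstance(r, Exception)]
    failed = [r for r in results if isinstance(r, Exception)]
    return succeeded, failed

succeeded, failed = asyncio.run(resilient_parallel())
# succeeded holds the pricing data; failed holds the RuntimeError
```

From there it is a policy decision: generate the report from partial data, retry only the failed branches, or abort.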
Pattern 3: Conditional Branching
When: The next step depends on intermediate results.
Example: Customer support triage
1. Classify ticket (bug, feature request, question)
2. Route based on classification:
- Bug → Engineering team
- Feature request → Product team
- Question → Auto-reply with FAQ
Implementation:
```python
async def conditional_workflow(ticket):
    # Step 1: Classify
    classifier = Agent("Classifier")
    classification = await classifier.run(f"Classify this ticket: {ticket}")

    # Step 2: Route based on classification
    if "bug" in classification.lower():
        # Bug path
        engineer = Agent("Engineer")
        response = await engineer.run(f"Diagnose bug: {ticket}")
        priority = "high" if "critical" in response.lower() else "medium"
        return await create_jira_ticket(ticket, response, priority)
    elif "feature" in classification.lower():
        # Feature request path
        product_manager = Agent("Product Manager")
        analysis = await product_manager.run(f"Evaluate feature request: {ticket}")
        return await add_to_roadmap(analysis)
    else:
        # Question path
        kb_search = Agent("Knowledge Base Search")
        answer = await kb_search.run(f"Find answer for: {ticket}")
        return await send_auto_reply(ticket, answer)
```
Decision Tree Example:
```python
async def sales_lead_workflow(lead):
    # Qualify lead (assumes the agent returns a bare number)
    qualifier = Agent("Lead Qualifier")
    score = int(await qualifier.run(f"Score lead 1-10: {lead}"))

    if score >= 8:
        # Hot lead → Immediate outreach
        sales_rep = Agent("Sales Rep")
        await sales_rep.run(f"Call {lead['name']} immediately")
        await sales_rep.run("Send personalized demo video")
        return "hot_lead_workflow_complete"
    elif score >= 5:
        # Warm lead → Nurture campaign
        marketer = Agent("Marketing Automation")
        await marketer.run(f"Enroll {lead['email']} in 5-day email course")
        return "warm_lead_workflow_complete"
    else:
        # Cold lead → Add to general newsletter
        await add_to_newsletter(lead["email"])
        return "cold_lead_workflow_complete"
```
Pros: Flexible, handles different scenarios, optimizes resources.
Cons: More complex, harder to predict execution path.
Use when: Different inputs require different handling, want to optimize for specific cases.
Pattern 4: Loops
When: A task requires repeated attempts or incremental refinement.
Example: Web research with verification
WHILE not_enough_sources AND attempts < max_attempts:
1. Search for sources
2. Verify credibility
3. If insufficient, refine search query
4. Repeat
Implementation:
```python
async def loop_workflow(research_topic, min_sources=5, max_attempts=10):
    sources = []
    attempt = 0

    while len(sources) < min_sources and attempt < max_attempts:
        attempt += 1

        # Search for sources (assumes the agent returns a list of sources)
        researcher = Agent("Researcher")
        new_sources = await researcher.run(f"""
        Find credible sources on: {research_topic}
        Already found: {sources}
        Looking for {min_sources - len(sources)} more
        """)

        # Verify each source
        verifier = Agent("Fact Checker")
        for source in new_sources:
            credibility = await verifier.run(f"Verify credibility of: {source}")
            if "credible" in credibility.lower():
                sources.append(source)

        # If still not enough, refine the search query
        if len(sources) < min_sources:
            refiner = Agent("Query Refiner")
            research_topic = await refiner.run(f"""
            Original topic: {research_topic}
            Found so far: {len(sources)} sources
            Suggest refined search query
            """)

    return sources
```
With Early Exit:
```python
async def content_generation_loop(topic, quality_threshold=0.8):
    max_iterations = 5
    for iteration in range(max_iterations):
        # Generate content
        writer = Agent("Writer")
        content = await writer.run(f"Write about {topic}")

        # Evaluate quality (assumes the agent returns a bare 0-1 score)
        evaluator = Agent("Quality Checker")
        score = await evaluator.run(f"Score quality 0-1: {content}")

        if float(score) >= quality_threshold:
            return content  # Good enough, exit early

        # Not good enough, refine the prompt for the next iteration
        topic = f"{topic} (improve on: {content[:200]}...)"

    return content  # Return best attempt after max iterations
```
Pros: Handles uncertain outcomes, keeps trying until success.
Cons: Unpredictable execution time, risk of infinite loops (must have max iterations).
Use when: Task success not guaranteed on first try, iterative refinement improves results.
Pattern 5: Human-in-the-Loop
When: You need human approval before proceeding, or human input to resolve ambiguity.
Example: Content approval workflow
1. Agent drafts email
2. Send to human for review
3. Wait for approval/edits
4. If approved → Send
5. If edits → Incorporate feedback → Return to step 2
Implementation:
```python
async def human_in_loop_workflow(task_description):
    max_iterations = 3
    for iteration in range(max_iterations):
        # Agent generates output
        agent = Agent("Content Creator")
        output = await agent.run(task_description)

        # Request human review
        review = await request_human_review(output)

        if review["status"] == "approved":
            return await execute_final_action(output)
        elif review["status"] == "rejected":
            return {"status": "cancelled", "reason": review["feedback"]}
        elif review["status"] == "needs_edits":
            # Incorporate feedback and retry
            task_description = f"""
            {task_description}
            Previous attempt: {output}
            Feedback: {review['feedback']}
            Incorporate this feedback in next version
            """

    return {"status": "max_iterations_reached"}

async def request_human_review(output):
    # Store in database, notify user
    # (db, notify_user, wait_for_approval are app-specific helpers)
    approval_id = db.create_approval_request(output)
    await notify_user(approval_id)

    # Wait for response (poll or webhook)
    response = await wait_for_approval(approval_id, timeout_minutes=60)
    return response
```
Pros: Safety, quality control, handles edge cases humans are better at.
Cons: Slow (waits for human), requires notification infrastructure.
Use when: High-stakes decisions, compliance requirements, quality critical.
Use case: Automated customer onboarding
Workflow:
1. Receive signup (trigger)
2. PARALLEL:
- Verify email
- Check if company already exists in CRM
- Validate payment method
3. IF company exists:
Add user to existing account
ELSE:
Create new company account
4. SEQUENTIAL:
- Send welcome email
- Create onboarding tasks
- Schedule kickoff call
5. LOOP (check until complete):
- Has user completed profile? (check every 6 hours)
- If not, send reminder
- If yes after 3 days, trigger success workflow
Code:
```python
async def customer_onboarding_workflow(signup_data):
    # 1. Trigger
    user_email = signup_data["email"]
    company_name = signup_data["company"]

    # 2. PARALLEL validation
    validation_tasks = [
        verify_email(user_email),
        check_existing_company(company_name),
        validate_payment(signup_data["payment_method"]),
    ]
    email_valid, existing_company, payment_valid = await asyncio.gather(*validation_tasks)

    if not (email_valid and payment_valid):
        return {"status": "validation_failed"}

    # 3. CONDITIONAL: Create or join account
    if existing_company:
        account_id = await add_user_to_account(existing_company["id"], user_email)
    else:
        account_id = await create_new_account(company_name, user_email)

    # 4. SEQUENTIAL onboarding steps
    await send_welcome_email(user_email, company_name)
    await create_onboarding_tasks(account_id)
    await schedule_kickoff_call(account_id)

    # 5. LOOP: Check completion (28 checks x 6 hours ≈ 7 days)
    for check in range(28):
        await asyncio.sleep(6 * 3600)  # Wait 6 hours
        profile_complete = await check_profile_completion(account_id)
        if profile_complete:
            await trigger_success_workflow(account_id)
            return {"status": "onboarding_complete"}
        if check < 27:  # Don't send a reminder on the last check
            await send_reminder_email(user_email)

    # After 7 days, escalate to a human
    await escalate_to_customer_success(account_id)
    return {"status": "needs_human_intervention"}
```
Error Recovery
Retry with exponential backoff for transient failures:
```python
class TemporaryError(Exception): pass     # transient: API timeout, rate limit
class PermanentError(Exception): pass     # fatal: invalid input, auth failure
class MaxRetriesExceeded(Exception): pass

async def workflow_with_error_recovery(task):
    for attempt in range(3):  # Retry up to 3 times
        try:
            return await execute_step(task)
        except TemporaryError:
            # Transient failure: back off exponentially, then retry
            await asyncio.sleep(2 ** attempt)
            continue
        except PermanentError as e:
            # Fatal error: retrying won't help, so log and surface it
            await log_error(e)
            await notify_admin(e)
            raise

    # All retries failed
    await escalate_to_human(task)
    raise MaxRetriesExceeded()
```
Checkpointing lets a long workflow resume from the last completed step instead of starting over:

```python
from datetime import datetime

class WorkflowState:
    def __init__(self, workflow_id):
        self.workflow_id = workflow_id
        self.state = self.load_from_db()

    def save_checkpoint(self, step_name, data):
        """Save workflow state after each step"""
        self.state[step_name] = {
            "data": data,
            "completed_at": datetime.now(),
            "status": "completed",
        }
        db.update("workflow_states", self.workflow_id, self.state)

    async def resume_from_last_checkpoint(self):
        """Resume workflow from last saved state"""
        completed_steps = [
            step for step, info in self.state.items()
            if info.get("status") == "completed"
        ]
        # Skip completed steps, start from the next one
        return await self.execute_remaining_steps(completed_steps)
```
Per-step timeouts stop a stuck agent from stalling the whole workflow:

```python
async def step_with_timeout(agent_task, timeout_seconds=60):
    try:
        return await asyncio.wait_for(agent_task, timeout=timeout_seconds)
    except asyncio.TimeoutError:
        # Timeout exceeded
        await log_timeout(agent_task)
        # Decide: retry, skip, or fail the workflow
        return await handle_timeout_policy(agent_task)
```
| Tool | Best For | Learning Curve | Cost |
|---|---|---|---|
| LangChain | Quick prototypes, research | Low | Free (OSS) |
| LangGraph | Complex state machines | Medium | Free + Cloud ($) |
| Temporal | Mission-critical workflows | High | Free + Cloud ($$) |
| Prefect | Data pipelines | Medium | Free + Cloud ($) |
| Custom (FastAPI + Celery) | Full control | High | Infrastructure only |
Recommendation: Start with LangChain for prototypes, migrate to LangGraph or Temporal for production.
How do I decide between sequential and parallel?
Ask: "Does Step B need Step A's output?" If yes, chain them sequentially; if no, run them in parallel.
What's a good max_iterations for loops?
Depends on the task, but always set a limit to prevent infinite loops.
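One way to enforce that limit everywhere is a small guard. This is a sketch with hypothetical names; the convention that a step returns a `(done, result)` pair is an assumption of this example, not a standard API.

```python
def bounded(max_iterations):
    """Decorator that caps how many times a step may be retried."""
    def wrap(step):
        def run(*args, **kwargs):
            result = None
            for attempt in range(1, max_iterations + 1):
                done, result = step(*args, **kwargs)  # step reports success itself
                if done:
                    return result
            # Hard stop: never loop forever on a step that can't succeed
            raise RuntimeError(f"gave up after {max_iterations} iterations")
        return run
    return wrap

@bounded(max_iterations=3)
def flaky_step(counter):
    counter["n"] += 1
    return counter["n"] >= 2, counter["n"]  # succeeds on the second call
```

Wrapping every looping step this way turns "risk of infinite loops" into an explicit, loggable failure.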
Should every workflow have human-in-the-loop?
No. Reserve it for high-stakes decisions, compliance requirements, and quality-critical output. Routine tasks should be fully automated.
Bottom line: Agentic workflows combine sequential, parallel, conditional, loop, and human-in-the-loop patterns. Start simple (sequential chains), add complexity as needed. Production workflows require error recovery, state persistence, and observability. Real-world customer onboarding workflow: 4 hours → 8 minutes.
Next: Read our Error Handling guide for production-grade failure management.