How to Implement Human-in-the-Loop Approval Workflows for AI Agents
Production patterns for human approval workflows in AI agents: when to require approval, database schema, real-time notifications, and security implementation.
TL;DR
Without approval workflow:
Agent: "I'll delete these 15,000 old customer records to free up database space."
[Executes deletion]
User: "Wait, we needed those for the audit! 😱"
With approval workflow:
Agent: "I recommend deleting 15,000 old customer records. Requesting approval..."
User: [Reviews] "No, we need those for Q4 audit."
Agent: "Understood. Operation cancelled."
Human-in-the-loop approval ensures agents don't execute irreversible or high-risk actions without explicit human consent.
| Action Type | Risk | Approval Required? |
|---|---|---|
| Delete data | Data loss | ✅ Always |
| Financial transaction >£100 | Financial risk | ✅ Always |
| Send email to customers | Reputation risk | ✅ Always |
| Modify production config | System stability | ✅ Always |
| Query database (read-only) | Low risk | ❌ No |
| Generate report | Low risk | ❌ No |
| Internal analysis | Low risk | ❌ No |
| Create calendar event | Medium risk | ⚠️ Optional |
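Principle: Require approval for actions that are irreversible or high-risk, such as those in the table above that threaten data loss, financial loss, reputational damage, or system stability.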
```python
class RiskLevel:
    LOW = "low"            # Auto-approve
    MEDIUM = "medium"      # Require approval
    HIGH = "high"          # Require approval + manager sign-off
    CRITICAL = "critical"  # Require approval + 2-person rule


def classify_action_risk(action, params):
    if action == "delete_data":
        record_count = params.get("count", 0)
        if record_count > 1000:
            return RiskLevel.HIGH  # Mass deletion
        return RiskLevel.MEDIUM

    if action == "send_email":
        recipient_count = len(params.get("recipients", []))
        if recipient_count > 100:
            return RiskLevel.HIGH  # Bulk email
        if params.get("external", False):
            return RiskLevel.MEDIUM  # External email
        return RiskLevel.LOW  # Internal email

    if action == "charge_payment":
        amount = params.get("amount", 0)
        if amount > 1000:
            return RiskLevel.CRITICAL  # Large transaction
        if amount > 100:
            return RiskLevel.HIGH
        return RiskLevel.MEDIUM

    return RiskLevel.LOW
```
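Each risk level implies a different sign-off requirement. The sketch below shows one way to make that mapping explicit. `ApprovalPolicy`, `APPROVAL_POLICY`, and `approval_policy_for` are hypothetical names, not part of the code above, and the values simply mirror the comments on `RiskLevel`.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ApprovalPolicy:
    requires_approval: bool  # a human must approve before execution
    manager_signoff: bool    # a manager must also sign off
    two_person: bool         # two distinct approvers are required


# Hypothetical policy table keyed by the risk level strings used above.
APPROVAL_POLICY = {
    "low": ApprovalPolicy(False, False, False),
    "medium": ApprovalPolicy(True, False, False),
    "high": ApprovalPolicy(True, True, False),
    "critical": ApprovalPolicy(True, False, True),
}


def approval_policy_for(risk_level: str) -> ApprovalPolicy:
    """Look up the policy; unknown levels fail closed to the strictest one."""
    return APPROVAL_POLICY.get(risk_level, APPROVAL_POLICY["critical"])
```

Failing closed on an unknown level is a design choice: a typo in a risk level then produces an extra approval step rather than a silent auto-approve.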
1. Agent identifies action to execute
2. Check if action requires approval
3. If yes:
   a. Pause agent execution
   b. Create approval request in database
   c. Notify user (email, push, webhook)
   d. Wait for user response
4. User reviews and responds (approve/reject/modify)
5. Agent resumes:
   - If approved: Execute action
   - If rejected: Cancel and explain
   - If modified: Execute with changes
6. Log outcome to audit trail
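The steps above amount to a small state machine over the request status. Here is a minimal sketch, assuming the statuses from this guide plus a terminal "executed" state. In the schema, execution is recorded through `executed_at` and an audit event rather than a status value, so treating it as a state is an illustration only. `ALLOWED_TRANSITIONS` and `transition` are hypothetical names.

```python
# Allowed status moves for an approval request (assumed from the steps above).
ALLOWED_TRANSITIONS = {
    "pending": {"approved", "rejected", "expired"},
    "approved": {"executed"},
    "rejected": set(),
    "expired": set(),
    "executed": set(),
}


def transition(current: str, new: str) -> str:
    """Return the new status, or raise if the move is not allowed."""
    if new not in ALLOWED_TRANSITIONS.get(current, set()):
        raise ValueError(f"Illegal transition: {current} -> {new}")
    return new
```

Routing every status change through one function like this makes it harder for a rejected or expired request to end up executed by accident.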
```sql
-- Approval requests
CREATE TABLE approval_requests (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    org_id TEXT NOT NULL,
    agent_id TEXT NOT NULL,
    action_type TEXT NOT NULL,               -- 'delete_data', 'send_email', etc.
    action_params JSONB NOT NULL,
    risk_level TEXT NOT NULL,                -- 'low', 'medium', 'high', 'critical'
    status TEXT NOT NULL DEFAULT 'pending',  -- 'pending', 'approved', 'rejected', 'expired'
    justification TEXT,                      -- Agent's reason for action
    requested_by UUID REFERENCES users(id),
    requested_at TIMESTAMP DEFAULT NOW(),
    responded_by UUID REFERENCES users(id),
    responded_at TIMESTAMP,
    response_note TEXT,                      -- User's reason for approval/rejection
    expires_at TIMESTAMP,                    -- Auto-reject if not responded by this time
    executed_at TIMESTAMP
);

-- Approval rules (define which actions need approval)
CREATE TABLE approval_rules (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    org_id TEXT NOT NULL,
    action_type TEXT NOT NULL,
    risk_level TEXT NOT NULL,
    requires_approval BOOLEAN DEFAULT TRUE,
    approver_role TEXT,                      -- 'admin', 'manager', 'any_user'
    timeout_hours INTEGER DEFAULT 24,
    auto_reject_on_timeout BOOLEAN DEFAULT TRUE
);

-- Audit log
CREATE TABLE approval_audit (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    approval_request_id UUID REFERENCES approval_requests(id),
    event_type TEXT NOT NULL,                -- 'created', 'approved', 'rejected', 'expired', 'executed'
    event_data JSONB,
    created_at TIMESTAMP DEFAULT NOW()
);
```
```python
import asyncio
from datetime import datetime, timedelta


class ApprovalWorkflow:
    def __init__(self, db, notification_service):
        self.db = db
        self.notifier = notification_service

    async def request_approval(
        self,
        org_id: str,
        agent_id: str,
        action_type: str,
        action_params: dict,
        justification: str,
        requested_by: str
    ) -> dict:
        # Classify risk
        risk_level = self.classify_risk(action_type, action_params)

        # Check if approval required
        rule = self.db.get_approval_rule(org_id, action_type)
        if not rule or not rule["requires_approval"]:
            return {"requires_approval": False}

        # Create approval request
        expires_at = datetime.now() + timedelta(hours=rule["timeout_hours"])
        approval_id = self.db.insert("approval_requests", {
            "org_id": org_id,
            "agent_id": agent_id,
            "action_type": action_type,
            "action_params": action_params,
            "risk_level": risk_level,
            "justification": justification,
            "requested_by": requested_by,
            "expires_at": expires_at
        })

        # Notify user
        await self.notifier.send_approval_request(
            approval_id=approval_id,
            org_id=org_id,
            action_type=action_type,
            justification=justification
        )

        # Audit log
        self.db.insert("approval_audit", {
            "approval_request_id": approval_id,
            "event_type": "created",
            "event_data": {"action_type": action_type}
        })

        return {
            "requires_approval": True,
            "approval_id": approval_id,
            "expires_at": expires_at
        }

    def classify_risk(self, action_type, params):
        # Risk classification logic (see earlier example)
        if action_type == "delete_data" and params.get("count", 0) > 1000:
            return "high"
        if action_type == "charge_payment" and params.get("amount", 0) > 1000:
            return "critical"
        return "medium"
```
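The expiry above is computed with a naive `datetime.now()`, which depends on the server's local time zone. If the agent and the database run in different zones, a timezone-aware variant may be safer. Below is a minimal sketch; `compute_expiry` and `is_expired` are hypothetical helper names, and storing timestamps in UTC is an assumption rather than something the schema above enforces.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def compute_expiry(requested_at: datetime, timeout_hours: int) -> datetime:
    """Expiry is the request time plus the rule's timeout."""
    return requested_at + timedelta(hours=timeout_hours)


def is_expired(expires_at: datetime, now: Optional[datetime] = None) -> bool:
    """Compare in UTC so the answer does not depend on the host's local zone."""
    now = now or datetime.now(timezone.utc)
    return now > expires_at


requested_at = datetime(2025, 1, 1, 9, 0, tzinfo=timezone.utc)
expires_at = compute_expiry(requested_at, timeout_hours=24)
```

Passing `now` explicitly also makes the expiry logic easy to unit test without patching the clock.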
```python
class ApprovalWorkflow:
    # ... (continued)

    async def wait_for_approval(self, approval_id: str, poll_interval: int = 5) -> dict:
        """
        Poll database for approval response.
        Returns when user approves/rejects or request expires.
        """
        while True:
            request = self.db.get("approval_requests", approval_id)

            # Check if responded
            if request["status"] in ["approved", "rejected"]:
                return {
                    "status": request["status"],
                    "response_note": request["response_note"],
                    "responded_by": request["responded_by"]
                }

            # Check if expired
            if datetime.now() > request["expires_at"]:
                self.db.update("approval_requests", approval_id, {
                    "status": "expired"
                })
                self.db.insert("approval_audit", {
                    "approval_request_id": approval_id,
                    "event_type": "expired"
                })
                return {"status": "expired"}

            # Wait and poll again
            await asyncio.sleep(poll_interval)
```
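Polling is simple but adds up to `poll_interval` seconds of latency and a database read per cycle. When the agent and the approval API share one process, an event-based wait is one alternative. The sketch below is illustrative only: `ApprovalWaiter` is a hypothetical class, and it keeps state in memory, so a multi-process deployment would still need a queue or database notifications to deliver the signal.

```python
import asyncio


class ApprovalWaiter:
    """In-process alternative to polling: one asyncio.Event per pending request."""

    def __init__(self):
        self._events = {}
        self._decisions = {}

    def register(self, approval_id: str) -> None:
        self._events[approval_id] = asyncio.Event()

    def resolve(self, approval_id: str, status: str) -> None:
        """Called when a human responds; wakes the waiting agent."""
        self._decisions[approval_id] = status
        self._events[approval_id].set()

    async def wait(self, approval_id: str, timeout_seconds: float) -> str:
        """Return the decision, or 'expired' if none arrives in time."""
        try:
            await asyncio.wait_for(self._events[approval_id].wait(), timeout_seconds)
        except asyncio.TimeoutError:
            return "expired"
        return self._decisions[approval_id]


async def demo() -> str:
    waiter = ApprovalWaiter()
    waiter.register("req-1")
    # Simulate a human approving shortly after the agent starts waiting.
    asyncio.get_running_loop().call_later(0.01, waiter.resolve, "req-1", "approved")
    return await waiter.wait("req-1", timeout_seconds=1.0)
```

Running `asyncio.run(demo())` returns `"approved"`; without the `call_later` line the wait would end with `"expired"` after one second.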
```python
from typing import Optional


class ApprovalWorkflow:
    # ... (continued)

    def respond_to_approval(
        self,
        approval_id: str,
        user_id: str,
        decision: str,  # 'approve' or 'reject'
        note: Optional[str] = None
    ) -> dict:
        """
        User approves or rejects pending approval request.
        """
        # Map the decision to a status explicitly.
        # (decision + "d" would turn 'reject' into 'rejectd'.)
        statuses = {"approve": "approved", "reject": "rejected"}
        if decision not in statuses:
            raise ValueError(f"Unknown decision: {decision!r}")
        status = statuses[decision]

        # Get request
        request = self.db.get("approval_requests", approval_id)
        if request["status"] != "pending":
            raise ValueError(f"Request already {request['status']}")

        # Check user has permission
        rule = self.db.get_approval_rule(
            request["org_id"],
            request["action_type"]
        )
        if not self.user_can_approve(user_id, rule["approver_role"]):
            raise PermissionError("User not authorized to approve this action")

        # Update request
        self.db.update("approval_requests", approval_id, {
            "status": status,
            "responded_by": user_id,
            "responded_at": datetime.now(),
            "response_note": note
        })

        # Audit log
        self.db.insert("approval_audit", {
            "approval_request_id": approval_id,
            "event_type": status,  # 'approved' or 'rejected'
            "event_data": {"user_id": user_id, "note": note}
        })

        # Notify agent (via webhook, message queue, etc.)
        self.notifier.notify_agent(approval_id, decision)

        return {"status": status}
```
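The permission check relies on `user_can_approve`, which is called above but not shown. One possible shape is a simple role ranking. This is a sketch under assumptions: the required roles come from the `approver_role` comment in the schema, while the user roles (including "member") and the rank tables are hypothetical. It also takes the user's role directly, whereas the method above receives a `user_id` and would need to look the role up first.

```python
# Hypothetical rankings: a user may approve when their rank meets the rule's rank.
REQUIRED_RANK = {"any_user": 0, "manager": 1, "admin": 2}
USER_RANK = {"member": 0, "manager": 1, "admin": 2}


def user_can_approve(user_role: str, approver_role: str) -> bool:
    """True if user_role meets or exceeds the role the rule requires."""
    if user_role not in USER_RANK or approver_role not in REQUIRED_RANK:
        return False  # fail closed on unknown roles
    return USER_RANK[user_role] >= REQUIRED_RANK[approver_role]
```

For example, `user_can_approve("member", "manager")` is `False`, while `user_can_approve("admin", "manager")` is `True`.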
```python
class ApprovalWorkflow:
    # ... (continued)

    async def execute_with_approval(
        self,
        action_type: str,
        action_params: dict,
        agent_id: str,
        org_id: str,
        user_id: str,
        justification: str
    ) -> dict:
        """
        Execute action with approval workflow.
        Pauses if approval required, executes immediately otherwise.
        """
        # Request approval
        approval = await self.request_approval(
            org_id=org_id,
            agent_id=agent_id,
            action_type=action_type,
            action_params=action_params,
            justification=justification,
            requested_by=user_id
        )

        # If no approval required, execute immediately
        if not approval["requires_approval"]:
            return await self.execute_action(action_type, action_params)

        # Wait for approval
        response = await self.wait_for_approval(approval["approval_id"])

        if response["status"] == "approved":
            # Execute action
            result = await self.execute_action(action_type, action_params)

            # Mark as executed
            self.db.update("approval_requests", approval["approval_id"], {
                "executed_at": datetime.now()
            })
            self.db.insert("approval_audit", {
                "approval_request_id": approval["approval_id"],
                "event_type": "executed",
                "event_data": result
            })
            return {"status": "executed", "result": result}

        elif response["status"] == "rejected":
            return {
                "status": "rejected",
                "reason": response["response_note"]
            }

        else:  # expired
            return {"status": "expired", "message": "Approval request timed out"}

    async def execute_action(self, action_type: str, params: dict) -> dict:
        """Execute the actual action (delete data, send email, etc.)"""
        # delete_records and send_email are application-specific helpers defined elsewhere
        if action_type == "delete_data":
            return await delete_records(params["table"], params["ids"])
        elif action_type == "send_email":
            return await send_email(params["to"], params["subject"], params["body"])
        # ... other actions
        else:
            # Fail loudly instead of silently returning None
            raise ValueError(f"Unknown action type: {action_type}")
```
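As the number of action types grows, the `if`/`elif` chain in `execute_action` can be replaced with a handler registry. The following is a minimal sketch of that pattern, not the article's implementation: `ACTION_HANDLERS`, the `action` decorator, and the placeholder `send_email_action` body are all hypothetical.

```python
import asyncio

# Maps action type -> coroutine function that performs it.
ACTION_HANDLERS = {}


def action(name):
    """Register a coroutine function as the handler for an action type."""
    def decorator(func):
        ACTION_HANDLERS[name] = func
        return func
    return decorator


@action("send_email")
async def send_email_action(params: dict) -> dict:
    # Placeholder body; a real handler would call the mail service here.
    return {"sent_to": params["to"]}


async def execute_action(action_type: str, params: dict) -> dict:
    """Dispatch to the registered handler, failing loudly on unknown types."""
    handler = ACTION_HANDLERS.get(action_type)
    if handler is None:
        raise ValueError(f"No handler registered for {action_type!r}")
    return await handler(params)
```

With this layout, adding an action means writing one decorated function, and an unregistered action type raises instead of quietly doing nothing.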
```python
# Server-side: Notify user via WebSocket (method of the notification service)
async def send_approval_request(self, approval_id, org_id, action_type, justification):
    # Look up the request so the expiry time can be included in the notification
    request = self.db.get("approval_requests", approval_id)
    expires_at = request["expires_at"]

    # Get users who can approve
    approvers = self.db.get_approvers(org_id)

    for approver in approvers:
        # Send real-time notification
        await self.websocket_manager.send_to_user(approver["id"], {
            "type": "approval_request",
            "approval_id": approval_id,
            "action_type": action_type,
            "justification": justification,
            "expires_at": expires_at.isoformat()
        })

        # Also send email backup
        await self.email_service.send(
            to=approver["email"],
            subject=f"Approval Required: {action_type}",
            body=f"An AI agent is requesting approval to {action_type}.\n\n"
                 f"Reason: {justification}\n\n"
                 f"Review: https://app.company.com/approvals/{approval_id}"
        )
```
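The loop above notifies approvers one at a time, so a single failed send raises and stops the remaining notifications. One way to avoid that is to fan out concurrently and collect failures. This is a sketch: `notify_all` is a hypothetical helper, and `send` stands in for whatever coroutine delivers a single notification.

```python
import asyncio


async def notify_all(approvers, send):
    """Send to every approver concurrently; return the ids whose delivery failed."""
    results = await asyncio.gather(
        *(send(approver) for approver in approvers),
        return_exceptions=True,  # collect errors instead of aborting the batch
    )
    return [
        approver["id"]
        for approver, result in zip(approvers, results)
        if isinstance(result, Exception)
    ]
```

The returned ids can then be retried or routed to the email fallback, so one disconnected WebSocket does not leave every other approver unnotified.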
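```jsx
// React component for approval requests
import { useEffect, useState } from 'react';
// Assumes an initialized `supabase` client is imported from the app's own module

// orgId is passed in as a prop here (an assumption; the original left it undefined)
function ApprovalInbox({ orgId }) {
  const [requests, setRequests] = useState([]);

  useEffect(() => {
    // Subscribe to real-time updates
    const subscription = supabase
      .channel('approvals')
      .on('postgres_changes', {
        event: 'INSERT',
        schema: 'public',
        table: 'approval_requests',
        filter: `org_id=eq.${orgId}`
      }, (payload) => {
        setRequests(prev => [payload.new, ...prev]);

        // Show browser notification (only if the user has granted permission)
        if (Notification.permission === 'granted') {
          new Notification("Approval Required", {
            body: `Agent requests permission to ${payload.new.action_type}`
          });
        }
      })
      .subscribe();

    return () => subscription.unsubscribe();
  }, [orgId]);

  const handleApprove = async (approvalId) => {
    await fetch(`/api/approvals/${approvalId}/approve`, {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify({ note: "Approved" })
    });
  };

  // Mirrors handleApprove; assumes a matching /reject endpoint exists
  const handleReject = async (approvalId) => {
    await fetch(`/api/approvals/${approvalId}/reject`, {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify({ note: "Rejected" })
    });
  };

  return (
    <div>
      {requests.map(req => (
        <ApprovalCard
          key={req.id}
          request={req}
          onApprove={() => handleApprove(req.id)}
          onReject={() => handleReject(req.id)}
        />
      ))}
    </div>
  );
}
```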
For critical actions (large payments, production deployments), require two approvers.
```python
class TwoPersonApproval:
    # Assumes approval_requests has extra columns beyond the schema above:
    # required_approvals, approvals_received, and approvers.
    def __init__(self, db):
        self.db = db

    async def request_approval(self, action, params):
        approval_id = self.db.insert("approval_requests", {
            "action_type": action,
            "action_params": params,
            "required_approvals": 2,  # Need 2 approvals
            "approvals_received": 0,
            "approvers": []
        })
        return approval_id

    def approve(self, approval_id, user_id):
        request = self.db.get("approval_requests", approval_id)

        # Check user hasn't already approved
        approvers = request.get("approvers", [])
        if user_id in approvers:
            raise ValueError("User already approved")

        approvers.append(user_id)
        approvals_count = len(approvers)
        # Use the stored requirement rather than a hardcoded 2
        fully_approved = approvals_count >= request["required_approvals"]

        self.db.update("approval_requests", approval_id, {
            "approvers": approvers,
            "approvals_received": approvals_count,
            "status": "approved" if fully_approved else "pending"
        })

        return fully_approved
```
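The core of the two-person rule can be isolated as a pure function, which makes it easy to test without a database. The sketch below makes one additional assumption that the class above does not enforce: the person who requested the action may not count as one of its approvers. `record_approval` is a hypothetical name.

```python
def record_approval(approvers, user_id, requested_by, required=2):
    """Return (updated_approvers, is_approved) without mutating the input list."""
    if user_id == requested_by:
        # Assumption: separation of duties, the requester cannot self-approve.
        raise PermissionError("Requester cannot approve their own request")
    if user_id in approvers:
        raise ValueError("User already approved")
    updated = [*approvers, user_id]
    return updated, len(updated) >= required


approvers, approved = record_approval([], "alice", requested_by="carol")
# One approval is not enough yet: approved is False
approvers, approved = record_approval(approvers, "bob", requested_by="carol")
# Two distinct approvers: approved is True
```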
If the primary approver doesn't respond, escalate to a manager.
```python
async def handle_timeout_escalation(self):
    """Background job to find approvals close to expiry and escalate them"""
    # Assumes approval_requests has an extra boolean column: escalated
    while True:
        # Find approvals expiring soon (within 1 hour)
        expiring = self.db.query("""
            SELECT * FROM approval_requests
            WHERE status = 'pending'
              AND expires_at < NOW() + INTERVAL '1 hour'
              AND escalated = FALSE
        """)

        for request in expiring:
            # Escalate to the requester's manager
            manager = self.db.get_user_manager(request["requested_by"])
            await self.notifier.send_urgent_notification(
                user_id=manager["id"],
                message="Urgent: Approval needed (expires in 1 hour)",
                approval_id=request["id"]
            )
            self.db.update("approval_requests", request["id"], {
                "escalated": True
            })

        await asyncio.sleep(300)  # Check every 5 minutes
```
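The selection logic in the query can also be expressed as a small predicate, which is convenient for unit tests. This is a sketch with one deliberate difference from the SQL above: it skips requests that have already expired, since escalating those no longer helps. `needs_escalation` is a hypothetical name, and the `escalated` field is the same assumed column.

```python
from datetime import datetime, timedelta


def needs_escalation(request: dict, now: datetime,
                     window: timedelta = timedelta(hours=1)) -> bool:
    """True for pending, not-yet-escalated requests expiring within `window`."""
    if request["status"] != "pending" or request.get("escalated", False):
        return False
    # Still valid now, but due to expire before the window closes.
    return now <= request["expires_at"] < now + window
```

Passing `now` as an argument keeps the function deterministic, so the boundary cases (just inside and just outside the one-hour window) can be tested directly.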
For trusted users or low-risk scenarios, auto-approve with audit trail.
```python
def check_auto_approval(self, user_id, action_type, params):
    """
    Auto-approve if user has sufficient trust level
    """
    user = self.db.get_user(user_id)

    # Check user trust score (based on history, role, tenure)
    if user["trust_score"] >= 90:
        if action_type == "delete_data" and params.get("count", 0) < 100:
            # Auto-approve small deletions for trusted users.
            # Details go in event_data so the row fits the approval_audit schema.
            self.db.insert("approval_audit", {
                "event_type": "auto_approved",
                "event_data": {
                    "user_id": user_id,
                    "action": action_type,
                    "reason": f"User trust score {user['trust_score']}"
                }
            })
            return True

    return False
```
1. Authorization checks: Verify user has permission to approve before processing response.
2. Replay attack prevention: Each approval response should be single-use so that it cannot be replayed.
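```python
class SecurityError(Exception):
    """Raised when a request fails a security check."""


def approve(self, approval_id, user_id, csrf_token):
    # Verify CSRF token
    if not self.verify_csrf(csrf_token, user_id):
        raise SecurityError("Invalid CSRF token")

    # Check approval still pending (not already used).
    # With concurrent responders, this check and the status update
    # should happen as one atomic operation.
    request = self.db.get("approval_requests", approval_id)
    if request["status"] != "pending":
        raise ValueError("Approval already processed")
```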
3. Audit logging: Log every approval event with timestamp, user ID, IP address.
4. Encryption: Encrypt sensitive approval data (e.g., payment details) at rest.
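For the replay-prevention point, a read-then-check sequence leaves a small window in which two responses can both see a pending request. A conditional update closes that window by letting the database decide. The sketch below uses SQLite from the standard library purely so it runs anywhere; the table is a cut-down, hypothetical version of `approval_requests`, and `try_approve` is not part of the code above.

```python
import sqlite3


def try_approve(conn: sqlite3.Connection, approval_id: str, user_id: str) -> bool:
    """Flip pending -> approved in one statement; False if already processed."""
    cur = conn.execute(
        "UPDATE approval_requests SET status = 'approved', responded_by = ? "
        "WHERE id = ? AND status = 'pending'",
        (user_id, approval_id),
    )
    conn.commit()
    # rowcount is 1 only for the single caller that won the update.
    return cur.rowcount == 1


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE approval_requests "
    "(id TEXT PRIMARY KEY, status TEXT NOT NULL, responded_by TEXT)"
)
conn.execute("INSERT INTO approval_requests (id, status) VALUES ('req-1', 'pending')")

first = try_approve(conn, "req-1", "alice")   # True: request was pending
second = try_approve(conn, "req-1", "bob")    # False: replay is rejected
```

The same `WHERE ... AND status = 'pending'` condition works in PostgreSQL, where the affected row count plays the role of `rowcount`.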
What if the user never responds?
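Set a timeout (expires_at). Once it passes, either auto-reject the request (the auto_reject_on_timeout flag in approval_rules) or escalate it to a manager, as in the timeout escalation pattern.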
How do I handle urgent requests?
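For time-sensitive actions, shorten timeout_hours in the approval rule and rely on real-time notifications and escalation so the request reaches an approver quickly.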
Can agents modify their own approval requests?
No. The agent creates the request, then waits passively. Only humans can approve or reject, which prevents an agent from approving its own actions.
What if approval requirements change mid-execution?
Re-check approval rules before execution:
```python
# Check rules again at execution time
current_rule = self.db.get_approval_rule(org_id, action_type)
if current_rule["requires_approval"] and not is_approved:
    raise SecurityError("Approval required (rules changed)")
```
Bottom line: Human-in-the-loop approval workflows prevent catastrophic agent mistakes. Implement for all high-risk actions (deletion, payments, external comms). Use real-time notifications, timeout handling, and comprehensive audit trails. 84% of enterprise AI deployments use HITL for compliance.
Next: Read our Error Handling guide for handling failures gracefully.