Academy · 30 Sept 2024 · 10 min read

How to Implement Human-in-the-Loop Approval Workflows for AI Agents

Production patterns for human approval workflows in AI agents: when to require approval, database schema, real-time notifications, and security implementation.

Max Beech
Head of Content

TL;DR

  • Human-in-the-loop (HITL) approval prevents AI agents from executing high-risk actions autonomously.
  • Require approval for: Financial transactions, data deletion, external communications, system configuration changes.
  • Implementation: Agent pauses → Creates approval request → Notifies user → Waits for response → Executes or cancels.
  • Database schema: Store approval requests, rules, audit trail with timestamps and justifications.
  • Real-time notifications: Use WebSockets, SSE, or email to alert users immediately.
  • Timeout handling: Auto-reject after N hours, or escalate to fallback approver.
  • Production pattern: 84% of enterprise AI deployments require HITL for compliance.

Human-in-the-Loop Approval Workflows for AI Agents

Without approval workflow:

Agent: "I'll delete these 15,000 old customer records to free up database space."
[Executes deletion]
User: "Wait, we needed those for the audit! 😱"

With approval workflow:

Agent: "I recommend deleting 15,000 old customer records. Requesting approval..."
User: [Reviews] "No, we need those for Q4 audit."
Agent: "Understood. Operation cancelled."

Human-in-the-loop approval ensures agents don't execute irreversible or high-risk actions without explicit human consent.

When to Require Approval

High-Risk Actions

Action Type | Risk | Approval Required?
Delete data | Data loss | ✅ Always
Financial transaction >£100 | Financial risk | ✅ Always
Send email to customers | Reputation risk | ✅ Always
Modify production config | System stability | ✅ Always
Query database (read-only) | Low risk | ❌ No
Generate report | Low risk | ❌ No
Internal analysis | Low risk | ❌ No
Create calendar event | Medium risk | ⚠️ Optional

Principle: Require approval for actions that are:

  1. Irreversible (can't undo)
  2. High-cost (financial or reputational)
  3. External-facing (affects customers, partners)
  4. Compliance-sensitive (regulated data, audit trails)
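The four criteria can be read as a simple predicate over an action's properties. The sketch below is illustrative only: the `ActionProfile` dataclass and its field names are assumptions made for this example, and the cost threshold is borrowed from the "Financial transaction >£100" row in the table above.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ActionProfile:
    """Hypothetical description of an action an agent wants to take."""
    name: str
    reversible: bool = True
    estimated_cost_gbp: float = 0.0
    external_facing: bool = False
    touches_regulated_data: bool = False


# Assumed threshold, taken from the table's ">£100" row
COST_THRESHOLD_GBP = 100.0


def approval_reasons(action: ActionProfile) -> list:
    """Return every criterion the action trips; an empty list means none apply."""
    reasons = []
    if not action.reversible:
        reasons.append("irreversible")
    if action.estimated_cost_gbp > COST_THRESHOLD_GBP:
        reasons.append("high-cost")
    if action.external_facing:
        reasons.append("external-facing")
    if action.touches_regulated_data:
        reasons.append("compliance-sensitive")
    return reasons


def needs_approval(action: ActionProfile) -> bool:
    # Any single criterion is enough to require a human decision
    return bool(approval_reasons(action))
```

Returning the list of reasons, rather than a bare boolean, gives the approver something concrete to read in the request.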

Risk-Based Tiering

class RiskLevel:
    LOW = "low"           # Auto-approve
    MEDIUM = "medium"     # Require approval
    HIGH = "high"         # Require approval + manager sign-off
    CRITICAL = "critical" # Require approval + 2-person rule

def classify_action_risk(action, params):
    if action == "delete_data":
        record_count = params.get("count", 0)
        if record_count > 1000:
            return RiskLevel.HIGH  # Mass deletion
        return RiskLevel.MEDIUM
    
    if action == "send_email":
        recipient_count = len(params.get("recipients", []))
        if recipient_count > 100:
            return RiskLevel.HIGH  # Bulk email
        if params.get("external", False):
            return RiskLevel.MEDIUM  # External email
        return RiskLevel.LOW  # Internal email
    
    if action == "charge_payment":
        amount = params.get("amount", 0)
        if amount > 1000:
            return RiskLevel.CRITICAL  # Large transaction
        if amount > 100:
            return RiskLevel.HIGH
        return RiskLevel.MEDIUM
    
    return RiskLevel.LOW
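One way to act on the tiers is a lookup from risk level to an approval policy. This is a sketch under assumptions: `ApprovalPolicy` is a hypothetical type, the role names are placeholders, and reading "manager sign-off" as a single manager approval is one interpretation of the comments on `RiskLevel`.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class ApprovalPolicy:
    approvals_needed: int          # 0 means the action is auto-approved
    approver_role: Optional[str]   # role names here are illustrative


# Keys match the string values of RiskLevel above
POLICY_BY_RISK = {
    "low": ApprovalPolicy(0, None),
    "medium": ApprovalPolicy(1, "any_user"),
    "high": ApprovalPolicy(1, "manager"),
    "critical": ApprovalPolicy(2, "manager"),  # two-person rule
}


def policy_for(risk_level: str) -> ApprovalPolicy:
    # Fail closed: an unrecognised level gets the strictest policy
    return POLICY_BY_RISK.get(risk_level, POLICY_BY_RISK["critical"])
```

Falling back to the strictest policy means a typo or a newly added risk level cannot silently bypass review.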

Architecture Pattern

Basic Flow

1. Agent identifies action to execute
2. Check if action requires approval
3. If yes:
   a. Pause agent execution
   b. Create approval request in database
   c. Notify user (email, push, webhook)
   d. Wait for user response
4. User reviews and responds (approve/reject/modify)
5. Agent resumes:
   - If approved: Execute action
   - If rejected: Cancel and explain
   - If modified: Execute with changes
6. Log outcome to audit trail
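The flow above implies a small state machine for each request. The following sketch makes the allowed transitions explicit. Treating "executed" as a status is an assumption made here for clarity (the schema later in this article records execution through a timestamp and an audit event), and the "modified" branch is left out to keep the example short.

```python
from enum import Enum


class Status(str, Enum):
    PENDING = "pending"
    APPROVED = "approved"
    REJECTED = "rejected"
    EXPIRED = "expired"
    EXECUTED = "executed"


# Allowed moves between states; terminal states map to an empty set
TRANSITIONS = {
    Status.PENDING: {Status.APPROVED, Status.REJECTED, Status.EXPIRED},
    Status.APPROVED: {Status.EXECUTED},
    Status.REJECTED: set(),
    Status.EXPIRED: set(),
    Status.EXECUTED: set(),
}


def transition(current: Status, target: Status) -> Status:
    """Return the new status, or raise if the move is not allowed."""
    if target not in TRANSITIONS[current]:
        raise ValueError(f"Cannot move from {current.value} to {target.value}")
    return target
```

Routing every status change through one function like this makes it harder for a rejected or expired request to be executed by accident.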

Database Schema

-- Approval requests
CREATE TABLE approval_requests (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    org_id TEXT NOT NULL,
    agent_id TEXT NOT NULL,
    action_type TEXT NOT NULL,  -- 'delete_data', 'send_email', etc.
    action_params JSONB NOT NULL,
    risk_level TEXT NOT NULL,   -- 'low', 'medium', 'high', 'critical'
    status TEXT NOT NULL DEFAULT 'pending',  -- 'pending', 'approved', 'rejected', 'expired'
    justification TEXT,         -- Agent's reason for action
    requested_by UUID REFERENCES users(id),
    requested_at TIMESTAMP DEFAULT NOW(),
    responded_by UUID REFERENCES users(id),
    responded_at TIMESTAMP,
    response_note TEXT,         -- User's reason for approval/rejection
    expires_at TIMESTAMP,       -- Auto-reject if not responded by this time
    executed_at TIMESTAMP
);

-- Approval rules (define which actions need approval)
CREATE TABLE approval_rules (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    org_id TEXT NOT NULL,
    action_type TEXT NOT NULL,
    risk_level TEXT NOT NULL,
    requires_approval BOOLEAN DEFAULT TRUE,
    approver_role TEXT,         -- 'admin', 'manager', 'any_user'
    timeout_hours INTEGER DEFAULT 24,
    auto_reject_on_timeout BOOLEAN DEFAULT TRUE
);

-- Audit log
CREATE TABLE approval_audit (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    approval_request_id UUID REFERENCES approval_requests(id),
    event_type TEXT NOT NULL,  -- 'created', 'approved', 'rejected', 'expired', 'executed'
    event_data JSONB,
    created_at TIMESTAMP DEFAULT NOW()
);
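For local experiments it can help to exercise the request lifecycle without a Postgres instance. The sketch below is a hypothetical in-memory stand-in, not a real driver: the method names `insert`, `get`, and `update` are assumed to match the `db` object used in this article's Python examples, and only the `id`, `status`, and `requested_at` defaults from the schema are mimicked.

```python
import uuid
from datetime import datetime


class InMemoryDB:
    """Hypothetical stand-in for the database layer, for experiments only."""

    def __init__(self):
        self.tables = {}

    def insert(self, table: str, row: dict) -> str:
        row_id = str(uuid.uuid4())
        record = {"id": row_id, **row}
        if table == "approval_requests":
            # Mirror the column defaults declared in the schema
            record.setdefault("status", "pending")
            record.setdefault("requested_at", datetime.now())
        self.tables.setdefault(table, {})[row_id] = record
        return row_id

    def get(self, table: str, row_id: str) -> dict:
        # Return a copy so callers cannot mutate stored state by accident
        return dict(self.tables[table][row_id])

    def update(self, table: str, row_id: str, fields: dict) -> None:
        self.tables[table][row_id].update(fields)
```

It enforces none of the constraints, foreign keys, or concurrency guarantees a real database provides, so it is suitable for unit tests and walkthroughs only.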

Implementation: Python Example

1. Request Approval

import asyncio
from datetime import datetime, timedelta

class ApprovalWorkflow:
    def __init__(self, db, notification_service):
        self.db = db
        self.notifier = notification_service
    
    async def request_approval(
        self,
        org_id: str,
        agent_id: str,
        action_type: str,
        action_params: dict,
        justification: str,
        requested_by: str
    ) -> dict:
        # Classify risk
        risk_level = self.classify_risk(action_type, action_params)
        
        # Check if approval required
        rule = self.db.get_approval_rule(org_id, action_type)
        if not rule or not rule["requires_approval"]:
            return {"requires_approval": False}
        
        # Create approval request
        expires_at = datetime.now() + timedelta(hours=rule["timeout_hours"])
        
        approval_id = self.db.insert("approval_requests", {
            "org_id": org_id,
            "agent_id": agent_id,
            "action_type": action_type,
            "action_params": action_params,
            "risk_level": risk_level,
            "justification": justification,
            "requested_by": requested_by,
            "expires_at": expires_at
        })
        
        # Notify user
        await self.notifier.send_approval_request(
            approval_id=approval_id,
            org_id=org_id,
            action_type=action_type,
            justification=justification
        )
        
        # Audit log
        self.db.insert("approval_audit", {
            "approval_request_id": approval_id,
            "event_type": "created",
            "event_data": {"action_type": action_type}
        })
        
        return {
            "requires_approval": True,
            "approval_id": approval_id,
            "expires_at": expires_at
        }
    
    def classify_risk(self, action_type, params):
        # Risk classification logic (see earlier example)
        if action_type == "delete_data" and params.get("count", 0) > 1000:
            return "high"
        if action_type == "charge_payment" and params.get("amount", 0) > 1000:
            return "critical"
        return "medium"

2. Wait for Response

async def wait_for_approval(self, approval_id: str, poll_interval: int = 5) -> dict:
    """
    Poll database for approval response.
    Returns when user approves/rejects or request expires.
    """
    while True:
        request = self.db.get("approval_requests", approval_id)
        
        # Check if responded
        if request["status"] in ["approved", "rejected"]:
            return {
                "status": request["status"],
                "response_note": request["response_note"],
                "responded_by": request["responded_by"]
            }
        
        # Check if expired
        if datetime.now() > request["expires_at"]:
            self.db.update("approval_requests", approval_id, {
                "status": "expired"
            })
            
            self.db.insert("approval_audit", {
                "approval_request_id": approval_id,
                "event_type": "expired"
            })
            
            return {"status": "expired"}
        
        # Wait and poll again
        await asyncio.sleep(poll_interval)
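Polling is simple but adds up to `poll_interval` seconds of latency and one query per cycle for every waiting agent. When the agent and the response handler run in the same process, an event-driven wait is an alternative. The sketch below uses a hypothetical `ApprovalWaiter` registry built on asyncio futures; across processes the same idea would need a message queue or database notifications instead.

```python
import asyncio


class ApprovalWaiter:
    """Hypothetical in-process registry that wakes a waiting agent."""

    def __init__(self):
        self._futures = {}

    async def wait(self, approval_id: str, timeout_seconds: float) -> str:
        # Register a future that resolve() will complete
        future = asyncio.get_running_loop().create_future()
        self._futures[approval_id] = future
        try:
            return await asyncio.wait_for(future, timeout_seconds)
        except asyncio.TimeoutError:
            return "expired"
        finally:
            self._futures.pop(approval_id, None)

    def resolve(self, approval_id: str, decision: str) -> bool:
        """Deliver a decision; returns False if nobody is waiting.

        Must be called from the same event loop thread as wait().
        """
        future = self._futures.get(approval_id)
        if future is None or future.done():
            return False
        future.set_result(decision)
        return True
```

The database row should remain the source of truth; the waiter only removes the polling delay, so an agent that restarts can still recover the decision by reading the request.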

3. User Response Handler

def respond_to_approval(
    self,
    approval_id: str,
    user_id: str,
    decision: str,  # 'approve' or 'reject'
    note: str = None
) -> dict:
    """
    User approves or rejects pending approval request.
    """
    # Get request
    request = self.db.get("approval_requests", approval_id)
    
    if request["status"] != "pending":
        raise ValueError(f"Request already {request['status']}")
    
    # Check user has permission
    rule = self.db.get_approval_rule(
        request["org_id"], 
        request["action_type"]
    )
    
    if not self.user_can_approve(user_id, rule["approver_role"]):
        raise PermissionError("User not authorized to approve this action")
    
    # Map the decision to a stored status; refuse anything unexpected
    statuses = {"approve": "approved", "reject": "rejected"}
    if decision not in statuses:
        raise ValueError(f"Unknown decision: {decision!r}")
    status = statuses[decision]
    
    # Update request
    self.db.update("approval_requests", approval_id, {
        "status": status,
        "responded_by": user_id,
        "responded_at": datetime.now(),
        "response_note": note
    })
    
    # Audit log
    self.db.insert("approval_audit", {
        "approval_request_id": approval_id,
        "event_type": status,  # 'approved' or 'rejected'
        "event_data": {"user_id": user_id, "note": note}
    })
    
    # Notify agent (via webhook, message queue, etc.)
    self.notifier.notify_agent(approval_id, decision)
    
    return {"status": status}

4. Execute with Approval

async def execute_with_approval(
    self,
    action_type: str,
    action_params: dict,
    agent_id: str,
    org_id: str,
    user_id: str,
    justification: str
) -> dict:
    """
    Execute action with approval workflow.
    Pauses if approval required, executes immediately otherwise.
    """
    # Request approval
    approval = await self.request_approval(
        org_id=org_id,
        agent_id=agent_id,
        action_type=action_type,
        action_params=action_params,
        justification=justification,
        requested_by=user_id
    )
    
    # If no approval required, execute immediately
    if not approval["requires_approval"]:
        return await self.execute_action(action_type, action_params)
    
    # Wait for approval
    response = await self.wait_for_approval(approval["approval_id"])
    
    if response["status"] == "approved":
        # Execute action
        result = await self.execute_action(action_type, action_params)
        
        # Mark as executed
        self.db.update("approval_requests", approval["approval_id"], {
            "executed_at": datetime.now()
        })
        
        self.db.insert("approval_audit", {
            "approval_request_id": approval["approval_id"],
            "event_type": "executed",
            "event_data": result
        })
        
        return {"status": "executed", "result": result}
    
    elif response["status"] == "rejected":
        return {
            "status": "rejected",
            "reason": response["response_note"]
        }
    
    else:  # expired
        return {"status": "expired", "message": "Approval request timed out"}

async def execute_action(self, action_type: str, params: dict) -> dict:
    """Execute the actual action (delete data, send email, etc.)"""
    if action_type == "delete_data":
        return await delete_records(params["table"], params["ids"])
    elif action_type == "send_email":
        return await send_email(params["to"], params["subject"], params["body"])
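    # ... other actions
    else:
        # Fail loudly rather than silently returning None for unknown actions
        raise ValueError(f"Unsupported action type: {action_type}")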

Real-Time Notifications

WebSocket/SSE Pattern

# Server-side: Notify user via WebSocket
async def send_approval_request(self, approval_id, org_id, action_type, justification):
    # Look up the stored request so the notification can include its expiry
    request = self.db.get("approval_requests", approval_id)
    expires_at = request["expires_at"]
    
    # Get users who can approve
    approvers = self.db.get_approvers(org_id)
    
    for approver in approvers:
        # Send real-time notification
        await self.websocket_manager.send_to_user(approver["id"], {
            "type": "approval_request",
            "approval_id": approval_id,
            "action_type": action_type,
            "justification": justification,
            "expires_at": expires_at.isoformat()
        })
        
        # Also send email backup to the same approver
        await self.email_service.send(
            to=approver["email"],
            subject=f"Approval Required: {action_type}",
            body=f"An AI agent is requesting approval to {action_type}.\n\n"
                 f"Reason: {justification}\n\n"
                 f"Review: https://app.company.com/approvals/{approval_id}"
        )
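The example above covers the WebSocket path. For Server-Sent Events, the server instead writes text frames to a long-lived `text/event-stream` response. The helper below is a minimal sketch of that framing; the function name `format_sse` and the event name are illustrative choices, and wiring it into a web framework's streaming response is left out.

```python
import json
from typing import Optional


def format_sse(event: str, data: dict, event_id: Optional[str] = None) -> str:
    """Serialise one Server-Sent Events message."""
    lines = []
    if event_id is not None:
        # Lets a reconnecting client resume via the Last-Event-ID header
        lines.append(f"id: {event_id}")
    lines.append(f"event: {event}")
    # json.dumps emits a single line by default, so one data field is enough
    lines.append(f"data: {json.dumps(data)}")
    # A blank line terminates the message
    return "\n".join(lines) + "\n\n"
```

On the client, an `EventSource` listener registered for the `approval_request` event would receive the JSON payload in `event.data`.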

Client-Side (React)

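// React component for approval requests
import { useEffect, useState } from 'react';
import { supabase } from './supabaseClient'; // assumed path to your initialised Supabase client

function ApprovalInbox({ orgId }) {
  const [requests, setRequests] = useState([]);
  
  useEffect(() => {
    // Subscribe to real-time updates
    const subscription = supabase
      .channel('approvals')
      .on('postgres_changes', {
        event: 'INSERT',
        schema: 'public',
        table: 'approval_requests',
        filter: `org_id=eq.${orgId}`
      }, (payload) => {
        setRequests(prev => [payload.new, ...prev]);
        
        // Show browser notification (only if the user has granted permission)
        if (Notification.permission === 'granted') {
          new Notification("Approval Required", {
            body: `Agent requests permission to ${payload.new.action_type}`
          });
        }
      })
      .subscribe();
    
    return () => subscription.unsubscribe();
  }, [orgId]);
  
  // Send the decision to the API; the /reject endpoint is assumed to mirror /approve
  const respond = async (approvalId, decision, note) => {
    await fetch(`/api/approvals/${approvalId}/${decision}`, {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify({ note })
    });
  };
  const handleApprove = (approvalId) => respond(approvalId, 'approve', 'Approved');
  const handleReject = (approvalId) => respond(approvalId, 'reject', 'Rejected');
  
  return (
    <div>
      {requests.map(req => (
        <ApprovalCard
          key={req.id}
          request={req}
          onApprove={() => handleApprove(req.id)}
          onReject={() => handleReject(req.id)}
        />
      ))}
    </div>
  );
}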

Production Patterns

Pattern 1: Two-Person Rule (Critical Actions)

For critical actions (large payments, production deployments), require two approvers.

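class TwoPersonApproval:
    # Assumes approval_requests has three extra columns not shown in the
    # schema above: required_approvals, approvals_received, approvers
    def __init__(self, db):
        self.db = db
    
    async def request_approval(self, action, params):
        approval_id = self.db.insert("approval_requests", {
            "action_type": action,
            "action_params": params,
            "required_approvals": 2,  # Need 2 approvals
            "approvals_received": 0,
            "approvers": []           # User IDs that have approved so far
        })
        
        return approval_id
    
    def approve(self, approval_id, user_id):
        request = self.db.get("approval_requests", approval_id)
        
        if request["status"] != "pending":
            raise ValueError(f"Request already {request['status']}")
        
        # Check user hasn't already approved
        approvers = list(request.get("approvers", []))
        if user_id in approvers:
            raise ValueError("User already approved")
        
        approvers.append(user_id)
        approvals_count = len(approvers)
        fully_approved = approvals_count >= request["required_approvals"]
        
        self.db.update("approval_requests", approval_id, {
            "approvers": approvers,
            "approvals_received": approvals_count,
            "status": "approved" if fully_approved else "pending"
        })
        
        # True once enough distinct users have approved
        return fully_approved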

Pattern 2: Escalation on Timeout

If the primary approver doesn't respond, escalate to a manager.

async def handle_timeout_escalation(self):
    """Background job that escalates pending approvals close to expiry"""
    while True:
        # Find approvals expiring soon (within 1 hour)
        # Assumes approval_requests has a BOOLEAN column "escalated" DEFAULT FALSE
        expiring = self.db.query("""
            SELECT * FROM approval_requests
            WHERE status = 'pending'
            AND expires_at < NOW() + INTERVAL '1 hour'
            AND escalated = FALSE
        """)
        
        for request in expiring:
            # Escalate to manager
            manager = self.db.get_user_manager(request["requested_by"])
            if manager is None:
                continue  # No manager on record; the request times out as usual
            
            await self.notifier.send_urgent_notification(
                user_id=manager["id"],
                message="Urgent: Approval needed (expires within 1 hour)",
                approval_id=request["id"]
            )
            
            self.db.update("approval_requests", request["id"], {
                "escalated": True
            })
        
        await asyncio.sleep(300)  # Check every 5 minutes

Pattern 3: Conditional Auto-Approval

For trusted users or low-risk scenarios, auto-approve with audit trail.

def check_auto_approval(self, user_id, action_type, params):
    """
    Auto-approve if user has sufficient trust level
    """
    user = self.db.get_user(user_id)
    
    # Check user trust score (based on history, role, tenure)
    if user["trust_score"] >= 90:
        if action_type == "delete_data" and params.get("count", 0) < 100:
            # Auto-approve small deletions for trusted users
            # Use the approval_audit columns defined in the schema
            self.db.insert("approval_audit", {
                "event_type": "auto_approved",
                "event_data": {
                    "user_id": user_id,
                    "action_type": action_type,
                    "reason": f"User trust score {user['trust_score']}"
                }
            })
            return True
    
    return False

Security Considerations

1. Authorization checks: Verify user has permission to approve before processing response.

2. Replay attack prevention: Each approval response should be single-use, so a captured or repeated request cannot be replayed.

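class SecurityError(Exception):
    """Raised when a request fails a security check."""

def approve(self, approval_id, user_id, csrf_token):
    # Verify CSRF token
    if not self.verify_csrf(csrf_token, user_id):
        raise SecurityError("Invalid CSRF token")
    
    # Check approval still pending (not already used)
    request = self.db.get("approval_requests", approval_id)
    if request["status"] != "pending":
        raise ValueError("Approval already processed")
    
    # Record the decision so any later attempt fails the pending check above
    self.db.update("approval_requests", approval_id, {
        "status": "approved",
        "responded_by": user_id,
        "responded_at": datetime.now()
    })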
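A read-then-write status check leaves a small window in which two concurrent responses can both see "pending". One common way to close it is to make the check part of the write itself, so the database decides which response wins. The sketch below uses the standard-library `sqlite3` module purely so it runs anywhere; the function name `claim_decision` is hypothetical, and the same conditional `UPDATE` applies to the Postgres schema in this article.

```python
import sqlite3


def claim_decision(conn: sqlite3.Connection, approval_id: str,
                   user_id: str, status: str) -> bool:
    """Record a decision only if the request is still pending."""
    if status not in ("approved", "rejected"):
        raise ValueError(f"Unknown status: {status!r}")
    cursor = conn.execute(
        "UPDATE approval_requests "
        "SET status = ?, responded_by = ? "
        "WHERE id = ? AND status = 'pending'",
        (status, user_id, approval_id),
    )
    conn.commit()
    # rowcount is 1 for the first response and 0 for any replay
    return cursor.rowcount == 1
```

A caller would treat a `False` return as "already processed" and reject the response without touching the audit trail's decision record.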

3. Audit logging: Log every approval event with timestamp, user ID, IP address.
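As a rough illustration, an audit event carrying those three fields could be assembled as follows. The function name and dictionary layout are assumptions chosen to line up with the `approval_audit` table; where the IP address comes from (for example, a request header behind a trusted proxy) depends on your deployment.

```python
import ipaddress
from datetime import datetime, timezone


def build_audit_event(approval_request_id: str, event_type: str,
                      user_id: str, ip_address: str) -> dict:
    """Assemble one audit record; raises ValueError on a malformed IP."""
    # Normalise and validate the address before it reaches the log
    ip = str(ipaddress.ip_address(ip_address))
    return {
        "approval_request_id": approval_request_id,
        "event_type": event_type,
        "event_data": {"user_id": user_id, "ip_address": ip},
        # Timezone-aware UTC timestamp, so entries compare across servers
        "created_at": datetime.now(timezone.utc).isoformat(),
    }
```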

4. Encryption: Encrypt sensitive approval data (e.g., payment details) at rest.

Frequently Asked Questions

What if the user never responds?

Set a timeout (expires_at). After timeout, either:

  • Auto-reject (safe default)
  • Escalate to manager
  • Execute with elevated logging (if acceptable risk)
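The first two options can be combined into a single decision function. This is a sketch under assumptions: `resolve_timeout` is a hypothetical helper, `auto_reject_on_timeout` comes from the approval rules table, and `escalated` is the flag used in the escalation pattern. The third option, executing on timeout, is deliberately left out because it removes the human from the loop.

```python
from datetime import datetime


def resolve_timeout(request: dict, rule: dict, now: datetime) -> str:
    """Decide what to do with a pending request.

    Returns one of: "wait", "escalate", "reject".
    """
    if now < request["expires_at"]:
        return "wait"
    if rule.get("auto_reject_on_timeout", True):
        return "reject"
    if not request.get("escalated", False):
        return "escalate"
    # Already escalated and still unanswered: fall back to the safe default
    return "reject"
```

Passing `now` in as a parameter, rather than calling the clock inside the function, keeps the logic easy to test.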

How do I handle urgent requests?

For time-sensitive actions:

  • Shorter timeout (2 hours instead of 24)
  • Multi-channel notification (email + SMS + push)
  • Escalate immediately to on-call person

Can agents modify their own approval requests?

No. The agent creates the request and then waits passively. Only humans can approve or reject, which prevents an agent from approving its own actions.

What if approval requirements change mid-execution?

Re-check approval rules before execution:

# Check rules again at execution time
current_rule = self.db.get_approval_rule(org_id, action_type)
if current_rule["requires_approval"] and not is_approved:
    raise SecurityError("Approval required (rules changed)")

Bottom line: Human-in-the-loop approval workflows prevent catastrophic agent mistakes. Implement for all high-risk actions (deletion, payments, external comms). Use real-time notifications, timeout handling, and comprehensive audit trails. 84% of enterprise AI deployments use HITL for compliance.

Next: Read our Error Handling guide for handling failures gracefully.