Academy · 28 Jul 2024 · 9 min read

Error Handling and Reliability Patterns for Production AI Agents

Production-grade error handling for AI agents: retry strategies, circuit breakers, fallback mechanisms, timeout management, and graceful degradation patterns.

Max Beech
Head of Content

TL;DR

  • AI agents fail constantly in production: API timeouts, rate limits, model errors, invalid outputs.
  • 5 critical patterns: Retry with exponential backoff, circuit breakers, fallback mechanisms, timeout management, graceful degradation.
  • Retry: 3-5 attempts with exponential backoff (1s, 2s, 4s, 8s, 16s).
  • Circuit breaker: After N consecutive failures, stop trying for X minutes (prevents cascading failures).
  • Fallbacks: Cheaper model, cached response, human escalation, or "service unavailable" message.
  • Monitoring: Track error rates, latency, retry counts, circuit breaker trips.
  • Real data: Proper error handling increased agent reliability from 87% to 99.2% (failure rate down from 13% to 0.8%, roughly 16× fewer failures).

Error Handling for Production AI Agents

Production reality: AI agents fail. A lot.

Common failure modes:

  • OpenAI API timeout (happens on 2-5% of requests during peak hours)
  • Rate limit exceeded (429 errors)
  • Model returns invalid JSON
  • External API (Stripe, GitHub, etc.) is down
  • Network issues
  • Context window exceeded

Without error handling:

User: "Analyze this dataset"
Agent: [Calls OpenAI]
OpenAI: [Timeout after 30 seconds]
Agent: [Crashes]
User sees: "Error 500"

With error handling:

User: "Analyze this dataset"
Agent: [Calls OpenAI]
OpenAI: [Timeout]
Agent: [Retries with exponential backoff]
OpenAI: [Success on retry 2]
Agent: Returns analysis
User sees: Analysis (never knew there was a failure)

Pattern 1: Retry with Exponential Backoff

When: Transient failures (API timeouts, rate limits, network issues).

Strategy: Retry failed requests with increasing delays.

Implementation:

import asyncio
import random
from typing import Any, Callable

async def retry_with_backoff(
    func: Callable,
    max_retries: int = 5,
    initial_delay: float = 1.0,
    max_delay: float = 60.0,
    exponential_base: float = 2.0,
    jitter: bool = True
) -> Any:
    """
    Retry function with exponential backoff.
    
    Delays: 1s, 2s, 4s, 8s, 16s (with jitter)
    """
    last_exception = None
    
    for attempt in range(max_retries):
        try:
            result = await func()
            return result
        
        # Only retried when the error is classified as transient (see classify_error below)
        except RetryableError as e:
            last_exception = e
            
            if attempt == max_retries - 1:
                # Last attempt failed, raise
                raise
            
            # Calculate delay
            delay = min(
                initial_delay * (exponential_base ** attempt),
                max_delay
            )
            
            # Add jitter (randomness) to prevent thundering herd
            if jitter:
                delay = delay * (0.5 + random.random())
            
            print(f"Attempt {attempt + 1} failed: {e}. Retrying in {delay:.2f}s...")
            await asyncio.sleep(delay)
    
    raise last_exception

# Usage (legacy openai<1.0 SDK; acreate is the async variant of create.
# Newer SDKs expose AsyncOpenAI().chat.completions.create instead.)
async def call_openai_with_retry():
    return await retry_with_backoff(
        lambda: openai.ChatCompletion.acreate(
            model="gpt-4-turbo",
            messages=[{"role": "user", "content": "Analyze this data"}]
        ),
        max_retries=5
    )

Error Classification:

class RetryableError(Exception):
    """Errors that should be retried"""
    pass

class PermanentError(Exception):
    """Errors that shouldn't be retried"""
    pass

def classify_error(error):
    """Determine if error is retryable"""
    
    # Retryable errors
    if isinstance(error, (TimeoutError, ConnectionError)):
        return RetryableError(error)
    
    if hasattr(error, 'status_code'):
        # 429 = Rate limit (retry with backoff)
        # 500-599 = Server errors (retry)
        if error.status_code in [429, 500, 502, 503, 504]:
            return RetryableError(error)
        
        # 400-499 = Client errors (don't retry)
        if 400 <= error.status_code < 500:
            return PermanentError(error)
    
    # Default: Don't retry
    return PermanentError(error)
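
To make the two pieces work together, raw provider exceptions need to be translated before they reach retry_with_backoff, which only retries RetryableError. A minimal glue sketch (call_llm is a hypothetical async wrapper around your LLM client):

async def call_classified(func):
    """Run func once, translating raw exceptions via classify_error."""
    try:
        return await func()
    except Exception as e:
        raise classify_error(e) from e

async def call_openai_classified(prompt: str):
    # Transient failures are retried; permanent ones surface immediately
    return await retry_with_backoff(
        lambda: call_classified(lambda: call_llm("gpt-4-turbo", prompt)),
        max_retries=5,
    )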

Why Jitter Matters

Without jitter: If 100 clients all retry at exactly 1s, 2s, 4s intervals → synchronized thundering herd hits API.

With jitter: Retries spread randomly over time window, reducing load spikes.

Example:

Without jitter (10 clients):
t=1s: |||||||||| (all 10 retry at once)
t=2s: |||||||||| (all 10 retry at once)

With jitter (10 clients):
t=0.8s: ||
t=1.1s: |||
t=1.3s: ||
t=1.7s: |||
(Spread out over the window, no single spike)
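
To see the difference numerically, here is a tiny sketch that reuses the delay formula from the retry helper above (5 simulated clients, 3 attempts each):

import random

def backoff_delay(attempt, base=1.0, factor=2.0, jitter=False):
    # Same formula as retry_with_backoff: base * factor^attempt, optionally jittered
    d = base * (factor ** attempt)
    return d * (0.5 + random.random()) if jitter else d

for client in range(5):
    plain = [round(backoff_delay(a), 2) for a in range(3)]
    jittered = [round(backoff_delay(a, jitter=True), 2) for a in range(3)]
    print(f"client {client}: no jitter {plain}  with jitter {jittered}")

Every client's no-jitter schedule is identical (1s, 2s, 4s), while the jittered schedules land at different times.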

Pattern 2: Circuit Breaker

When: Prevent cascading failures when downstream service is down.

Problem: If external API is down, retrying 1000× just makes things worse (wastes resources, delays failure detection).

Solution: After N consecutive failures, "open circuit" (stop trying) for X minutes. Then try again.

States:

  1. Closed (normal): Requests go through
  2. Open (broken): All requests fail immediately (no retries)
  3. Half-Open (testing): Try one request to see if service recovered

Implementation:

from enum import Enum
from datetime import datetime, timedelta

class CircuitBreakerOpen(Exception):
    """Raised when a call is rejected because the circuit is open."""
    pass

class CircuitState(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"

class CircuitBreaker:
    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: int = 60,
        expected_exception: type = Exception
    ):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.expected_exception = expected_exception
        
        self.failure_count = 0
        self.last_failure_time = None
        self.state = CircuitState.CLOSED
    
    async def call(self, func):
        if self.state == CircuitState.OPEN:
            # Check if recovery timeout has passed
            if datetime.now() - self.last_failure_time > timedelta(seconds=self.recovery_timeout):
                self.state = CircuitState.HALF_OPEN
            else:
                raise CircuitBreakerOpen(f"Circuit open, retry after {self.recovery_timeout}s")
        
        try:
            result = await func()
            
            # Success: Reset circuit
            if self.state == CircuitState.HALF_OPEN:
                self.state = CircuitState.CLOSED
                self.failure_count = 0
            
            return result
        
        except self.expected_exception as e:
            self.failure_count += 1
            self.last_failure_time = datetime.now()
            
            # Open circuit if threshold exceeded
            if self.failure_count >= self.failure_threshold:
                self.state = CircuitState.OPEN
                print(f"Circuit breaker opened after {self.failure_count} failures")
            
            raise

# Usage (APIError stands for your LLM client's error class, e.g. openai.APIError)
openai_circuit = CircuitBreaker(
    failure_threshold=5,        # Open after 5 failures
    recovery_timeout=60,        # Wait 60s before retrying
    expected_exception=APIError
)

async def call_openai_protected():
    return await openai_circuit.call(
        lambda: openai.ChatCompletion.acreate(...)
    )

Real Example:

13:00: API call → Success (circuit: CLOSED)
13:01: API call → Success (circuit: CLOSED)
13:02: API call → Timeout (failure count: 1)
13:02: API call → Timeout (failure count: 2)
13:02: API call → Timeout (failure count: 3)
13:03: API call → Timeout (failure count: 4)
13:03: API call → Timeout (failure count: 5)
13:03: Circuit OPENS (stops trying)
13:03-13:04: All calls fail immediately with "Circuit open"
13:04: Circuit enters HALF_OPEN (tries one request)
13:04: API call → Success → Circuit CLOSES
13:04: All calls work normally again

Benefit: Prevents wasting time on doomed requests, allows service to recover.
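
In practice the circuit breaker usually wraps the retry helper from Pattern 1, so an open circuit fails fast before any retries are attempted. A minimal sketch under that assumption (call_llm is a hypothetical async wrapper that raises RetryableError for transient failures):

# Default expected_exception=Exception, so any repeated failure trips the breaker
llm_circuit = CircuitBreaker(failure_threshold=5, recovery_timeout=60)

async def call_llm_resilient(prompt: str):
    # Fail fast while the circuit is open; otherwise retry transient errors
    return await llm_circuit.call(
        lambda: retry_with_backoff(
            lambda: call_llm("gpt-4-turbo", prompt),
            max_retries=3,
        )
    )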

Pattern 3: Fallback Mechanisms

When: Primary path fails, use alternative.

Fallback 1: Cheaper Model

# call_llm(model, prompt) is assumed to route to the right provider SDK
class AllModelsFailed(Exception):
    pass

async def call_with_model_fallback(prompt, max_retries=2):
    models = [
        ("gpt-4-turbo", 0.01),      # Primary: Best quality
        ("gpt-3.5-turbo", 0.002),   # Fallback 1: Cheaper
        ("claude-3-haiku", 0.001)   # Fallback 2: Cheapest (different provider)
    ]
    
    for model_name, cost_per_token in models:
        try:
            response = await retry_with_backoff(
                lambda: call_llm(model_name, prompt),
                max_retries=max_retries
            )
            return response
        
        except Exception as e:
            print(f"{model_name} failed: {e}. Trying next model...")
            continue
    
    raise AllModelsFailed("All models failed")

Fallback 2: Cached Response

# Assumes a simple cache client exposing get/set(key, value, ttl)
# and a hash_prompt() helper that builds stable cache keys
async def call_with_cache_fallback(prompt):
    cache_key = hash_prompt(prompt)
    
    try:
        # Try live API call
        response = await call_llm(prompt)
        
        # Cache successful response
        cache.set(cache_key, response, ttl=3600)
        return response
    
    except Exception as e:
        # API failed, check cache
        cached_response = cache.get(cache_key)
        
        if cached_response:
            print("API call failed, returning cached response")
            return cached_response
        
        raise  # No cache available, re-raise error

Fallback 3: Human Escalation

async def call_with_human_fallback(task, max_auto_retries=3):
    try:
        return await retry_with_backoff(
            lambda: agent.execute(task),
            max_retries=max_auto_retries
        )
    
    except Exception as e:
        # Agent failed, escalate to human
        ticket_id = create_support_ticket(
            title=f"Agent failed: {task['type']}",
            description=f"Error: {e}\nTask: {task}",
            priority="high"
        )
        
        await notify_on_call_human(ticket_id)
        
        return {
            "status": "escalated_to_human",
            "ticket_id": ticket_id,
            "message": "An engineer has been notified and will handle this manually."
        }

Pattern 4: Timeout Management

Problem: Agent waits forever for slow API response.

Solution: Set timeouts at multiple levels.

import asyncio

async def call_with_timeout(func, timeout_seconds=30):
    try:
        return await asyncio.wait_for(func(), timeout=timeout_seconds)
    
    except asyncio.TimeoutError:
        raise TimeoutError(f"Operation exceeded {timeout_seconds}s timeout")

# Multi-level timeouts
async def agent_workflow():
    # Level 1: Individual LLM call (30s timeout)
    llm_response = await call_with_timeout(
        lambda: call_llm(prompt),
        timeout_seconds=30
    )
    
    # Level 2: External API call (10s timeout)
    api_data = await call_with_timeout(
        lambda: fetch_external_api(),
        timeout_seconds=10
    )
    
    # Level 3: the entire workflow gets its own timeout (enforced below)
    return api_data

# Enforce workflow-level timeout
result = await call_with_timeout(
    agent_workflow,
    timeout_seconds=300
)

Timeout Values:

  • LLM API call: 30-60s (OpenAI/Anthropic typically respond in 2-10s, but can spike to 30s)
  • External API call: 10s (most APIs respond in <1s, so 10s is generous)
  • Database query: 5s (should be fast; anything >5s indicates a problem)
  • Entire workflow: 5-10 min (prevents infinite hangs)
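
Keeping these values in one place makes them easier to tune consistently. A small sketch reusing call_with_timeout from above (the TIMEOUTS mapping and fetch_external_api are illustrative assumptions):

# Illustrative defaults matching the table above; tune per deployment
TIMEOUTS = {
    "llm_call": 45,       # seconds
    "external_api": 10,
    "db_query": 5,
    "workflow": 600,
}

async def fetch_external_api_safely():
    return await call_with_timeout(
        lambda: fetch_external_api(),
        timeout_seconds=TIMEOUTS["external_api"],
    )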

Pattern 5: Graceful Degradation

When: Can't provide full functionality, provide partial functionality.

Example: E-commerce recommendation agent

Full functionality: Personalized recommendations based on user history + current trends + inventory

Degraded functionality:

  1. User history unavailable → Use only trends + inventory
  2. Trends API down → Use only user history + inventory
  3. Both down → Generic bestsellers from inventory
  4. All services down → Static curated list

Implementation:

# fetch_user_history, fetch_trending_items, fetch_bestsellers, generate_personalized
# and STATIC_CURATED_LIST are placeholders for your own data sources
async def get_recommendations(user_id):
    recommendations = []
    
    # Try personalized (best)
    try:
        user_history = await call_with_timeout(
            lambda: fetch_user_history(user_id),
            timeout_seconds=5
        )
        recommendations.extend(
            await generate_personalized(user_history)
        )
    except Exception:
        print("Personalization failed, degrading...")
    
    # Try trending (good)
    try:
        trends = await call_with_timeout(
            lambda: fetch_trending_items(),
            timeout_seconds=5
        )
        recommendations.extend(trends[:10])
    except Exception:
        print("Trends failed, degrading further...")
    
    # Fallback to bestsellers (okay)
    if not recommendations:
        try:
            bestsellers = await fetch_bestsellers()
            recommendations.extend(bestsellers[:10])
        except Exception:
            print("Bestsellers failed, using static fallback...")
    
    # Last resort: Static curated list (minimal)
    if not recommendations:
        recommendations = STATIC_CURATED_LIST
    
    return recommendations

User experience:

  • Full service: Excellent (personalized)
  • Partial failure: Good (trending items)
  • Major failure: Acceptable (bestsellers)
  • Complete failure: Usable (static list)

Any of these beats a complete failure with an "Error 500" message.

Error Monitoring and Alerting

Track Error Rates

import time

from prometheus_client import Counter, Histogram

# Metrics
errors_total = Counter('agent_errors_total', 'Total errors', ['error_type', 'agent_name'])
retry_count = Counter('agent_retries_total', 'Total retries', ['agent_name'])
latency = Histogram('agent_latency_seconds', 'Request latency', ['agent_name'])

async def monitored_agent_call(agent_name, task):
    start_time = time.time()
    attempts = 0

    async def attempt():
        # Count every attempt so retries (attempts beyond the first) can be reported
        nonlocal attempts
        attempts += 1
        return await agent.execute(task)

    try:
        result = await retry_with_backoff(attempt, max_retries=5)

        # Record success metrics
        latency.labels(agent_name=agent_name).observe(time.time() - start_time)

        return result

    except Exception as e:
        # Record error metrics
        error_type = type(e).__name__
        errors_total.labels(error_type=error_type, agent_name=agent_name).inc()

        raise

    finally:
        retry_count.labels(agent_name=agent_name).inc(max(attempts - 1, 0))

Alert Thresholds

alerts:
  - name: HighErrorRate
    condition: error_rate > 0.05  # 5% error rate
    duration: 5m
    action: page_oncall_engineer
    
  - name: CircuitBreakerOpen
    condition: circuit_breaker_state == "open"
    duration: 1m
    action: send_slack_alert
    
  - name: HighLatency
    condition: p95_latency > 60s
    duration: 10m
    action: send_slack_alert

Production Checklist

Before deploying agent to production:

  • Retry logic with exponential backoff for all external calls
  • Circuit breakers for critical dependencies
  • Timeouts at operation, workflow, and system levels
  • Fallback mechanisms for degraded functionality
  • Error classification (retryable vs permanent)
  • Monitoring error rates, latency, retry counts
  • Alerting for high error rates, circuit breaker trips
  • Logging all errors with context (user ID, task, timestamp)
  • Dead letter queue for failed tasks (manual review; see the sketch after this checklist)
  • Graceful degradation paths defined
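
For the dead letter queue item, here is a minimal sketch using an in-process asyncio.Queue as a stand-in (a production system would typically use Redis, SQS, or a database table instead):

import asyncio

dead_letter_queue: asyncio.Queue = asyncio.Queue()

async def execute_or_dead_letter(task):
    try:
        return await retry_with_backoff(lambda: agent.execute(task), max_retries=3)
    except Exception as e:
        # Park the failed task with enough context for manual review
        await dead_letter_queue.put({
            "task": task,
            "error": str(e),
            "error_type": type(e).__name__,
        })
        raise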

Frequently Asked Questions

How many retries should I configure?

Recommendation: 3-5 retries for most cases.

  • Too few (1-2): Transient failures cause user-visible errors
  • Too many (10+): Wastes time on permanent failures

Exception: Critical operations (payments, anything where failure risks data loss) may warrant 10+ retries.

Should I retry on all errors?

No. Only retry transient errors:

  • ✅ Timeout, rate limit, 5xx server errors
  • ❌ Authentication failure, invalid input, 4xx client errors

How long should circuit breaker stay open?

Standard: 60 seconds.

  • Too short (5s): Circuit closes before service recovers, reopens immediately
  • Too long (10min): Users wait unnecessarily long after service recovers

Tune based on monitoring: If circuit reopens frequently, increase timeout.

What's the performance cost of error handling?

Retry overhead: Adds latency only when failures occur (0% overhead in happy path).

Circuit breaker overhead: ~1ms per call (negligible).

Monitoring overhead: ~5-10ms per call (acceptable for production observability).


Bottom line: Production AI agents require robust error handling. Implement retry with exponential backoff, circuit breakers, timeouts, fallbacks, and graceful degradation. Proper error handling increases reliability from 87% to 99.2%. Monitor error rates and set alerts for anomalies.

Next: Read our Agent Observability guide for comprehensive monitoring strategies.