Error Handling and Reliability Patterns for Production AI Agents
Production-grade error handling for AI agents: retry strategies, circuit breakers, fallback mechanisms, timeout management, and graceful degradation patterns.
TL;DR
Production reality: AI agents fail. A lot.
Common failure modes:
- LLM API timeouts and rate limits (429s)
- Provider outages and 5xx server errors
- Network issues between the agent and external APIs
- Slow or hanging downstream services
Without error handling:
User: "Analyze this dataset"
Agent: [Calls OpenAI]
OpenAI: [Timeout after 30 seconds]
Agent: [Crashes]
User sees: "Error 500"
With error handling:
User: "Analyze this dataset"
Agent: [Calls OpenAI]
OpenAI: [Timeout]
Agent: [Retries with exponential backoff]
OpenAI: [Success on retry 2]
Agent: Returns analysis
User sees: Analysis (never knew there was a failure)
When: Transient failures (API timeouts, rate limits, network issues).
Strategy: Retry failed requests with increasing delays.
Implementation:
import asyncio
import random
import time
from typing import Any, Callable
async def retry_with_backoff(
    func: Callable,
    max_retries: int = 5,
    initial_delay: float = 1.0,
    max_delay: float = 60.0,
    exponential_base: float = 2.0,
    jitter: bool = True
) -> Any:
    """
    Retry an async function with exponential backoff.
    Delays grow as 1s, 2s, 4s, 8s, ... (with jitter), capped at max_delay.
    """
    last_exception = None
    for attempt in range(max_retries):
        try:
            result = await func()
            return result
        except RetryableError as e:  # defined in the error-classification block below
            last_exception = e
            if attempt == max_retries - 1:
                # Last attempt failed, raise
                raise
            # Calculate delay
            delay = min(
                initial_delay * (exponential_base ** attempt),
                max_delay
            )
            # Add jitter (randomness) to prevent thundering herd
            if jitter:
                delay = delay * (0.5 + random.random())
            print(f"Attempt {attempt + 1} failed: {e}. Retrying in {delay:.2f}s...")
            await asyncio.sleep(delay)
    raise last_exception
# Usage
async def call_openai_with_retry():
    return await retry_with_backoff(
        # acreate is the async variant in the legacy (pre-1.0) openai SDK;
        # wrap raw SDK errors with classify_error (below) so they become RetryableError
        lambda: openai.ChatCompletion.acreate(
            model="gpt-4-turbo",
            messages=[{"role": "user", "content": "Analyze this data"}]
        ),
        max_retries=5
    )
Error Classification:
class RetryableError(Exception):
    """Errors that should be retried"""
    pass

class PermanentError(Exception):
    """Errors that shouldn't be retried"""
    pass

def classify_error(error):
    """Determine if error is retryable"""
    # Retryable errors
    if isinstance(error, (TimeoutError, ConnectionError)):
        return RetryableError(error)
    if hasattr(error, 'status_code'):
        # 429 = Rate limit (retry with backoff)
        # 500-599 = Server errors (retry)
        if error.status_code in [429, 500, 502, 503, 504]:
            return RetryableError(error)
        # 400-499 = Client errors (don't retry)
        if 400 <= error.status_code < 500:
            return PermanentError(error)
    # Default: Don't retry
    return PermanentError(error)
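As written, retry_with_backoff only catches RetryableError, so raw SDK exceptions have to be classified before the retry loop sees them. A minimal way to wire the two together (the wrapper name is illustrative, not part of the original code):

async def call_with_classified_retry(func):
    """Translate raw exceptions via classify_error, then retry only the retryable ones."""
    async def classified():
        try:
            return await func()
        except Exception as e:
            # classify_error returns a RetryableError or PermanentError instance
            raise classify_error(e) from e
    return await retry_with_backoff(classified, max_retries=5)

Anything classified as PermanentError propagates immediately; only RetryableError goes through the backoff loop.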
Without jitter: If 100 clients all retry at exactly 1s, 2s, 4s intervals → synchronized thundering herd hits API.
With jitter: Retries spread randomly over time window, reducing load spikes.
Example:
Without jitter (10 clients):
t=1s: |||||||||| (all 10 retry at once)
t=2s: |||||||||| (all 10 retry at once)
With jitter (10 clients):
t=0.8s: ||
t=1.1s: |||
t=1.3s: ||
t=1.7s: |||
(Spread evenly, no spike)
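A quick way to see the effect is to print both schedules side by side (a standalone sketch using the same jitter formula as retry_with_backoff; the numbers in the comments are illustrative):

import random

def backoff_delays(attempts=4, base=1.0, factor=2.0, jitter=False):
    """Return the delay schedule a single client would follow."""
    delays = []
    for attempt in range(attempts):
        delay = base * (factor ** attempt)
        if jitter:
            delay *= 0.5 + random.random()  # scale each delay by a factor in [0.5, 1.5)
        delays.append(round(delay, 2))
    return delays

print(backoff_delays())             # [1.0, 2.0, 4.0, 8.0] -- every client identical
print(backoff_delays(jitter=True))  # e.g. [0.7, 2.9, 3.1, 10.4] -- clients spread out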
When: Prevent cascading failures when downstream service is down.
Problem: If external API is down, retrying 1000× just makes things worse (wastes resources, delays failure detection).
Solution: After N consecutive failures, "open circuit" (stop trying) for X minutes. Then try again.
States:
- CLOSED: normal operation, requests pass through and failures are counted
- OPEN: requests fail immediately without hitting the downstream service
- HALF_OPEN: after the recovery timeout, a trial request is allowed through
Implementation:
from enum import Enum
from datetime import datetime, timedelta

class CircuitState(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"

class CircuitBreakerOpen(Exception):
    """Raised when the circuit is open and calls are rejected immediately."""
    pass

class CircuitBreaker:
    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: int = 60,
        expected_exception: type = Exception
    ):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.expected_exception = expected_exception
        self.failure_count = 0
        self.last_failure_time = None
        self.state = CircuitState.CLOSED
    async def call(self, func):
        if self.state == CircuitState.OPEN:
            # Check if recovery timeout has passed
            if datetime.now() - self.last_failure_time > timedelta(seconds=self.recovery_timeout):
                self.state = CircuitState.HALF_OPEN
            else:
                raise CircuitBreakerOpen(f"Circuit open, retry after {self.recovery_timeout}s")
        try:
            result = await func()
            # Success: Reset circuit
            if self.state == CircuitState.HALF_OPEN:
                self.state = CircuitState.CLOSED
                self.failure_count = 0
            return result
        except self.expected_exception as e:
            self.failure_count += 1
            self.last_failure_time = datetime.now()
            # Open circuit if threshold exceeded
            if self.failure_count >= self.failure_threshold:
                self.state = CircuitState.OPEN
                print(f"Circuit breaker opened after {self.failure_count} failures")
            raise
# Usage
openai_circuit = CircuitBreaker(
    failure_threshold=5,         # Open after 5 failures
    recovery_timeout=60,         # Wait 60s before retrying
    expected_exception=APIError  # the provider SDK's error type (e.g. openai.error.APIError in the legacy SDK)
)

async def call_openai_protected():
    return await openai_circuit.call(
        lambda: openai.ChatCompletion.acreate(...)
    )
Real Example:
13:00: API call → Success (circuit: CLOSED)
13:01: API call → Success (circuit: CLOSED)
13:02: API call → Timeout (failure count: 1)
13:02: API call → Timeout (failure count: 2)
13:02: API call → Timeout (failure count: 3)
13:03: API call → Timeout (failure count: 4)
13:03: API call → Timeout (failure count: 5)
13:03: Circuit OPENS (stops trying)
13:03-13:04: All calls fail immediately with "Circuit open"
13:04: Circuit enters HALF_OPEN (tries one request)
13:04: API call → Success → Circuit CLOSES
13:04: All calls work normally again
Benefit: Prevents wasting time on doomed requests, allows service to recover.
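In practice the breaker and the retry loop compose: retries absorb brief blips, while the breaker stops retry storms during a real outage. A sketch of that layering, reusing the helpers defined above (the wrapper name and retry count are illustrative):

async def call_openai_resilient(prompt: str):
    """Retry transient failures, but stop immediately once the breaker opens."""
    async def attempt():
        try:
            # The breaker sees every underlying call, so a burst of failures trips it
            return await openai_circuit.call(
                lambda: openai.ChatCompletion.acreate(
                    model="gpt-4-turbo",
                    messages=[{"role": "user", "content": prompt}]
                )
            )
        except CircuitBreakerOpen:
            raise  # fail fast; a fallback layer (next section) can take over
        except Exception as e:
            raise classify_error(e) from e  # only RetryableError gets retried
    return await retry_with_backoff(attempt, max_retries=3)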
When: Primary path fails, use alternative.
class AllModelsFailed(Exception):
    """Raised when every model in the fallback chain has failed."""
    pass

async def call_with_model_fallback(prompt, max_retries=2):
    # call_llm is your provider-agnostic wrapper around the model APIs
    models = [
        ("gpt-4-turbo", 0.01),      # Primary: Best quality
        ("gpt-3.5-turbo", 0.002),   # Fallback 1: Cheaper
        ("claude-3-haiku", 0.001)   # Fallback 2: Cheapest
    ]
    for model_name, cost_per_token in models:
        try:
            response = await retry_with_backoff(
                lambda: call_llm(model_name, prompt),
                max_retries=max_retries
            )
            return response
        except Exception as e:
            print(f"{model_name} failed: {e}. Trying next model...")
            continue
    raise AllModelsFailed("All models failed")
async def call_with_cache_fallback(prompt):
    # hash_prompt and cache are assumed helpers: a stable prompt hash and any
    # key-value store (Redis, in-memory, etc.) that records a cached_at timestamp per entry
    cache_key = hash_prompt(prompt)
    try:
        # Try live API call
        response = await call_llm(prompt)
        # Cache successful response
        cache.set(cache_key, response, ttl=3600)
        return response
    except Exception as e:
        # API failed, check cache
        cached_response = cache.get(cache_key)
        if cached_response:
            print(f"API failed, returning cached response from {cached_response['cached_at']}")
            return cached_response
        raise  # No cache available, re-raise error
async def call_with_human_fallback(task, max_auto_retries=3):
    try:
        return await retry_with_backoff(
            lambda: agent.execute(task),
            max_retries=max_auto_retries
        )
    except Exception as e:
        # Agent failed, escalate to human
        ticket_id = create_support_ticket(
            title=f"Agent failed: {task['type']}",
            description=f"Error: {e}\nTask: {task}",
            priority="high"
        )
        await notify_on_call_human(ticket_id)
        return {
            "status": "escalated_to_human",
            "ticket_id": ticket_id,
            "message": "An engineer has been notified and will handle this manually."
        }
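The three fallbacks can also be layered into one escalation path: alternate models first, then a cached answer, then a human. A rough sketch, assuming task carries a "prompt" field and that the cache/hash_prompt helpers above are available:

async def execute_with_fallbacks(task):
    """Escalation order: model fallback -> cached answer -> human."""
    prompt = task["prompt"]  # assumed task shape
    try:
        return await call_with_model_fallback(prompt)
    except Exception:
        cached = cache.get(hash_prompt(prompt))
        if cached:
            return cached
        return await call_with_human_fallback(task)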
Problem: Agent waits forever for slow API response.
Solution: Set timeouts at multiple levels.
import asyncio

async def call_with_timeout(func, timeout_seconds=30):
    try:
        return await asyncio.wait_for(func(), timeout=timeout_seconds)
    except asyncio.TimeoutError:
        raise TimeoutError(f"Operation exceeded {timeout_seconds}s timeout")

# Multi-level timeouts
async def agent_workflow():
    # Level 1: Individual LLM call (30s timeout)
    llm_response = await call_with_timeout(
        lambda: call_llm(prompt),
        timeout_seconds=30
    )
    # Level 2: External API call (10s timeout)
    api_data = await call_with_timeout(
        lambda: fetch_external_api(),
        timeout_seconds=10
    )
    return api_data

# Level 3: Enforce a timeout on the entire workflow (5 minutes)
result = await call_with_timeout(
    agent_workflow,
    timeout_seconds=300
)
Timeout Values:
| Operation | Timeout | Rationale |
|---|---|---|
| LLM API call | 30-60s | OpenAI/Anthropic typically respond in 2-10s, but can spike to 30s |
| External API | 10s | Most APIs respond <1s, 10s is generous |
| Database query | 5s | Should be fast, >5s indicates problem |
| Entire workflow | 5-10min | Prevents infinite hangs |
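One convenient way to apply these values is a single timeout table that every call site reads from (a sketch; the TIMEOUTS dict and its keys are not from the original):

# Defaults taken from the table above, in seconds
TIMEOUTS = {
    "llm_call": 60,
    "external_api": 10,
    "db_query": 5,
    "workflow": 600,
}

async def fetch_inventory():
    # fetch_external_api is the same assumed helper used in agent_workflow above
    return await call_with_timeout(
        lambda: fetch_external_api(),
        timeout_seconds=TIMEOUTS["external_api"],
    )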
When: Can't provide full functionality, provide partial functionality.
Example: E-commerce recommendation agent
Full functionality: Personalized recommendations based on user history + current trends + inventory
Degraded functionality:
- User history unavailable → recommend trending items
- Trends unavailable → recommend bestsellers
- Everything down → serve a static curated list
Implementation:
async def get_recommendations(user_id):
    recommendations = []

    # Try personalized (best)
    try:
        user_history = await call_with_timeout(
            lambda: fetch_user_history(user_id),
            timeout_seconds=5
        )
        recommendations.extend(
            await generate_personalized(user_history)
        )
    except Exception:
        print("Personalization failed, degrading...")

    # Try trending (good)
    try:
        trends = await call_with_timeout(
            lambda: fetch_trending_items(),
            timeout_seconds=5
        )
        recommendations.extend(trends[:10])
    except Exception:
        print("Trends failed, degrading further...")

    # Fallback to bestsellers (okay)
    if not recommendations:
        try:
            bestsellers = await fetch_bestsellers()
            recommendations.extend(bestsellers[:10])
        except Exception:
            print("Bestsellers failed, using static fallback...")

    # Last resort: Static curated list (minimal)
    if not recommendations:
        recommendations = STATIC_CURATED_LIST

    return recommendations
User experience: shoppers always see recommendations; they simply get progressively less personalized ones as dependencies fail, and they never see an error page.
Better than: Complete failure with "Error 500" message.
import time
from prometheus_client import Counter, Histogram

# Metrics
errors_total = Counter('agent_errors_total', 'Total errors', ['error_type', 'agent_name'])
retry_count = Counter('agent_retries_total', 'Total retries', ['agent_name'])
latency = Histogram('agent_latency_seconds', 'Request latency', ['agent_name'])

async def monitored_agent_call(agent_name, task):
    start_time = time.time()
    # NOTE: retry_with_backoff would need to report its attempt count
    # (e.g., via a callback) for this counter to reflect real retries.
    retry_attempts = 0
    try:
        result = await retry_with_backoff(
            lambda: agent.execute(task),
            max_retries=5
        )
        # Record success metrics
        latency.labels(agent_name=agent_name).observe(time.time() - start_time)
        return result
    except Exception as e:
        # Record error metrics
        error_type = type(e).__name__
        errors_total.labels(error_type=error_type, agent_name=agent_name).inc()
        raise
    finally:
        retry_count.labels(agent_name=agent_name).inc(retry_attempts)
alerts:
  - name: HighErrorRate
    condition: error_rate > 0.05   # 5% error rate
    duration: 5m
    action: page_oncall_engineer

  - name: CircuitBreakerOpen
    condition: circuit_breaker_state == "open"
    duration: 1m
    action: send_slack_alert

  - name: HighLatency
    condition: p95_latency > 60s
    duration: 10m
    action: send_slack_alert
Before deploying agent to production:
- Retries with exponential backoff and jitter around every external call
- Errors classified into retryable vs. permanent
- Circuit breakers on each downstream dependency
- Timeouts at the call level and the workflow level
- Fallbacks in place (alternate model, cached response, human escalation)
- A graceful degradation path for partial functionality
- Error, retry, and latency metrics wired to alerts
How many retries should I configure?
Recommendation: 3-5 retries for most cases.
Exception: Critical operations (e.g., payments, or writes where data loss is unacceptable) may warrant 10+ retries.
Should I retry on all errors?
No. Only retry transient errors:
- Retry: timeouts, connection errors, 429 rate limits, 5xx server errors
- Don't retry: 4xx client errors (bad requests, auth failures) or validation errors
How long should circuit breaker stay open?
Standard: 60 seconds.
Tune based on monitoring: If circuit reopens frequently, increase timeout.
What's the performance cost of error handling?
Retry overhead: Adds latency only when failures occur (0% overhead in happy path).
Circuit breaker overhead: ~1ms per call (negligible).
Monitoring overhead: ~5-10ms per call (acceptable for production observability).
Bottom line: Production AI agents require robust error handling. Implement retry with exponential backoff, circuit breakers, timeouts, fallbacks, and graceful degradation. Proper error handling increases reliability from 87% to 99.2%. Monitor error rates and set alerts for anomalies.
Next: Read our Agent Observability guide for comprehensive monitoring strategies.