# Error Handling in A2A Protocol
Learning Path: Communication
Difficulty: Intermediate
Prerequisites: Protocol Messages, Core Concepts
Completion Time: 45-60 minutes
## 🎯 What You'll Learn
This document covers robust error handling patterns for A2A implementations:
- Standard A2A error codes and meanings
- Error message structure and validation
- Graceful degradation strategies
- Retry logic and backoff patterns
- Circuit breaker implementation
- Error recovery workflows
- Security considerations in error handling
- Testing error scenarios
## 📚 Overview
Robust error handling is critical in distributed multi-agent systems. Unlike single-process applications, agent communication involves:
- Network failures - Connections drop, timeouts occur
- Agent unavailability - Services go down, agents restart
- Message validation failures - Malformed or invalid requests
- Authorization failures - Permission denied scenarios
- Resource exhaustion - Rate limits, capacity constraints
- Data inconsistencies - State synchronization issues
Good error handling means:

- ✅ Never crash - Gracefully handle all error conditions
- ✅ Informative - Provide actionable error messages
- ✅ Secure - Don't leak sensitive information
- ✅ Recoverable - Enable automatic or manual recovery
- ✅ Observable - Log errors for monitoring and debugging
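To make "never crash", "secure", and "observable" concrete, a common pattern is a top-level wrapper that converts any unhandled exception into a well-formed ERROR message. A minimal sketch, assuming an async message handler; the `never_crash` decorator and its response shape are illustrative, not prescribed by the protocol:

```python
import functools
import logging

logger = logging.getLogger(__name__)

def never_crash(handler):
    """Convert unhandled exceptions into generic INTERNAL_ERROR responses."""
    @functools.wraps(handler)
    async def wrapper(message: dict) -> dict:
        try:
            return await handler(message)
        except Exception:
            # Observable: full detail goes to server-side logs only
            logger.exception("Unhandled error for %s", message.get("message_id"))
            # Secure: the client sees a generic, non-leaky payload
            return {
                "message_type": "error",
                "payload": {"error": {"code": "INTERNAL_ERROR",
                                      "message": "An internal error occurred"}},
                "correlation_id": message.get("message_id"),
            }
    return wrapper
```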
## 🏗️ A2A Error Message Structure

### Standard ERROR Message Format

```json
{
  "message_id": "uuid-v4",
  "message_type": "error",
  "sender_id": "agent-id",
  "recipient_id": "requesting-agent-id",
  "timestamp": "ISO-8601-timestamp",
  "payload": {
    "error": {
      "code": "string",
      "message": "string",
      "details": {},
      "retry_after": 0,
      "documentation_url": "string"
    }
  },
  "correlation_id": "original-message-id"
}
```
### Error Object Fields

| Field | Type | Required | Description |
|---|---|---|---|
| code | string | ✅ | Machine-readable error code (UPPERCASE_UNDERSCORE) |
| message | string | ✅ | Human-readable error description |
| details | object | ❌ | Additional context (safe for external consumption) |
| retry_after | integer | ❌ | Seconds to wait before retry (for rate limiting) |
| documentation_url | string | ❌ | Link to error documentation |
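The field table maps directly onto a small builder. Below is a minimal sketch (the `make_error` helper and its signature are illustrative, not part of the protocol) that produces a message matching the format above:

```python
import uuid
from datetime import datetime, timezone

def make_error(sender_id: str, recipient_id: str, correlation_id: str,
               code: str, message: str, details: dict | None = None,
               retry_after: int | None = None) -> dict:
    """Build an A2A error message with required envelope and error fields."""
    error = {"code": code, "message": message}
    if details is not None:
        error["details"] = details          # must be safe for external consumption
    if retry_after is not None:
        error["retry_after"] = retry_after  # seconds until the client may retry
    return {
        "message_id": str(uuid.uuid4()),
        "message_type": "error",
        "sender_id": sender_id,
        "recipient_id": recipient_id,
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "payload": {"error": error},
        "correlation_id": correlation_id,   # ties the error to the original request
    }
```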
## 📋 Standard A2A Error Codes

### Client Error Codes (4xx)

| Code | HTTP Equiv | Description | Retry? |
|---|---|---|---|
| INVALID_MESSAGE | 400 | Malformed message structure | ❌ No |
| VALIDATION_FAILED | 400 | Message validation failed | ❌ No |
| AUTHENTICATION_REQUIRED | 401 | No credentials provided | ❌ No |
| AUTHENTICATION_FAILED | 401 | Invalid credentials | ❌ No |
| FORBIDDEN | 403 | Authenticated but not permitted | ❌ No |
| NOT_FOUND | 404 | Agent or resource not found | ⚠️ Maybe |
| METHOD_NOT_ALLOWED | 405 | Method not supported by agent | ❌ No |
| CONFLICT | 409 | Request conflicts with current state | ⚠️ Maybe |
| PAYLOAD_TOO_LARGE | 413 | Message exceeds size limit | ❌ No |
| UNSUPPORTED_MEDIA_TYPE | 415 | Content type not supported | ❌ No |
| UNPROCESSABLE_ENTITY | 422 | Semantically incorrect request | ❌ No |
| RATE_LIMIT_EXCEEDED | 429 | Too many requests | ✅ Yes (with backoff) |
### Server Error Codes (5xx)

| Code | HTTP Equiv | Description | Retry? |
|---|---|---|---|
| INTERNAL_ERROR | 500 | Unspecified server error | ✅ Yes |
| NOT_IMPLEMENTED | 501 | Feature not implemented | ❌ No |
| BAD_GATEWAY | 502 | Invalid response from upstream | ✅ Yes |
| SERVICE_UNAVAILABLE | 503 | Temporarily unavailable | ✅ Yes (with backoff) |
| TIMEOUT | 504 | Request timeout | ✅ Yes |
| VERSION_NOT_SUPPORTED | 505 | Protocol version unsupported | ❌ No |
### A2A-Specific Error Codes

| Code | Description | Retry? |
|---|---|---|
| AGENT_NOT_REGISTERED | Agent not in registry | ⚠️ Maybe |
| CAPABILITY_NOT_AVAILABLE | Requested capability unavailable | ⚠️ Maybe |
| SESSION_EXPIRED | Session or token expired | ❌ No (re-authenticate) |
| SESSION_INVALID | Session ID invalid or hijacked | ❌ No |
| NONCE_REUSED | Replay attack detected | ❌ No |
| SIGNATURE_INVALID | Message signature verification failed | ❌ No |
| AGENT_OVERLOADED | Agent at capacity | ✅ Yes (with backoff) |
| DEPENDENCY_FAILED | Required downstream service unavailable | ✅ Yes |
| STATE_CONFLICT | Agent state inconsistent | ⚠️ Maybe |
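Taken together, the three tables reduce to a lookup that all retry logic can share. A minimal sketch; the set names and the treat-"maybe"-as-one-retry policy are assumptions, not protocol requirements:

```python
# Safe to retry, usually with backoff
RETRYABLE = {
    "RATE_LIMIT_EXCEEDED", "INTERNAL_ERROR", "BAD_GATEWAY",
    "SERVICE_UNAVAILABLE", "TIMEOUT", "AGENT_OVERLOADED", "DEPENDENCY_FAILED",
}

# A single retry may help (registry lag, transient conflicts, ...)
MAYBE_RETRYABLE = {
    "NOT_FOUND", "CONFLICT", "AGENT_NOT_REGISTERED",
    "CAPABILITY_NOT_AVAILABLE", "STATE_CONFLICT",
}

def is_retryable(code: str, attempt: int) -> bool:
    """Decide whether a request that failed with `code` should be retried."""
    if code in RETRYABLE:
        return True
    if code in MAYBE_RETRYABLE:
        return attempt == 0  # assumption: retry "maybe" codes exactly once
    return False  # validation, auth, and similar errors are terminal
```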
## 💡 Error Handling Examples

### Example 1: Validation Error

Scenario: Client sends malformed message

```json
{
  "message_id": "e1f2g3h4-5i6j-7k8l-9m0n-1o2p3q4r5s6t",
  "message_type": "error",
  "sender_id": "crypto-agent-001",
  "recipient_id": "client-agent-001",
  "timestamp": "2025-12-09T15:30:00.000Z",
  "payload": {
    "error": {
      "code": "VALIDATION_FAILED",
      "message": "Required field 'method' missing from payload",
      "details": {
        "field": "payload.method",
        "constraint": "required",
        "received_payload": {
          "parameters": {"currency": "BTC"}
        }
      }
    }
  },
  "correlation_id": "a1b2c3d4-5e6f-7g8h-9i0j-1k2l3m4n5o6p"
}
```
Client Handling:

```python
async def handle_validation_error(error_msg: dict):
    """Handle validation errors - don't retry, fix the request"""
    error = error_msg["payload"]["error"]

    # Log for debugging
    logger.error(f"Validation failed: {error['message']}")
    logger.debug(f"Details: {error.get('details')}")

    # Don't retry - validation errors won't fix themselves;
    # application logic should fix the request structure
    raise ValueError(f"Invalid request: {error['message']}")
```
### Example 2: Rate Limit Error

Scenario: Too many requests sent

```json
{
  "message_id": "f2g3h4i5-6j7k-8l9m-0n1o-2p3q4r5s6t7u",
  "message_type": "error",
  "sender_id": "crypto-agent-001",
  "recipient_id": "client-agent-001",
  "timestamp": "2025-12-09T15:31:00.000Z",
  "payload": {
    "error": {
      "code": "RATE_LIMIT_EXCEEDED",
      "message": "Request rate limit exceeded. Maximum 100 requests per minute.",
      "details": {
        "limit": 100,
        "window_seconds": 60,
        "current_count": 147,
        "reset_at": "2025-12-09T15:32:00.000Z"
      },
      "retry_after": 60
    }
  },
  "correlation_id": "b2c3d4e5-6f7g-8h9i-0j1k-2l3m4n5o6p7q"
}
```
Client Handling:

```python
import asyncio

async def handle_rate_limit_error(error_msg: dict):
    """Handle rate limiting by honoring the server's retry_after hint"""
    error = error_msg["payload"]["error"]
    retry_after = error.get("retry_after", 60)

    logger.warning(f"Rate limited. Waiting {retry_after}s before retry")

    # Honor retry_after with a small buffer
    await asyncio.sleep(retry_after + 1)

    # After waiting, retry the original request
    # Your retry logic here
```
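If `retry_after` is missing, the `reset_at` timestamp in `details` (present in the example above) can serve as a fallback. A hedged sketch; the 60-second default when neither hint is present is an assumption:

```python
from datetime import datetime, timezone

def wait_seconds(error: dict) -> float:
    """Derive a wait time from retry_after, falling back to details.reset_at."""
    if "retry_after" in error:
        return float(error["retry_after"])
    reset_at = error.get("details", {}).get("reset_at")
    if reset_at:
        # reset_at is ISO-8601; clamp so we never wait a negative amount
        delta = (datetime.fromisoformat(reset_at.replace("Z", "+00:00"))
                 - datetime.now(timezone.utc))
        return max(delta.total_seconds(), 0.0)
    return 60.0  # assumption: conservative default when no hint is given
```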
### Example 3: Service Unavailable Error

Scenario: Agent temporarily down

```json
{
  "message_id": "g3h4i5j6-7k8l-9m0n-1o2p-3q4r5s6t7u8v",
  "message_type": "error",
  "sender_id": "crypto-agent-001",
  "recipient_id": "client-agent-001",
  "timestamp": "2025-12-09T15:32:00.000Z",
  "payload": {
    "error": {
      "code": "SERVICE_UNAVAILABLE",
      "message": "Service temporarily unavailable due to maintenance",
      "details": {
        "reason": "scheduled_maintenance",
        "estimated_duration_seconds": 300,
        "status_url": "https://status.crypto-agent.example.com"
      },
      "retry_after": 300
    }
  },
  "correlation_id": "c3d4e5f6-7g8h-9i0j-1k2l-3m4n5o6p7q8r"
}
```
Client Handling:

```python
async def handle_service_unavailable(
    error_msg: dict,
    max_retries: int = 3
):
    """Handle service unavailability with retries and growing waits"""
    error = error_msg["payload"]["error"]
    retry_after = error.get("retry_after", 60)

    for attempt in range(max_retries):
        logger.info(f"Service unavailable. Retry {attempt + 1}/{max_retries}")

        # Exponential backoff: retry_after * 2^attempt
        wait_time = retry_after * (2 ** attempt)
        await asyncio.sleep(wait_time)

        # Attempt retry (retry_request and ServiceUnavailableError are
        # application-specific placeholders)
        try:
            return await retry_request()
        except ServiceUnavailableError:
            if attempt == max_retries - 1:
                raise
```
### Example 4: Authentication Error

Scenario: Invalid or expired credentials

```json
{
  "message_id": "h4i5j6k7-8l9m-0n1o-2p3q-4r5s6t7u8v9w",
  "message_type": "error",
  "sender_id": "crypto-agent-001",
  "recipient_id": "client-agent-001",
  "timestamp": "2025-12-09T15:33:00.000Z",
  "payload": {
    "error": {
      "code": "AUTHENTICATION_FAILED",
      "message": "Signature verification failed",
      "details": {
        "reason": "invalid_signature",
        "expected_algorithm": "RS256",
        "received_algorithm": "HS256"
      },
      "documentation_url": "https://docs.a2a.example.com/auth#signatures"
    }
  },
  "correlation_id": "d4e5f6g7-8h9i-0j1k-2l3m-4n5o6p7q8r9s"
}
```
Client Handling:

```python
async def handle_authentication_error(error_msg: dict):
    """Handle authentication failures"""
    error = error_msg["payload"]["error"]
    logger.error(f"Authentication failed: {error['message']}")

    # Prefer the machine-readable reason over parsing the message text
    reason = error.get("details", {}).get("reason") or error["message"].lower()

    # Signature issue: regenerate signature with the correct algorithm
    # (regenerate_credentials / reauthenticate are application placeholders)
    if "signature" in reason:
        await regenerate_credentials()
    # Token expired: re-authenticate
    elif "expired" in reason:
        await reauthenticate()
    else:
        # Unknown auth error - don't retry, requires manual fix
        raise AuthenticationError(error["message"])
```
## 🔄 Retry Strategies

### Exponential Backoff

When to use: Transient failures, rate limits, service unavailability

```python
import asyncio
import random

async def exponential_backoff_retry(
    func,
    max_retries: int = 5,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    jitter: bool = True
):
    """
    Retry with exponential backoff.

    Args:
        func: Async function to retry
        max_retries: Maximum retry attempts
        base_delay: Initial delay in seconds
        max_delay: Maximum delay cap
        jitter: Add randomness to prevent thundering herd
    """
    for attempt in range(max_retries):
        try:
            return await func()
        except RetryableError:  # application-defined marker for retryable failures
            if attempt == max_retries - 1:
                raise

            # Calculate delay: base_delay * 2^attempt, capped at max_delay
            delay = min(base_delay * (2 ** attempt), max_delay)

            # Add jitter (randomness) to prevent thundering herd
            if jitter:
                delay = delay * (0.5 + random.random())

            logger.info(f"Retry {attempt + 1}/{max_retries} after {delay:.2f}s")
            await asyncio.sleep(delay)
```

Usage:

```python
# Retry a request with exponential backoff
result = await exponential_backoff_retry(
    lambda: send_a2a_request(agent_id, message),
    max_retries=5,
    base_delay=1.0
)
```
### Circuit Breaker Pattern

When to use: Prevent cascading failures, protect overloaded agents

```python
from enum import Enum
from datetime import datetime

class CircuitState(Enum):
    CLOSED = "closed"        # Normal operation
    OPEN = "open"            # Failures detected, blocking requests
    HALF_OPEN = "half_open"  # Testing if service recovered

class CircuitBreakerOpenError(Exception):
    """Raised when a request is rejected because the circuit is open."""

class CircuitBreaker:
    """
    Circuit breaker for agent communication.

    States:
    - CLOSED: Normal operation, requests pass through
    - OPEN: Too many failures, blocking all requests
    - HALF_OPEN: Testing recovery, allowing limited requests
    """

    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: float = 60.0,
        success_threshold: int = 2
    ):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.success_threshold = success_threshold
        self.state = CircuitState.CLOSED
        self.failure_count = 0
        self.success_count = 0
        self.last_failure_time = None

    async def call(self, func):
        """Execute function through circuit breaker"""
        # If OPEN, check if recovery timeout passed
        if self.state == CircuitState.OPEN:
            if self._should_attempt_reset():
                self.state = CircuitState.HALF_OPEN
                self.success_count = 0
            else:
                raise CircuitBreakerOpenError(
                    f"Circuit breaker OPEN. Try again after "
                    f"{self.recovery_timeout}s"
                )

        try:
            result = await func()
            self._on_success()
            return result
        except Exception:
            self._on_failure()
            raise

    def _on_success(self):
        """Handle successful request"""
        if self.state == CircuitState.HALF_OPEN:
            self.success_count += 1
            # Enough successes? Close circuit
            if self.success_count >= self.success_threshold:
                self.state = CircuitState.CLOSED
                self.failure_count = 0
                logger.info("Circuit breaker CLOSED (recovered)")
        elif self.state == CircuitState.CLOSED:
            # Reset failure count on success
            self.failure_count = 0

    def _on_failure(self):
        """Handle failed request"""
        self.failure_count += 1
        self.last_failure_time = datetime.utcnow()

        if self.state == CircuitState.HALF_OPEN:
            # Failure during recovery - back to OPEN
            self.state = CircuitState.OPEN
            logger.warning("Circuit breaker re-OPENED (recovery failed)")
        elif self.failure_count >= self.failure_threshold:
            # Too many failures - open circuit
            self.state = CircuitState.OPEN
            logger.error("Circuit breaker OPENED (too many failures)")

    def _should_attempt_reset(self) -> bool:
        """Check if enough time passed to attempt reset"""
        if not self.last_failure_time:
            return True
        elapsed = (datetime.utcnow() - self.last_failure_time).total_seconds()
        return elapsed >= self.recovery_timeout
```

Usage:

```python
breaker = CircuitBreaker(
    failure_threshold=5,
    recovery_timeout=60.0
)

try:
    result = await breaker.call(
        lambda: send_a2a_request(agent_id, message)
    )
except CircuitBreakerOpenError:
    logger.error("Agent unavailable - circuit breaker open")
    # Use fallback or cached data
```
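The two patterns compose: the breaker short-circuits a dead agent while the backoff loop spaces out attempts against a struggling one. A brief sketch of one possible wiring (the `resilient_request` name is illustrative):

```python
async def resilient_request(agent_id: str, message: dict):
    """Retry with backoff, but stop immediately once the circuit opens."""
    async def attempt():
        # CircuitBreakerOpenError propagates: it is not a RetryableError,
        # so the backoff loop re-raises it instead of retrying
        return await breaker.call(lambda: send_a2a_request(agent_id, message))

    return await exponential_backoff_retry(attempt, max_retries=5)
```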
### Retry Decision Tree

```text
Error occurred
│
├── 4xx client error ──► check the specific code
│     ├── 400, 401, 403, 422 ──► ❌ don't retry
│     └── 404, 409 ────────────► ⚠️ maybe retry once
│
└── 5xx server error ──► retryable?
      ├── 503, 429, 504, TIMEOUT ──► ✅ retry with backoff
      └── 500 (may be a bug) ──────► ⚠️ retry a limited number of times
```
## 🛡️ Security Considerations in Error Handling

### Rule 1: Never Leak Sensitive Information

❌ Bad Example (Stage 1 - Vulnerable):

```python
# DON'T DO THIS
try:
    user = authenticate(username, password)
except Exception as e:
    return {
        "error": str(e),  # Might leak: "User 'admin' not found in database"
        "stack_trace": traceback.format_exc(),  # Exposes code structure
        "database": "postgresql://prod-db-1.internal:5432/users"  # Leaks infrastructure
    }
```

✅ Good Example (Stage 3 - Secure):

```python
# DO THIS
try:
    user = authenticate(username, password)
except AuthenticationError as e:
    # Detailed logging server-side only (before returning, so it actually runs)
    logger.error(f"Auth failed: {username} - {str(e)}")

    # Generic message to client
    return {
        "error": {
            "code": "AUTHENTICATION_FAILED",
            "message": "Authentication failed"  # No details about why
        }
    }
```
### Rule 2: Different Messages for Internal vs External

```python
class ErrorResponse:
    """Dual error messages: detailed for logs, safe for clients"""

    def __init__(self, code: str, internal_msg: str, client_msg: str):
        self.code = code
        self.internal_message = internal_msg
        self.client_message = client_msg

    def to_client(self) -> dict:
        """Safe message for external consumption"""
        return {
            "error": {
                "code": self.code,
                "message": self.client_message
            }
        }

    def to_log(self) -> str:
        """Detailed message for internal logs"""
        return f"[{self.code}] {self.internal_message}"

# Usage
try:
    process_file(filepath)
except FileNotFoundError:
    error = ErrorResponse(
        code="FILE_NOT_FOUND",
        internal_msg=f"File not found: {filepath} (user: {user_id})",
        client_msg="Requested file not found"
    )
    logger.error(error.to_log())  # Detailed logging
    return error.to_client()      # Safe client response
```
### Rule 3: Rate Limit Error Responses

Even error responses can be abused:

```python
from collections import defaultdict
from datetime import datetime, timedelta

class ErrorRateLimiter:
    """Rate limit error responses to prevent information leakage"""

    def __init__(self, max_errors_per_minute: int = 10):
        self.max_errors = max_errors_per_minute
        self.error_counts = defaultdict(list)  # agent_id -> [timestamps]

    def should_send_error(self, agent_id: str) -> bool:
        """Check if we should send a detailed error"""
        now = datetime.utcnow()
        cutoff = now - timedelta(minutes=1)

        # Drop timestamps older than the one-minute window
        self.error_counts[agent_id] = [
            ts for ts in self.error_counts[agent_id]
            if ts > cutoff
        ]

        # Check limit
        if len(self.error_counts[agent_id]) >= self.max_errors:
            return False

        self.error_counts[agent_id].append(now)
        return True

# Usage
error_limiter = ErrorRateLimiter(max_errors_per_minute=10)

if not error_limiter.should_send_error(agent_id):
    # Too many errors - send generic response
    return {
        "error": {
            "code": "RATE_LIMIT_EXCEEDED",
            "message": "Too many errors. Please contact support."
        }
    }
else:
    # Send specific error
    return detailed_error_response
```
## 🧪 Testing Error Scenarios

### Unit Tests for Error Handling

```python
import asyncio
import json
import re
import uuid

import pytest

@pytest.mark.asyncio
async def test_validation_error_handling():
    """Test handling of validation errors"""
    # Send invalid message
    message = {
        "message_id": str(uuid.uuid4()),
        "message_type": "request",
        # Missing required fields...
    }

    response = await send_message(message)

    # Should get validation error
    assert response["message_type"] == "error"
    assert response["payload"]["error"]["code"] == "VALIDATION_FAILED"
    assert "correlation_id" in response

@pytest.mark.asyncio
async def test_rate_limit_retry():
    """Test retry logic for rate limits"""
    # Send enough requests to exceed the limit; earlier requests may
    # succeed, but the final response should be a rate limit error
    response = None
    for _ in range(150):
        response = await send_request()

    assert response["payload"]["error"]["code"] == "RATE_LIMIT_EXCEEDED"
    assert "retry_after" in response["payload"]["error"]

    # Wait and retry should succeed
    await asyncio.sleep(response["payload"]["error"]["retry_after"])
    retry_response = await send_request()
    assert retry_response["message_type"] != "error"

@pytest.mark.asyncio
async def test_circuit_breaker_opens():
    """Test circuit breaker opens after failures"""
    breaker = CircuitBreaker(failure_threshold=3)

    # Simulate failures (failing_request is a test double that always raises)
    for _ in range(3):
        with pytest.raises(Exception):
            await breaker.call(lambda: failing_request())

    # Circuit should be open
    assert breaker.state == CircuitState.OPEN

    # Next call should fail immediately
    with pytest.raises(CircuitBreakerOpenError):
        await breaker.call(lambda: working_request())

@pytest.mark.asyncio
async def test_no_sensitive_data_in_errors():
    """Ensure errors don't leak sensitive data"""
    # Trigger various errors
    errors = [
        await trigger_auth_error(),
        await trigger_validation_error(),
        await trigger_internal_error()
    ]

    # Check none contain sensitive data
    sensitive_patterns = [
        r'\b\d{3}-\d{2}-\d{4}\b',  # SSN
        r'password',
        r'secret',
        r'token',
        r'/var/',          # File paths
        r'postgresql://'   # Connection strings
    ]

    for error in errors:
        error_str = json.dumps(error)
        for pattern in sensitive_patterns:
            assert not re.search(pattern, error_str, re.IGNORECASE)
```
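Backoff timing is easier to assert when the clock is faked. A hedged sketch using `unittest.mock` to patch `asyncio.sleep` so the test runs instantly (`RetryableError` is the application-defined exception used by `exponential_backoff_retry` above; the flaky test double is illustrative):

```python
from unittest.mock import AsyncMock, patch

@pytest.mark.asyncio
async def test_backoff_delays_grow():
    """Verify exponential_backoff_retry waits longer on each attempt."""
    calls = {"n": 0}

    async def flaky_request():
        calls["n"] += 1
        if calls["n"] < 3:
            raise RetryableError("transient")  # fail twice, then succeed
        return "ok"

    with patch("asyncio.sleep", new=AsyncMock()) as fake_sleep:
        result = await exponential_backoff_retry(
            flaky_request, base_delay=1.0, jitter=False
        )

    assert result == "ok"
    delays = [call.args[0] for call in fake_sleep.await_args_list]
    assert delays == [1.0, 2.0]  # base_delay * 2^attempt, jitter disabled
```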
## 📊 Error Handling Metrics

### What to Monitor

```python
from prometheus_client import Counter, Gauge, Histogram

# Error counters by type
error_counter = Counter(
    'a2a_errors_total',
    'Total A2A errors',
    ['agent_id', 'error_code', 'error_type']
)

# Retry attempts
retry_counter = Counter(
    'a2a_retries_total',
    'Total retry attempts',
    ['agent_id', 'attempt_number']
)

# Circuit breaker state
circuit_breaker_state = Gauge(
    'a2a_circuit_breaker_state',
    'Circuit breaker state (0=closed, 1=open, 2=half_open)',
    ['agent_id']
)

# Error response time
error_response_time = Histogram(
    'a2a_error_response_seconds',
    'Time to generate error response',
    ['error_code']
)

# Usage in code
with error_response_time.labels(error_code='RATE_LIMIT_EXCEEDED').time():
    error_response = generate_error_response(...)

error_counter.labels(
    agent_id=agent_id,
    error_code='RATE_LIMIT_EXCEEDED',
    error_type='client'
).inc()
```
## 🎓 Best Practices Summary

### ✅ DO
- Use standard error codes - Consistent across all agents
- Include correlation_id - Trace errors back to original request
- Provide retry_after - Tell clients when to retry
- Log detailed errors - But only server-side
- Implement exponential backoff - Prevent overwhelming failed services
- Use circuit breakers - Protect against cascading failures
- Test error paths - Don't just test happy paths
- Monitor error rates - Set up alerts for unusual patterns
- Document error codes - Help client developers understand errors
- Fail gracefully - Never crash, always return proper error message
### ❌ DON'T
- Don't leak sensitive data - No PII, credentials, or system details in errors
- Don't expose stack traces - Keep internal details internal
- Don't retry non-retryable errors - 400-level errors won't fix themselves
- Don't retry forever - Set max retry limits
- Don't ignore error details - Use them to improve your code
- Don't use generic error codes - Be specific about what went wrong
- Don't forget correlation IDs - Always trace errors to requests
- Don't overwhelm with errors - Rate limit error responses too
- Don't hide errors from monitoring - Log and track all errors
- Don't assume retry will succeed - Have fallback strategies
## 🔗 Related Documentation
- Protocol Messages - Message structure
- Message Schemas - JSON schemas
- Security Best Practices - Security considerations
- Threat Model - Attack scenarios
## 📚 Further Reading

### External Resources
- Exponential Backoff and Jitter
- Circuit Breaker Pattern
- HTTP Status Codes
- REST API Error Handling Best Practices
### Project Examples

Study error handling in the example implementations:

Example 1: Crypto Agent (Stage 1) - ❌ Poor error handling
- Generic error messages
- No retry logic
- Exposes stack traces

Example 2: Crypto Agent (Stage 2) - ⚠️ Improved but incomplete
- Basic error codes
- Simple retry logic
- Still some information leakage

Example 3: Crypto Agent (Stage 3) - ✅ Production-ready
- Complete error taxonomy
- Exponential backoff retry
- Circuit breaker implementation
- Security-conscious error messages
- Comprehensive error monitoring
Document Version: 1.0
Last Updated: December 2025
Status: Complete
Author: Based on A2A Protocol Best Practices