Security Evolution: Code Walkthrough Comparison¶
Learning Path: Security
Difficulty: Intermediate to Advanced
Prerequisites: Core Concepts, Authentication Tags
Navigation¶
← Previous: Security Best Practices | Next: Threat Model →
↑ Up: A2A Overview
🎯 What You'll Learn¶
This document provides a side-by-side comparison of three progressive implementations:
- How to identify security vulnerabilities in code
- What changes between vulnerable and secure implementations
- Why each security control matters
- How to incrementally improve security
- Common security anti-patterns to avoid
📚 The Three Examples¶
Example 1: Basic Implementation (Vulnerable) ❌¶
Location: examples/a2a_crypto_example/
Status: Intentionally insecure for educational purposes
Purpose: Learn to recognize vulnerabilities
Example 2: Registry Integration (Improved) ⚠️¶
Location: examples/a2a_crypto_simple_registry_example_1/
Status: Some security, but incomplete
Purpose: Understand incremental improvements
Example 3: Security Module (Secure) ✅¶
Location: examples/a2a_crypto_example/security/
Status: Production-ready security architecture
Purpose: Reference implementation
🔍 Security Control Progression Matrix¶
| Security Control | Example 1 | Example 2 | Example 3 | Impact |
|---|---|---|---|---|
| Input Validation | ❌ None | ⚠️ Basic | ✅ Comprehensive | CRITICAL |
| Authentication | ❌ None | ⚠️ Simple Signature | ✅ Full PKI | CRITICAL |
| Authorization | ❌ None | ⚠️ Capability Check | ✅ RBAC | HIGH |
| Replay Protection | ❌ None | ❌ None | ✅ Nonce-based | HIGH |
| Rate Limiting | ❌ None | ❌ None | ✅ Token Bucket | MEDIUM |
| Audit Logging | ❌ None | ⚠️ Console Print | ✅ Structured Logs | MEDIUM |
| Encryption | ❌ Plaintext | ⚠️ Transport Only | ✅ End-to-end | HIGH |
| Error Handling | ❌ Exposes Info | ⚠️ Generic | ✅ Safe Messages | MEDIUM |
| Injection Prevention | ❌ None | ⚠️ Basic | ✅ Multi-layer | CRITICAL |
| Session Management | ❌ None | ⚠️ Basic | ✅ Secure Tokens | HIGH |
🎓 How to Use This Guide¶
For Learners¶
- Read Example 1 code - Try to spot vulnerabilities yourself
- Check this guide - See what you missed
- Read Example 2 code - Notice the improvements
- Study Example 3 code - See production-ready patterns
- Practice - Apply to your own code
For Security Auditors¶
- Start with Example 1 - Perform a mock security audit
- Document findings - List all vulnerabilities
- Compare with this guide - Validate your findings
- Review Example 3 - Understand proper mitigations
For Code Reviewers¶
- Use as a checklist - Compare against your codebase
- Identify patterns - Recognize similar issues
- Recommend fixes - Reference Example 3 patterns
🔐 Security Control Deep Dives¶
1. Input Validation¶
❌ Example 1: No Validation (DANGEROUS!)¶
# From a2a_crypto_example/server/crypto_agent_server.py
async def handle_message(self, message: A2AMessage, client_id: str):
"""Handle incoming A2A message"""
if message.message_type == MessageType.REQUEST:
# SECURITY FLAW: No validation of message payload!
method = message.payload.get("method")
params = message.payload.get("params", {})
if method == "get_price":
# SECURITY FLAW: No validation of currency parameter!
currency = params.get("currency") # Could be ANYTHING!
price = self.get_price(currency) # Injection risk!
return A2AProtocol.create_response(
self.agent_id,
message.sender_id,
{"price": price, "currency": currency}
)
Vulnerabilities: 1. ❌ No schema validation - malformed messages accepted 2. ❌ No type checking - currency could be any type 3. ❌ No length limits - DOS via huge messages 4. ❌ No sanitization - injection attacks possible 5. ❌ No whitelist - any method name accepted
Attack Scenarios:
# Attacker sends malicious payload
{
"method": "get_price",
"params": {
"currency": "'; DROP TABLE agents;--" # SQL injection attempt
}
}
# OR
{
"method": "__import__('os').system('rm -rf /')", # Code injection
"params": {}
}
# OR
{
"method": "get_price",
"params": {
"currency": "A" * 10000000 # DOS via memory exhaustion
}
}
⚠️ Example 2: Basic Validation (BETTER, BUT INCOMPLETE)¶
# From a2a_crypto_simple_registry_example_1/shared/a2a_protocol.py
@dataclass
class A2AMessage:
message_id: str
message_type: MessageType
sender_id: str
recipient_id: str
timestamp: str
payload: Dict[str, Any]
correlation_id: Optional[str] = None
def validate(self) -> bool:
"""Basic validation"""
# IMPROVEMENT: Some validation added
if not self.message_id or len(self.message_id) > 100:
return False
if not self.sender_id or len(self.sender_id) > 100:
return False
# STILL VULNERABLE: No payload validation!
# STILL VULNERABLE: No type checking!
# STILL VULNERABLE: No injection detection!
return True
Improvements: 1. ✅ Length limits on IDs 2. ✅ Required field checking 3. ✅ Basic dataclass structure
Remaining Vulnerabilities: 1. ❌ Payload contents not validated 2. ❌ No injection pattern detection 3. ❌ No whitelist enforcement 4. ❌ No nested object validation
✅ Example 3: Comprehensive Validation (SECURE!)¶
# From a2a_crypto_example/security/validator.py
class AgentCardValidator:
"""Comprehensive validation with multiple security layers"""
def validate_card(self, card: SecureAgentCard) -> Tuple[bool, List[str]]:
"""Multi-layer validation"""
issues = []
# LAYER 1: Schema validation
if not self._validate_schema(card):
issues.append("Invalid schema")
# LAYER 2: Type validation
if not isinstance(card.capabilities, dict):
issues.append("Capabilities must be dictionary")
# LAYER 3: Length limits
if len(card.name) > 100:
issues.append("Name too long")
# LAYER 4: Injection detection
metadata_issues = self._validate_metadata(card)
if metadata_issues:
issues.extend(metadata_issues)
# LAYER 5: Whitelist enforcement
invalid_caps = self._validate_capabilities(card)
if invalid_caps:
issues.append(f"Invalid capabilities: {invalid_caps}")
# LAYER 6: Rate limiting
if not self._check_validation_rate_limit(card.agent_id):
issues.append("Rate limit exceeded")
return len(issues) == 0, issues
def _validate_metadata(self, card: SecureAgentCard) -> List[str]:
"""Detect injection attempts in metadata"""
issues = []
for key, value in card.metadata.items():
# Check for suspicious keys
if any(pattern in key.lower() for pattern in SUSPICIOUS_KEY_PATTERNS):
issues.append(f"Suspicious key: {key}")
# Check for injection patterns in values
if isinstance(value, str):
if self._contains_injection_attempt(value):
issues.append(f"Potential injection in '{key}'")
return issues
def _contains_injection_attempt(self, value: str) -> bool:
"""Pattern-based injection detection"""
value_lower = value.lower()
injection_patterns = [
"script>", # XSS
"javascript:", # XSS
"onerror=", # XSS
"'; drop table", # SQL injection
"or 1=1", # SQL injection
"../", # Path traversal
"cmd.exe", # Command injection
"__import__", # Python code injection
"eval(", # Code execution
]
return any(pattern in value_lower for pattern in injection_patterns)
Security Features: 1. ✅ Multi-layer validation 2. ✅ Comprehensive type checking 3. ✅ Injection pattern detection 4. ✅ Whitelist enforcement 5. ✅ Rate limiting protection 6. ✅ Detailed error reporting
Defense in Depth: Even if one layer fails, others provide protection.
2. Authentication & Signatures¶
❌ Example 1: No Authentication (CRITICAL FLAW!)¶
# From a2a_crypto_example/server/crypto_agent_server.py
async def handle_message(self, message: A2AMessage, client_id: str):
"""Handle incoming message"""
# SECURITY FLAW: No verification of sender identity!
# Anyone can claim to be any agent!
if message.message_type == MessageType.HANDSHAKE:
# SECURITY FLAW: Agent card accepted without verification!
agent_data = message.payload.get("agent_card")
self.clients[client_id]["agent_id"] = agent_data["agent_id"]
# Attacker can impersonate any agent!
return self.create_handshake_ack()
Attack Scenario:
# Attacker impersonates admin agent
fake_admin_card = {
"agent_id": "admin-001", # Claim to be admin
"name": "AdminAgent",
"capabilities": ["admin", "delete_all"] # Claim admin rights
}
# Server accepts without verification!
# Attacker now has admin access!
⚠️ Example 2: Basic Signature (IMPROVED, BUT WEAK)¶
# From a2a_crypto_simple_registry_example_1/security/validator.py
def _verify_signature(self, card: SecureAgentCard) -> bool:
"""Basic signature verification (SIMPLIFIED)"""
if not card.signature:
return False
# IMPROVEMENT: Signature required
card_data = json.dumps({
"agent_id": card.agent_id,
"name": card.name,
"version": card.version,
"capabilities": card.capabilities
}, sort_keys=True)
# VULNERABILITY: Simplified crypto (not production-ready!)
expected_signature = hashlib.sha256(
f"{card_data}{card.public_key}".encode()
).hexdigest()
# IMPROVEMENT: Constant-time comparison
return hmac.compare_digest(card.signature[:64], expected_signature)
Improvements: 1. ✅ Signature required 2. ✅ Constant-time comparison (timing attack prevention) 3. ✅ Canonical JSON representation
Remaining Issues: 1. ❌ Simplified crypto (not using proper algorithms) 2. ❌ No certificate chain validation 3. ❌ No key rotation support 4. ❌ No revocation checking
✅ Example 3: Full PKI (PRODUCTION-READY!)¶
# From a2a_crypto_example/security/secure_agent_card.py
@dataclass
class SecureAgentCard:
"""Secure Agent Card with full PKI support"""
agent_id: str
name: str
version: str
capabilities: Dict[str, List[str]]
metadata: Dict[str, Any]
# PKI Fields
public_key: str # RSA/ECC public key
signature: str # Cryptographic signature
certificate_fingerprint: str # X.509 cert fingerprint
issuer: str # Certificate authority
issued_at: datetime # Issuance timestamp
expires_at: datetime # Expiration timestamp
signature_algorithm: str = "RS256" # Algorithm specification
trust_level: float = 0.0 # Reputation score
def sign(self, private_key: str):
"""Sign card with private key"""
# 1. Create canonical representation
card_data = self._to_canonical_dict()
canonical_json = json.dumps(card_data, sort_keys=True)
# 2. Sign with proper cryptographic algorithm
# (In production, use cryptography library)
signature_bytes = rsa_sign(
canonical_json.encode(),
private_key,
algorithm=self.signature_algorithm
)
# 3. Encode signature
self.signature = base64.b64encode(signature_bytes).decode()
# 4. Generate certificate fingerprint
self.certificate_fingerprint = hashlib.sha256(
public_key.encode()
).hexdigest()
def verify(self, public_key: str) -> bool:
"""Verify signature with public key"""
# 1. Reconstruct canonical data
card_data = self._to_canonical_dict()
canonical_json = json.dumps(card_data, sort_keys=True)
# 2. Verify with proper crypto
signature_bytes = base64.b64decode(self.signature)
return rsa_verify(
canonical_json.encode(),
signature_bytes,
public_key,
algorithm=self.signature_algorithm
)
Security Features: 1. ✅ Real cryptographic signatures (RSA/ECC) 2. ✅ Certificate fingerprinting 3. ✅ Issuer tracking 4. ✅ Expiration enforcement 5. ✅ Algorithm agility 6. ✅ Trust scoring 7. ✅ Canonical representation
3. Replay Attack Protection¶
❌ Example 1: No Protection (VULNERABLE!)¶
# From a2a_crypto_example
async def handle_message(self, message: A2AMessage):
"""Process message"""
# SECURITY FLAW: No replay protection!
# Attacker can record and replay messages!
if message.message_type == MessageType.REQUEST:
# Process request - same message can be replayed infinite times
result = await self.process_request(message)
return result
Attack Scenario:
# 1. Attacker intercepts legitimate message
legitimate_message = {
"message_id": "msg-123",
"message_type": "REQUEST",
"sender_id": "legitimate-agent",
"payload": {
"method": "transfer_funds",
"params": {"amount": 1000, "to": "attacker-account"}
}
}
# 2. Attacker replays message 100 times
# Result: $100,000 transferred instead of $1,000!
for i in range(100):
send_message(legitimate_message) # Same message, replayed
❌ Example 2: Still No Protection!¶
Example 2 also lacks replay protection - a significant oversight!
✅ Example 3: Nonce-Based Protection (SECURE!)¶
# From a2a_crypto_example/security/manager.py
class SecureAgentCardManager:
"""Manages secure agent cards with replay protection"""
def __init__(self, local_agent_id: str):
self.local_agent_id = local_agent_id
# Replay protection: Track used nonces
self.used_nonces: Dict[str, datetime] = {}
self.nonce_expiry = timedelta(minutes=5)
def generate_nonce(self) -> str:
"""Generate cryptographically secure nonce"""
# 32 bytes = 256 bits of entropy
return secrets.token_urlsafe(32)
def validate_nonce(self, nonce: str) -> Tuple[bool, str]:
"""Validate nonce hasn't been used"""
# Check if nonce was previously used
if nonce in self.used_nonces:
return False, "Nonce already used (replay attack detected)"
# Mark nonce as used
self.used_nonces[nonce] = datetime.now()
# Cleanup expired nonces
self._cleanup_expired_nonces()
return True, "Nonce valid"
def _cleanup_expired_nonces(self):
"""Remove expired nonces to prevent memory growth"""
current_time = datetime.now()
expired = [
nonce for nonce, timestamp in self.used_nonces.items()
if current_time - timestamp > self.nonce_expiry
]
for nonce in expired:
del self.used_nonces[nonce]
def exchange_cards(
self,
local_card: SecureAgentCard,
remote_card_data: Dict,
nonce: str
) -> Tuple[bool, Optional[SecureAgentCard], str]:
"""Exchange cards with replay protection"""
# STEP 1: Validate nonce
nonce_valid, nonce_message = self.validate_nonce(nonce)
if not nonce_valid:
self.audit_logger.log_replay_attempt(
remote_card_data.get("agent_id"),
nonce
)
return False, None, nonce_message
# STEP 2: Validate remote card
# ... rest of validation ...
return True, remote_card, "Exchange successful"
Security Features: 1. ✅ Cryptographically secure nonce generation 2. ✅ Nonce uniqueness enforcement 3. ✅ Time-based nonce expiration 4. ✅ Automatic cleanup (prevents memory leaks) 5. ✅ Replay attempt logging 6. ✅ 256-bit entropy
How It Works:
Request 1 (nonce: "abc123"):
✓ Nonce not seen before → Accept
✓ Add "abc123" to used_nonces
Request 2 (nonce: "abc123"): [REPLAY ATTACK]
✗ Nonce "abc123" already used → REJECT
✓ Log replay attempt
✓ Alert security team
Request 3 (nonce: "xyz789"):
✓ Nonce not seen before → Accept
✓ Add "xyz789" to used_nonces
4. Rate Limiting¶
❌ Example 1 & 2: No Rate Limiting (DOS VULNERABLE!)¶
# Both examples lack rate limiting
async def handle_message(self, message):
"""Process message"""
# SECURITY FLAW: No rate limiting!
# Attacker can send unlimited requests!
return await self.process(message)
Attack Scenario:
# Attacker floods server with requests
for i in range(1000000):
send_request({"method": "get_price", "params": {"currency": "BTC"}})
# Result: Server crashes from resource exhaustion
✅ Example 3: Token Bucket Rate Limiting (PROTECTED!)¶
# From a2a_crypto_example/security/validator.py
class AgentCardValidator:
"""Validator with rate limiting"""
def __init__(self):
# Rate limiting tracking
self.validation_attempts: Dict[str, List[float]] = {}
self.max_attempts_per_minute = 10
def _check_validation_rate_limit(self, agent_id: str) -> bool:
"""Token bucket rate limiting"""
current_time = time.time()
window_start = current_time - 60 # 1 minute window
# Initialize if first request
if agent_id not in self.validation_attempts:
self.validation_attempts[agent_id] = []
# Remove attempts outside the window
self.validation_attempts[agent_id] = [
timestamp for timestamp in self.validation_attempts[agent_id]
if timestamp > window_start
]
# Check if at limit
if len(self.validation_attempts[agent_id]) >= self.max_attempts_per_minute:
# RATE LIMIT EXCEEDED
return False
# Record this attempt
self.validation_attempts[agent_id].append(current_time)
return True
Security Features: 1. ✅ Per-agent rate limiting 2. ✅ Sliding window algorithm 3. ✅ Configurable thresholds 4. ✅ Automatic cleanup 5. ✅ Fair resource allocation
How It Works:
Agent A:
Request 1-10: ✓ Accepted (within limit)
Request 11: ✗ RATE LIMITED
[Wait 60s]
Request 12: ✓ Accepted (window reset)
Agent B:
Request 1-10: ✓ Accepted (separate quota)
5. Audit Logging¶
❌ Example 1: No Logging (NO VISIBILITY!)¶
⚠️ Example 2: Console Logging (INADEQUATE!)¶
# Basic console logging
print(f"📨 Received: {message.message_type} from {message.sender_id}")
print(f"✅ Handshake complete with: {agent.name}")
# PROBLEMS:
# - Not structured
# - Not persistent
# - No severity levels
# - No searchability
# - No alerting
✅ Example 3: Structured Audit Logging (COMPREHENSIVE!)¶
# From a2a_crypto_example/security/audit_logger.py
class SecurityAuditLogger:
"""Comprehensive security event logging"""
def __init__(self, component_name: str):
self.component_name = component_name
self.events: List[SecurityEvent] = []
self.agent_history: Dict[str, List[SecurityEvent]] = {}
def log_card_exchange(
self,
event_type: str,
local_agent_id: str,
remote_agent_id: str,
success: bool,
details: Optional[str] = None
):
"""Log card exchange with full context"""
event = SecurityEvent(
event_id=str(uuid.uuid4()),
timestamp=datetime.now(),
event_type=SecurityEventType.CARD_EXCHANGE,
severity=SecuritySeverity.INFO if success else SecuritySeverity.WARNING,
agent_id=remote_agent_id,
description=f"{event_type}: {local_agent_id} ↔ {remote_agent_id}",
details={
"local_agent": local_agent_id,
"remote_agent": remote_agent_id,
"exchange_type": event_type,
"success": success,
"additional_info": details
},
component=self.component_name
)
self._store_event(event)
def log_validation_failure(
self,
agent_id: str,
reasons: List[str],
card_data: Optional[Dict] = None
):
"""Log validation failure with reasons"""
event = SecurityEvent(
event_id=str(uuid.uuid4()),
timestamp=datetime.now(),
event_type=SecurityEventType.VALIDATION_FAILURE,
severity=SecuritySeverity.WARNING,
agent_id=agent_id,
description=f"Validation failed: {', '.join(reasons)}",
details={
"failure_reasons": reasons,
"card_summary": self._sanitize_card_data(card_data)
},
component=self.component_name
)
self._store_event(event)
def log_suspicious_activity(
self,
agent_id: str,
activity_type: str,
description: str
):
"""Log suspicious activity for security analysis"""
event = SecurityEvent(
event_id=str(uuid.uuid4()),
timestamp=datetime.now(),
event_type=SecurityEventType.SUSPICIOUS_ACTIVITY,
severity=SecuritySeverity.HIGH,
agent_id=agent_id,
description=f"Suspicious: {activity_type} - {description}",
details={
"activity_type": activity_type,
"detailed_description": description
},
component=self.component_name
)
self._store_event(event)
self._trigger_alert(event) # Immediate notification!
def log_replay_attempt(self, agent_id: str, nonce: str):
"""Log replay attack attempt"""
event = SecurityEvent(
event_id=str(uuid.uuid4()),
timestamp=datetime.now(),
event_type=SecurityEventType.REPLAY_ATTEMPT,
severity=SecuritySeverity.CRITICAL, # High priority!
agent_id=agent_id,
description=f"Replay attack detected from {agent_id}",
details={"nonce": nonce},
component=self.component_name
)
self._store_event(event)
self._trigger_alert(event) # CRITICAL: Alert immediately!
def export_events(self, filepath: str, format: str = "json"):
"""Export audit log for compliance/analysis"""
if format == "json":
with open(filepath, 'w') as f:
json.dump([event.to_dict() for event in self.events], f, indent=2)
elif format == "csv":
# CSV export for spreadsheet analysis
pass
def get_statistics(self) -> Dict[str, Any]:
"""Get security statistics for monitoring"""
return {
"total_events": len(self.events),
"by_severity": self._count_by_severity(),
"by_type": self._count_by_type(),
"top_agents": self._get_top_agents(10),
"recent_alerts": self._get_recent_alerts(50)
}
Logging Features: 1. ✅ Structured events (searchable, queryable) 2. ✅ Severity levels (INFO, WARNING, HIGH, CRITICAL) 3. ✅ Per-agent history tracking 4. ✅ Export capabilities (JSON, CSV) 5. ✅ Statistics and monitoring 6. ✅ Alert triggering for critical events 7. ✅ Compliance-ready format
📊 Attack Scenario Comparison¶
Let's see how each version handles a SQL injection attack:
Attack: SQL Injection via Currency Parameter¶
malicious_request = {
"method": "get_price",
"params": {
"currency": "BTC'; DROP TABLE agents;--"
}
}
❌ Example 1 Response:¶
✗ Accepts malicious input
✗ No validation
✗ Passes directly to processing
✗ Could execute SQL if database involved
✗ No logging of attack
✗ No alert generated
RESULT: SYSTEM COMPROMISED
⚠️ Example 2 Response:¶
⚠ Accepts input (no payload validation)
✓ Some field length checking
✗ No injection detection
⚠ Console log: "Received request"
✗ No alert generated
RESULT: STILL VULNERABLE
✅ Example 3 Response:¶
✓ Input validation triggered
✓ Injection pattern detected: "'; DROP TABLE"
✓ Request REJECTED before processing
✓ Structured log: SecurityEvent(SUSPICIOUS_ACTIVITY, severity=HIGH)
✓ Alert triggered to security team
✓ Agent reputation score decreased
✓ Repeat attempts trigger auto-block
RESULT: ATTACK PREVENTED AND LOGGED
🎯 Practical Exercises¶
Exercise 1: Spot the Vulnerabilities¶
Look at this code snippet and identify security issues:
async def handle_admin_request(self, message):
agent_id = message.sender_id
command = message.payload["command"]
if command == "delete_agent":
target = message.payload["target_id"]
self.delete_agent(target)
return {"status": "deleted"}
Click to see vulnerabilities
1. ❌ No authentication - anyone can claim to be admin 2. ❌ No authorization check - is sender actually admin? 3. ❌ No input validation on command 4. ❌ No validation on target_id 5. ❌ No rate limiting - can spam deletes 6. ❌ No audit logging - no record of who deleted what 7. ❌ No confirmation - accidental deletes possible 8. ❌ Error details might leak informationExercise 2: Add Security Controls¶
Rewrite the above function with proper security:
Click to see secure version
async def handle_admin_request(self, message):
# SECURITY: Validate agent card and signature
agent_card = self.validate_sender(message.sender_id)
if not agent_card:
self.audit_logger.log_suspicious_activity(
message.sender_id,
"INVALID_SENDER",
"Admin request from unverified sender"
)
return {"error": "Unauthorized"}
# SECURITY: Check admin authorization
if not agent_card.has_capability("admin"):
self.audit_logger.log_authorization_failure(
message.sender_id,
"admin",
"Attempted admin action without permission"
)
return {"error": "Forbidden"}
# SECURITY: Rate limiting
if not self.check_rate_limit(message.sender_id, "admin_actions"):
return {"error": "Rate limit exceeded"}
# SECURITY: Validate command
command = message.payload.get("command")
if command not in ["delete_agent", "update_agent", "list_agents"]:
return {"error": "Invalid command"}
if command == "delete_agent":
# SECURITY: Validate target_id
target = message.payload.get("target_id")
if not self.validate_agent_id(target):
return {"error": "Invalid target"}
# SECURITY: Audit before action
self.audit_logger.log_admin_action(
message.sender_id,
"DELETE_AGENT",
target
)
try:
self.delete_agent(target)
# SECURITY: Audit success
self.audit_logger.log_admin_action_success(
message.sender_id,
"DELETE_AGENT",
target
)
return {"status": "deleted", "target": target}
except Exception as e:
# SECURITY: Safe error message (no details leaked)
self.audit_logger.log_admin_action_failure(
message.sender_id,
"DELETE_AGENT",
target,
str(e)
)
return {"error": "Operation failed"}
🔑 Key Takeaways¶
What We Learned¶
- Security is Layered: Example 3 uses defense-in-depth
- Validation is Critical: Never trust input
- Authentication Matters: Verify identity before trusting
- Logging Enables Detection: Can't defend what you can't see
- Incremental Improvement: Example 2 shows partial security is better than none
Security Principles in Action¶
| Principle | Example 1 | Example 2 | Example 3 |
|---|---|---|---|
| Defense in Depth | ❌ Single layer (none) | ⚠️ Two layers | ✅ 6+ layers |
| Fail Secure | ❌ Fails open | ⚠️ Sometimes | ✅ Always |
| Least Privilege | ❌ No concept | ⚠️ Basic | ✅ RBAC enforced |
| Complete Mediation | ❌ Skips checks | ⚠️ Partial | ✅ Every request |
| Audit Trail | ❌ None | ⚠️ Console only | ✅ Comprehensive |
Common Anti-Patterns to Avoid¶
- ❌ "Security Later" - Example 1 mentality leads to vulnerabilities
- ❌ "Trust Input" - Always validate, never assume
- ❌ "Security Through Obscurity" - Don't rely on hiding
- ❌ "It Won't Happen To Us" - Attackers don't discriminate
- ❌ "One Layer Is Enough" - Defense in depth is essential
📚 Additional Resources¶
Related Documentation¶
- Authentication Tags - Deep dive into signatures
- Threat Model - Attack scenarios explained
- Security Best Practices - Implementation guide
Code References¶
Next Steps¶
- Review each example's code in detail
- Read the individual SECURITY_ANALYSIS.md files for each
- Try the exercises at the end of this document
- Apply these patterns to your own code
🎓 Assessment Questions¶
Test your understanding:
-
What is the primary vulnerability in Example 1's message handling?
Answer
No input validation - accepts any payload without checking -
Why is constant-time comparison important in signature verification?
Answer
Prevents timing attacks that could reveal signature information -
How does Example 3 prevent replay attacks?
Answer
Nonce-based validation - each nonce can only be used once -
What's the purpose of rate limiting in Example 3?
Answer
Prevents DOS attacks by limiting requests per agent per time window -
Why does Example 3 use structured logging instead of print statements?
Answer
Enables searching, alerting, compliance, and automated analysis
Document Version: 1.0
Last Updated: November 2025
Part of: A2A Security Learning Project
Navigation
← Previous: Security Best Practices | Next: Threat Model →
↑ Up: A2A Overview