Skip to content

Security Evolution: Code Walkthrough Comparison

Learning Path: Security
Difficulty: Intermediate to Advanced
Prerequisites: Core Concepts, Authentication Tags

← Previous: Security Best Practices | Next: Threat Model
↑ Up: A2A Overview


🎯 What You'll Learn

This document provides a side-by-side comparison of three progressive implementations:

  • How to identify security vulnerabilities in code
  • What changes between vulnerable and secure implementations
  • Why each security control matters
  • How to incrementally improve security
  • Common security anti-patterns to avoid

📚 The Three Examples

Example 1: Basic Implementation (Vulnerable) ❌

Location: examples/a2a_crypto_example/
Status: Intentionally insecure for educational purposes
Purpose: Learn to recognize vulnerabilities

Example 2: Registry Integration (Improved) ⚠️

Location: examples/a2a_crypto_simple_registry_example_1/
Status: Some security, but incomplete
Purpose: Understand incremental improvements

Example 3: Security Module (Secure) ✅

Location: examples/a2a_crypto_example/security/
Status: Production-ready security architecture
Purpose: Reference implementation


🔍 Security Control Progression Matrix

Security Control Example 1 Example 2 Example 3 Impact
Input Validation ❌ None ⚠️ Basic ✅ Comprehensive CRITICAL
Authentication ❌ None ⚠️ Simple Signature ✅ Full PKI CRITICAL
Authorization ❌ None ⚠️ Capability Check ✅ RBAC HIGH
Replay Protection ❌ None ❌ None ✅ Nonce-based HIGH
Rate Limiting ❌ None ❌ None ✅ Token Bucket MEDIUM
Audit Logging ❌ None ⚠️ Console Print ✅ Structured Logs MEDIUM
Encryption ❌ Plaintext ⚠️ Transport Only ✅ End-to-end HIGH
Error Handling ❌ Exposes Info ⚠️ Generic ✅ Safe Messages MEDIUM
Injection Prevention ❌ None ⚠️ Basic ✅ Multi-layer CRITICAL
Session Management ❌ None ⚠️ Basic ✅ Secure Tokens HIGH

🎓 How to Use This Guide

For Learners

  1. Read Example 1 code - Try to spot vulnerabilities yourself
  2. Check this guide - See what you missed
  3. Read Example 2 code - Notice the improvements
  4. Study Example 3 code - See production-ready patterns
  5. Practice - Apply to your own code

For Security Auditors

  1. Start with Example 1 - Perform a mock security audit
  2. Document findings - List all vulnerabilities
  3. Compare with this guide - Validate your findings
  4. Review Example 3 - Understand proper mitigations

For Code Reviewers

  1. Use as a checklist - Compare against your codebase
  2. Identify patterns - Recognize similar issues
  3. Recommend fixes - Reference Example 3 patterns

🔐 Security Control Deep Dives


1. Input Validation

❌ Example 1: No Validation (DANGEROUS!)

# From a2a_crypto_example/server/crypto_agent_server.py

async def handle_message(self, message: A2AMessage, client_id: str):
    """Handle incoming A2A message"""

    if message.message_type == MessageType.REQUEST:
        # SECURITY FLAW: No validation of message payload!
        method = message.payload.get("method")
        params = message.payload.get("params", {})

        if method == "get_price":
            # SECURITY FLAW: No validation of currency parameter!
            currency = params.get("currency")  # Could be ANYTHING!
            price = self.get_price(currency)   # Injection risk!

            return A2AProtocol.create_response(
                self.agent_id,
                message.sender_id,
                {"price": price, "currency": currency}
            )

Vulnerabilities: 1. ❌ No schema validation - malformed messages accepted 2. ❌ No type checking - currency could be any type 3. ❌ No length limits - DOS via huge messages 4. ❌ No sanitization - injection attacks possible 5. ❌ No whitelist - any method name accepted

Attack Scenarios:

# Attacker sends malicious payload
{
    "method": "get_price",
    "params": {
        "currency": "'; DROP TABLE agents;--"  # SQL injection attempt
    }
}

# OR

{
    "method": "__import__('os').system('rm -rf /')",  # Code injection
    "params": {}
}

# OR

{
    "method": "get_price",
    "params": {
        "currency": "A" * 10000000  # DOS via memory exhaustion
    }
}


⚠️ Example 2: Basic Validation (BETTER, BUT INCOMPLETE)

# From a2a_crypto_simple_registry_example_1/shared/a2a_protocol.py

@dataclass
class A2AMessage:
    message_id: str
    message_type: MessageType
    sender_id: str
    recipient_id: str
    timestamp: str
    payload: Dict[str, Any]
    correlation_id: Optional[str] = None

    def validate(self) -> bool:
        """Basic validation"""
        # IMPROVEMENT: Some validation added
        if not self.message_id or len(self.message_id) > 100:
            return False
        if not self.sender_id or len(self.sender_id) > 100:
            return False

        # STILL VULNERABLE: No payload validation!
        # STILL VULNERABLE: No type checking!
        # STILL VULNERABLE: No injection detection!

        return True

Improvements: 1. ✅ Length limits on IDs 2. ✅ Required field checking 3. ✅ Basic dataclass structure

Remaining Vulnerabilities: 1. ❌ Payload contents not validated 2. ❌ No injection pattern detection 3. ❌ No whitelist enforcement 4. ❌ No nested object validation


✅ Example 3: Comprehensive Validation (SECURE!)

# From a2a_crypto_example/security/validator.py

class AgentCardValidator:
    """Comprehensive validation with multiple security layers"""

    def validate_card(self, card: SecureAgentCard) -> Tuple[bool, List[str]]:
        """Multi-layer validation"""
        issues = []

        # LAYER 1: Schema validation
        if not self._validate_schema(card):
            issues.append("Invalid schema")

        # LAYER 2: Type validation
        if not isinstance(card.capabilities, dict):
            issues.append("Capabilities must be dictionary")

        # LAYER 3: Length limits
        if len(card.name) > 100:
            issues.append("Name too long")

        # LAYER 4: Injection detection
        metadata_issues = self._validate_metadata(card)
        if metadata_issues:
            issues.extend(metadata_issues)

        # LAYER 5: Whitelist enforcement
        invalid_caps = self._validate_capabilities(card)
        if invalid_caps:
            issues.append(f"Invalid capabilities: {invalid_caps}")

        # LAYER 6: Rate limiting
        if not self._check_validation_rate_limit(card.agent_id):
            issues.append("Rate limit exceeded")

        return len(issues) == 0, issues

    def _validate_metadata(self, card: SecureAgentCard) -> List[str]:
        """Detect injection attempts in metadata"""
        issues = []

        for key, value in card.metadata.items():
            # Check for suspicious keys
            if any(pattern in key.lower() for pattern in SUSPICIOUS_KEY_PATTERNS):
                issues.append(f"Suspicious key: {key}")

            # Check for injection patterns in values
            if isinstance(value, str):
                if self._contains_injection_attempt(value):
                    issues.append(f"Potential injection in '{key}'")

        return issues

    def _contains_injection_attempt(self, value: str) -> bool:
        """Pattern-based injection detection"""
        value_lower = value.lower()

        injection_patterns = [
            "script>",           # XSS
            "javascript:",       # XSS
            "onerror=",         # XSS
            "'; drop table",    # SQL injection
            "or 1=1",          # SQL injection
            "../",             # Path traversal
            "cmd.exe",         # Command injection
            "__import__",      # Python code injection
            "eval(",           # Code execution
        ]

        return any(pattern in value_lower for pattern in injection_patterns)

Security Features: 1. ✅ Multi-layer validation 2. ✅ Comprehensive type checking 3. ✅ Injection pattern detection 4. ✅ Whitelist enforcement 5. ✅ Rate limiting protection 6. ✅ Detailed error reporting

Defense in Depth: Even if one layer fails, others provide protection.


2. Authentication & Signatures

❌ Example 1: No Authentication (CRITICAL FLAW!)

# From a2a_crypto_example/server/crypto_agent_server.py

async def handle_message(self, message: A2AMessage, client_id: str):
    """Handle incoming message"""

    # SECURITY FLAW: No verification of sender identity!
    # Anyone can claim to be any agent!

    if message.message_type == MessageType.HANDSHAKE:
        # SECURITY FLAW: Agent card accepted without verification!
        agent_data = message.payload.get("agent_card")
        self.clients[client_id]["agent_id"] = agent_data["agent_id"]

        # Attacker can impersonate any agent!
        return self.create_handshake_ack()

Attack Scenario:

# Attacker impersonates admin agent
fake_admin_card = {
    "agent_id": "admin-001",  # Claim to be admin
    "name": "AdminAgent",
    "capabilities": ["admin", "delete_all"]  # Claim admin rights
}

# Server accepts without verification!
# Attacker now has admin access!


⚠️ Example 2: Basic Signature (IMPROVED, BUT WEAK)

# From a2a_crypto_simple_registry_example_1/security/validator.py

def _verify_signature(self, card: SecureAgentCard) -> bool:
    """Basic signature verification (SIMPLIFIED)"""

    if not card.signature:
        return False

    # IMPROVEMENT: Signature required
    card_data = json.dumps({
        "agent_id": card.agent_id,
        "name": card.name,
        "version": card.version,
        "capabilities": card.capabilities
    }, sort_keys=True)

    # VULNERABILITY: Simplified crypto (not production-ready!)
    expected_signature = hashlib.sha256(
        f"{card_data}{card.public_key}".encode()
    ).hexdigest()

    # IMPROVEMENT: Constant-time comparison
    return hmac.compare_digest(card.signature[:64], expected_signature)

Improvements: 1. ✅ Signature required 2. ✅ Constant-time comparison (timing attack prevention) 3. ✅ Canonical JSON representation

Remaining Issues: 1. ❌ Simplified crypto (not using proper algorithms) 2. ❌ No certificate chain validation 3. ❌ No key rotation support 4. ❌ No revocation checking


✅ Example 3: Full PKI (PRODUCTION-READY!)

# From a2a_crypto_example/security/secure_agent_card.py

@dataclass
class SecureAgentCard:
    """Secure Agent Card with full PKI support"""

    agent_id: str
    name: str
    version: str
    capabilities: Dict[str, List[str]]
    metadata: Dict[str, Any]

    # PKI Fields
    public_key: str                    # RSA/ECC public key
    signature: str                     # Cryptographic signature
    certificate_fingerprint: str       # X.509 cert fingerprint
    issuer: str                       # Certificate authority
    issued_at: datetime               # Issuance timestamp
    expires_at: datetime              # Expiration timestamp
    signature_algorithm: str = "RS256" # Algorithm specification
    trust_level: float = 0.0          # Reputation score

    def sign(self, private_key: str):
        """Sign card with private key"""
        # 1. Create canonical representation
        card_data = self._to_canonical_dict()
        canonical_json = json.dumps(card_data, sort_keys=True)

        # 2. Sign with proper cryptographic algorithm
        # (In production, use cryptography library)
        signature_bytes = rsa_sign(
            canonical_json.encode(),
            private_key,
            algorithm=self.signature_algorithm
        )

        # 3. Encode signature
        self.signature = base64.b64encode(signature_bytes).decode()

        # 4. Generate certificate fingerprint
        self.certificate_fingerprint = hashlib.sha256(
            public_key.encode()
        ).hexdigest()

    def verify(self, public_key: str) -> bool:
        """Verify signature with public key"""
        # 1. Reconstruct canonical data
        card_data = self._to_canonical_dict()
        canonical_json = json.dumps(card_data, sort_keys=True)

        # 2. Verify with proper crypto
        signature_bytes = base64.b64decode(self.signature)

        return rsa_verify(
            canonical_json.encode(),
            signature_bytes,
            public_key,
            algorithm=self.signature_algorithm
        )

Security Features: 1. ✅ Real cryptographic signatures (RSA/ECC) 2. ✅ Certificate fingerprinting 3. ✅ Issuer tracking 4. ✅ Expiration enforcement 5. ✅ Algorithm agility 6. ✅ Trust scoring 7. ✅ Canonical representation


3. Replay Attack Protection

❌ Example 1: No Protection (VULNERABLE!)

# From a2a_crypto_example

async def handle_message(self, message: A2AMessage):
    """Process message"""

    # SECURITY FLAW: No replay protection!
    # Attacker can record and replay messages!

    if message.message_type == MessageType.REQUEST:
        # Process request - same message can be replayed infinite times
        result = await self.process_request(message)
        return result

Attack Scenario:

# 1. Attacker intercepts legitimate message
legitimate_message = {
    "message_id": "msg-123",
    "message_type": "REQUEST",
    "sender_id": "legitimate-agent",
    "payload": {
        "method": "transfer_funds",
        "params": {"amount": 1000, "to": "attacker-account"}
    }
}

# 2. Attacker replays message 100 times
# Result: $100,000 transferred instead of $1,000!
for i in range(100):
    send_message(legitimate_message)  # Same message, replayed


❌ Example 2: Still No Protection!

Example 2 also lacks replay protection - a significant oversight!


✅ Example 3: Nonce-Based Protection (SECURE!)

# From a2a_crypto_example/security/manager.py

class SecureAgentCardManager:
    """Manages secure agent cards with replay protection"""

    def __init__(self, local_agent_id: str):
        self.local_agent_id = local_agent_id

        # Replay protection: Track used nonces
        self.used_nonces: Dict[str, datetime] = {}
        self.nonce_expiry = timedelta(minutes=5)

    def generate_nonce(self) -> str:
        """Generate cryptographically secure nonce"""
        # 32 bytes = 256 bits of entropy
        return secrets.token_urlsafe(32)

    def validate_nonce(self, nonce: str) -> Tuple[bool, str]:
        """Validate nonce hasn't been used"""

        # Check if nonce was previously used
        if nonce in self.used_nonces:
            return False, "Nonce already used (replay attack detected)"

        # Mark nonce as used
        self.used_nonces[nonce] = datetime.now()

        # Cleanup expired nonces
        self._cleanup_expired_nonces()

        return True, "Nonce valid"

    def _cleanup_expired_nonces(self):
        """Remove expired nonces to prevent memory growth"""
        current_time = datetime.now()

        expired = [
            nonce for nonce, timestamp in self.used_nonces.items()
            if current_time - timestamp > self.nonce_expiry
        ]

        for nonce in expired:
            del self.used_nonces[nonce]

    def exchange_cards(
        self,
        local_card: SecureAgentCard,
        remote_card_data: Dict,
        nonce: str
    ) -> Tuple[bool, Optional[SecureAgentCard], str]:
        """Exchange cards with replay protection"""

        # STEP 1: Validate nonce
        nonce_valid, nonce_message = self.validate_nonce(nonce)
        if not nonce_valid:
            self.audit_logger.log_replay_attempt(
                remote_card_data.get("agent_id"),
                nonce
            )
            return False, None, nonce_message

        # STEP 2: Validate remote card
        # ... rest of validation ...

        return True, remote_card, "Exchange successful"

Security Features: 1. ✅ Cryptographically secure nonce generation 2. ✅ Nonce uniqueness enforcement 3. ✅ Time-based nonce expiration 4. ✅ Automatic cleanup (prevents memory leaks) 5. ✅ Replay attempt logging 6. ✅ 256-bit entropy

How It Works:

Request 1 (nonce: "abc123"):
  ✓ Nonce not seen before → Accept
  ✓ Add "abc123" to used_nonces

Request 2 (nonce: "abc123"):  [REPLAY ATTACK]
  ✗ Nonce "abc123" already used → REJECT
  ✓ Log replay attempt
  ✓ Alert security team

Request 3 (nonce: "xyz789"):
  ✓ Nonce not seen before → Accept
  ✓ Add "xyz789" to used_nonces


4. Rate Limiting

❌ Example 1 & 2: No Rate Limiting (DOS VULNERABLE!)

# Both examples lack rate limiting

async def handle_message(self, message):
    """Process message"""
    # SECURITY FLAW: No rate limiting!
    # Attacker can send unlimited requests!

    return await self.process(message)

Attack Scenario:

# Attacker floods server with requests
for i in range(1000000):
    send_request({"method": "get_price", "params": {"currency": "BTC"}})

# Result: Server crashes from resource exhaustion


✅ Example 3: Token Bucket Rate Limiting (PROTECTED!)

# From a2a_crypto_example/security/validator.py

class AgentCardValidator:
    """Validator with rate limiting"""

    def __init__(self):
        # Rate limiting tracking
        self.validation_attempts: Dict[str, List[float]] = {}
        self.max_attempts_per_minute = 10

    def _check_validation_rate_limit(self, agent_id: str) -> bool:
        """Token bucket rate limiting"""

        current_time = time.time()
        window_start = current_time - 60  # 1 minute window

        # Initialize if first request
        if agent_id not in self.validation_attempts:
            self.validation_attempts[agent_id] = []

        # Remove attempts outside the window
        self.validation_attempts[agent_id] = [
            timestamp for timestamp in self.validation_attempts[agent_id]
            if timestamp > window_start
        ]

        # Check if at limit
        if len(self.validation_attempts[agent_id]) >= self.max_attempts_per_minute:
            # RATE LIMIT EXCEEDED
            return False

        # Record this attempt
        self.validation_attempts[agent_id].append(current_time)
        return True

Security Features: 1. ✅ Per-agent rate limiting 2. ✅ Sliding window algorithm 3. ✅ Configurable thresholds 4. ✅ Automatic cleanup 5. ✅ Fair resource allocation

How It Works:

Agent A:
  Request 1-10: ✓ Accepted (within limit)
  Request 11:   ✗ RATE LIMITED
  [Wait 60s]
  Request 12:   ✓ Accepted (window reset)

Agent B:
  Request 1-10: ✓ Accepted (separate quota)


5. Audit Logging

❌ Example 1: No Logging (NO VISIBILITY!)

# No security logging at all!
# Cannot detect attacks, debug issues, or prove compliance

⚠️ Example 2: Console Logging (INADEQUATE!)

# Basic console logging
print(f"📨 Received: {message.message_type} from {message.sender_id}")
print(f"✅ Handshake complete with: {agent.name}")

# PROBLEMS:
# - Not structured
# - Not persistent
# - No severity levels
# - No searchability
# - No alerting

✅ Example 3: Structured Audit Logging (COMPREHENSIVE!)

# From a2a_crypto_example/security/audit_logger.py

class SecurityAuditLogger:
    """Comprehensive security event logging"""

    def __init__(self, component_name: str):
        self.component_name = component_name
        self.events: List[SecurityEvent] = []
        self.agent_history: Dict[str, List[SecurityEvent]] = {}

    def log_card_exchange(
        self,
        event_type: str,
        local_agent_id: str,
        remote_agent_id: str,
        success: bool,
        details: Optional[str] = None
    ):
        """Log card exchange with full context"""
        event = SecurityEvent(
            event_id=str(uuid.uuid4()),
            timestamp=datetime.now(),
            event_type=SecurityEventType.CARD_EXCHANGE,
            severity=SecuritySeverity.INFO if success else SecuritySeverity.WARNING,
            agent_id=remote_agent_id,
            description=f"{event_type}: {local_agent_id}{remote_agent_id}",
            details={
                "local_agent": local_agent_id,
                "remote_agent": remote_agent_id,
                "exchange_type": event_type,
                "success": success,
                "additional_info": details
            },
            component=self.component_name
        )

        self._store_event(event)

    def log_validation_failure(
        self,
        agent_id: str,
        reasons: List[str],
        card_data: Optional[Dict] = None
    ):
        """Log validation failure with reasons"""
        event = SecurityEvent(
            event_id=str(uuid.uuid4()),
            timestamp=datetime.now(),
            event_type=SecurityEventType.VALIDATION_FAILURE,
            severity=SecuritySeverity.WARNING,
            agent_id=agent_id,
            description=f"Validation failed: {', '.join(reasons)}",
            details={
                "failure_reasons": reasons,
                "card_summary": self._sanitize_card_data(card_data)
            },
            component=self.component_name
        )

        self._store_event(event)

    def log_suspicious_activity(
        self,
        agent_id: str,
        activity_type: str,
        description: str
    ):
        """Log suspicious activity for security analysis"""
        event = SecurityEvent(
            event_id=str(uuid.uuid4()),
            timestamp=datetime.now(),
            event_type=SecurityEventType.SUSPICIOUS_ACTIVITY,
            severity=SecuritySeverity.HIGH,
            agent_id=agent_id,
            description=f"Suspicious: {activity_type} - {description}",
            details={
                "activity_type": activity_type,
                "detailed_description": description
            },
            component=self.component_name
        )

        self._store_event(event)
        self._trigger_alert(event)  # Immediate notification!

    def log_replay_attempt(self, agent_id: str, nonce: str):
        """Log replay attack attempt"""
        event = SecurityEvent(
            event_id=str(uuid.uuid4()),
            timestamp=datetime.now(),
            event_type=SecurityEventType.REPLAY_ATTEMPT,
            severity=SecuritySeverity.CRITICAL,  # High priority!
            agent_id=agent_id,
            description=f"Replay attack detected from {agent_id}",
            details={"nonce": nonce},
            component=self.component_name
        )

        self._store_event(event)
        self._trigger_alert(event)  # CRITICAL: Alert immediately!

    def export_events(self, filepath: str, format: str = "json"):
        """Export audit log for compliance/analysis"""
        if format == "json":
            with open(filepath, 'w') as f:
                json.dump([event.to_dict() for event in self.events], f, indent=2)
        elif format == "csv":
            # CSV export for spreadsheet analysis
            pass

    def get_statistics(self) -> Dict[str, Any]:
        """Get security statistics for monitoring"""
        return {
            "total_events": len(self.events),
            "by_severity": self._count_by_severity(),
            "by_type": self._count_by_type(),
            "top_agents": self._get_top_agents(10),
            "recent_alerts": self._get_recent_alerts(50)
        }

Logging Features: 1. ✅ Structured events (searchable, queryable) 2. ✅ Severity levels (INFO, WARNING, HIGH, CRITICAL) 3. ✅ Per-agent history tracking 4. ✅ Export capabilities (JSON, CSV) 5. ✅ Statistics and monitoring 6. ✅ Alert triggering for critical events 7. ✅ Compliance-ready format


📊 Attack Scenario Comparison

Let's see how each version handles a SQL injection attack:

Attack: SQL Injection via Currency Parameter

malicious_request = {
    "method": "get_price",
    "params": {
        "currency": "BTC'; DROP TABLE agents;--"
    }
}

❌ Example 1 Response:

✗ Accepts malicious input
✗ No validation
✗ Passes directly to processing
✗ Could execute SQL if database involved
✗ No logging of attack
✗ No alert generated

RESULT: SYSTEM COMPROMISED

⚠️ Example 2 Response:

⚠ Accepts input (no payload validation)
✓ Some field length checking
✗ No injection detection
⚠ Console log: "Received request"
✗ No alert generated

RESULT: STILL VULNERABLE

✅ Example 3 Response:

✓ Input validation triggered
✓ Injection pattern detected: "'; DROP TABLE"
✓ Request REJECTED before processing
✓ Structured log: SecurityEvent(SUSPICIOUS_ACTIVITY, severity=HIGH)
✓ Alert triggered to security team
✓ Agent reputation score decreased
✓ Repeat attempts trigger auto-block

RESULT: ATTACK PREVENTED AND LOGGED

🎯 Practical Exercises

Exercise 1: Spot the Vulnerabilities

Look at this code snippet and identify security issues:

async def handle_admin_request(self, message):
    agent_id = message.sender_id
    command = message.payload["command"]

    if command == "delete_agent":
        target = message.payload["target_id"]
        self.delete_agent(target)
        return {"status": "deleted"}
Click to see vulnerabilities 1. ❌ No authentication - anyone can claim to be admin 2. ❌ No authorization check - is sender actually admin? 3. ❌ No input validation on command 4. ❌ No validation on target_id 5. ❌ No rate limiting - can spam deletes 6. ❌ No audit logging - no record of who deleted what 7. ❌ No confirmation - accidental deletes possible 8. ❌ Error details might leak information

Exercise 2: Add Security Controls

Rewrite the above function with proper security:

Click to see secure version
async def handle_admin_request(self, message):
    # SECURITY: Validate agent card and signature
    agent_card = self.validate_sender(message.sender_id)
    if not agent_card:
        self.audit_logger.log_suspicious_activity(
            message.sender_id,
            "INVALID_SENDER",
            "Admin request from unverified sender"
        )
        return {"error": "Unauthorized"}

    # SECURITY: Check admin authorization
    if not agent_card.has_capability("admin"):
        self.audit_logger.log_authorization_failure(
            message.sender_id,
            "admin",
            "Attempted admin action without permission"
        )
        return {"error": "Forbidden"}

    # SECURITY: Rate limiting
    if not self.check_rate_limit(message.sender_id, "admin_actions"):
        return {"error": "Rate limit exceeded"}

    # SECURITY: Validate command
    command = message.payload.get("command")
    if command not in ["delete_agent", "update_agent", "list_agents"]:
        return {"error": "Invalid command"}

    if command == "delete_agent":
        # SECURITY: Validate target_id
        target = message.payload.get("target_id")
        if not self.validate_agent_id(target):
            return {"error": "Invalid target"}

        # SECURITY: Audit before action
        self.audit_logger.log_admin_action(
            message.sender_id,
            "DELETE_AGENT",
            target
        )

        try:
            self.delete_agent(target)

            # SECURITY: Audit success
            self.audit_logger.log_admin_action_success(
                message.sender_id,
                "DELETE_AGENT",
                target
            )

            return {"status": "deleted", "target": target}

        except Exception as e:
            # SECURITY: Safe error message (no details leaked)
            self.audit_logger.log_admin_action_failure(
                message.sender_id,
                "DELETE_AGENT",
                target,
                str(e)
            )
            return {"error": "Operation failed"}

🔑 Key Takeaways

What We Learned

  1. Security is Layered: Example 3 uses defense-in-depth
  2. Validation is Critical: Never trust input
  3. Authentication Matters: Verify identity before trusting
  4. Logging Enables Detection: Can't defend what you can't see
  5. Incremental Improvement: Example 2 shows partial security is better than none

Security Principles in Action

Principle Example 1 Example 2 Example 3
Defense in Depth ❌ Single layer (none) ⚠️ Two layers ✅ 6+ layers
Fail Secure ❌ Fails open ⚠️ Sometimes ✅ Always
Least Privilege ❌ No concept ⚠️ Basic ✅ RBAC enforced
Complete Mediation ❌ Skips checks ⚠️ Partial ✅ Every request
Audit Trail ❌ None ⚠️ Console only ✅ Comprehensive

Common Anti-Patterns to Avoid

  1. "Security Later" - Example 1 mentality leads to vulnerabilities
  2. "Trust Input" - Always validate, never assume
  3. "Security Through Obscurity" - Don't rely on hiding
  4. "It Won't Happen To Us" - Attackers don't discriminate
  5. "One Layer Is Enough" - Defense in depth is essential

📚 Additional Resources

Code References

Next Steps

  1. Review each example's code in detail
  2. Read the individual SECURITY_ANALYSIS.md files for each
  3. Try the exercises at the end of this document
  4. Apply these patterns to your own code

🎓 Assessment Questions

Test your understanding:

  1. What is the primary vulnerability in Example 1's message handling?

    AnswerNo input validation - accepts any payload without checking

  2. Why is constant-time comparison important in signature verification?

    AnswerPrevents timing attacks that could reveal signature information

  3. How does Example 3 prevent replay attacks?

    AnswerNonce-based validation - each nonce can only be used once

  4. What's the purpose of rate limiting in Example 3?

    AnswerPrevents DOS attacks by limiting requests per agent per time window

  5. Why does Example 3 use structured logging instead of print statements?

    AnswerEnables searching, alerting, compliance, and automated analysis


Document Version: 1.0
Last Updated: November 2025
Part of: A2A Security Learning Project


Navigation
← Previous: Security Best Practices | Next: Threat Model
↑ Up: A2A Overview