Task Collaboration Agent - Stage 2: Improved¶
Path:
examples/a2a_task_collab_example/stage2_improved
Overview¶
Stage 2 demonstrates partial security improvements that are insufficient for production. This stage teaches that better does not equal secure in distributed multi-agent systems.
Security Rating: ⚠️ 4/10 - PARTIALLY SECURE
Status: ⚠️ Not Production Ready - Significant vulnerabilities remain
Key Learning Focus¶
This stage focuses on understanding why partial security measures fail in multi-agent systems and the critical importance of comprehensive, layered security.
What You'll Learn¶
- Why UUID4 session IDs aren't enough
- Limitations of password-only authentication
- Gaps in basic authorization
- Why no encryption still fails
- How attackers exploit remaining weaknesses
- The necessity of complete security
Architecture¶

Components¶
task_coordinator.py: With SessionManager and AuthManagersession_manager.py: UUID4 IDs, basic timeoutsauth_manager.py: Password authentication (bcrypt)worker_agent.py: Basic task validationaudit_agent.py: Improved loggingclient.py: Updated for authentication
✅ Improvements from Stage 1¶
1. UUID4 Session IDs¶
import uuid
# ✅ Random session IDs
def create_session(user_id):
session_id = str(uuid.uuid4())
# Output: e3b0c442-98fc-1c14-b39f-92d1282e1f18
sessions[session_id] = {
'user_id': user_id,
'created': time.time(),
'last_activity': time.time()
}
return session_id
Benefit: Much harder to guess than sequential IDs
But Still Vulnerable: Can be sniffed (no TLS)
2. Password Authentication¶
import bcrypt
# ✅ Password verification with bcrypt
class AuthManager:
def authenticate(self, username, password):
if username not in self.users:
# Constant-time failure
bcrypt.checkpw(b'dummy', bcrypt.gensalt())
return None
user = self.users[username]
password_hash = user['password_hash'].encode()
if bcrypt.checkpw(password.encode(), password_hash):
return user['id']
return None
Benefit: Can't impersonate without password
But Still Vulnerable: No MFA, no rate limiting
3. Session Timeouts¶
# ✅ Idle timeout
IDLE_TIMEOUT = 30 * 60 # 30 minutes
def validate_session(session_id):
if session_id not in sessions:
return False
session = sessions[session_id]
# Check idle timeout
idle_time = time.time() - session['last_activity']
if idle_time > IDLE_TIMEOUT:
del sessions[session_id]
return False
# Update activity
session['last_activity'] = time.time()
return True
Benefit: Sessions eventually expire
But Still Vulnerable: No absolute timeout, no binding
4. Basic Authorization¶
# ✅ Check ownership
def create_task(session_id, project_id, task_data):
# Validate session
session = get_session(session_id)
if not session:
return {'error': 'Invalid session'}
# Check if user owns project
project = get_project(project_id)
if project['owner_id'] != session['user_id']:
return {'error': 'Access denied'}
# Create task
task = create_task_internal(project_id, task_data)
return {'status': 'success', 'task': task}
Benefit: Basic permission checking
But Still Vulnerable: No role-based access, incomplete checks
5. HMAC Message Signatures¶
import hmac
import hashlib
# ✅ Sign messages
SECRET_KEY = "shared-secret-key" # ⚠️ Hardcoded!
def sign_message(message):
message_bytes = json.dumps(message, sort_keys=True).encode()
signature = hmac.new(
SECRET_KEY.encode(),
message_bytes,
hashlib.sha256
).hexdigest()
return signature
def verify_message(message, signature):
expected = sign_message(message)
return hmac.compare_digest(expected, signature)
Benefit: Detect message tampering
But Still Vulnerable: No nonce = replay attacks still work
⚠️ Remaining Vulnerabilities¶
Despite improvements, 10+ critical vulnerabilities remain:
1. No TLS Encryption (CRITICAL)¶
# ❌ Still plaintext TCP
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(('0.0.0.0', 8000))
# No TLS wrapper!
Attack:
# UUID4 sessions still visible in traffic
tcpdump -i any port 8000 -A | grep -E '[0-9a-f]{8}-[0-9a-f]{4}'
# Captures all session IDs
Impact: Session stealing via network sniffing
2. No MFA (HIGH)¶
# ⚠️ Single-factor authentication
def login(username, password):
user_id = auth_manager.authenticate(username, password)
if user_id:
return create_session(user_id)
return None
Attack:
# Stolen/weak password = full access
stolen_password = "password123"
session = login("victim", stolen_password)
# No second factor required
Impact: Compromised passwords = compromised accounts
3. No Rate Limiting (HIGH)¶
# ❌ Still unlimited attempts
def handle_login(username, password):
# No throttling
# No attempt counting
# No lockout
return authenticate(username, password)
Attack:
# Brute force passwords
passwords = load_wordlist()
for password in passwords:
if login("target_user", password):
print(f"Found: {password}")
break
Impact: Brute force attacks succeed
4. No Replay Protection (HIGH)¶
# ⚠️ HMAC signature but no nonce
def handle_message(data, signature):
message = json.loads(data)
# Verify signature
if not verify_message(message, signature):
return error("Invalid signature")
# ❌ But same message can be replayed!
return process_message(message)
Attack:
# Capture signed message
captured = {
'type': 'transfer_funds',
'amount': 1000,
'signature': 'abc123...'
}
# Replay it 100 times
for i in range(100):
send_message(captured)
# Each replay succeeds!
Impact: Replay attacks still work
5. Incomplete Session Binding (HIGH)¶
# ⚠️ Only user_id binding, nothing else
def validate_session(session_id):
if session_id not in sessions:
return False
session = sessions[session_id]
# ❌ No IP address check
# ❌ No user agent check
# ❌ No certificate binding
return check_timeout(session)
Attack:
# Steal session, use from different IP/machine
stolen_session = sniff_session_id()
# Works from anywhere!
use_session(stolen_session)
Impact: Stolen sessions fully functional
6. No State Encryption (HIGH)¶
# ⚠️ Sessions stored in plaintext
sessions = {
'e3b0c442-...': {
'user_id': 'user123',
'permissions': ['admin'], # Plaintext!
'project_access': [1, 2, 3]
}
}
def save_state():
with open('sessions.json', 'w') as f:
json.dump(sessions, f) # ❌ Plaintext file
Attack:
Impact: File system access = full session compromise
7. Weak Secret Management (HIGH)¶
# ❌ Hardcoded secrets
SECRET_KEY = "shared-secret-key" # In source code!
DATABASE_PASSWORD = "dbpass123" # Committed to git!
# ❌ Shared secret across all agents
def init_agent():
return HMACVerifier(SECRET_KEY) # Same key everywhere
Attack: - Secrets in version control - Secrets in memory dumps - Secrets in config files
Impact: Complete cryptographic bypass
8. No Absolute Session Timeout (MEDIUM)¶
# ⚠️ Only idle timeout, no maximum lifetime
def validate_session(session_id):
session = sessions.get(session_id)
# Check idle timeout
if time.time() - session['last_activity'] > IDLE_TIMEOUT:
return False
# ❌ No check of total session age
# Session can live forever if kept active
return True
Attack:
# Keep session alive indefinitely
while True:
keep_alive(session_id)
time.sleep(29 * 60) # Just under idle timeout
# Session never expires!
Impact: Sessions can be kept alive forever
9. Incomplete Audit Logging (MEDIUM)¶
# ⚠️ Some logging but not comprehensive
def handle_action(session_id, action):
# Log action
log(f"User {session['user_id']} performed {action}")
# ❌ Missing:
# - IP address
# - Timestamp precision
# - Request details
# - Failure reasons
# - Security events
Attack: Unauthorized actions not properly tracked
Impact: No forensics capability
10. No Message Ordering (MEDIUM)¶
# ❌ Messages processed in any order
def handle_messages(messages):
for message in messages:
process(message)
# No sequence checking
# No dependency validation
Attack:
# Send messages out of order
send_message({'seq': 3, 'action': 'delete'})
send_message({'seq': 2, 'action': 'create'})
send_message({'seq': 1, 'action': 'init'})
# All processed, wrong order causes corruption
Impact: State corruption via message reordering
Attack Success Matrix¶
| Attack Type | Stage 1 | Stage 2 | Stage 3 |
|---|---|---|---|
| Session Guessing | ✅ Succeeds | ❌ Blocked | ❌ Blocked |
| Session Sniffing | ✅ Succeeds | ✅ Succeeds | ❌ Blocked |
| Identity Spoofing | ✅ Succeeds | ❌ Blocked | ❌ Blocked |
| Replay Attack | ✅ Succeeds | ✅ Succeeds | ❌ Blocked |
| Brute Force Login | N/A | ✅ Succeeds | ❌ Blocked |
| Session Theft | ✅ Succeeds | ✅ Succeeds | ❌ Blocked |
| Privilege Escalation | ✅ Succeeds | ⚠️ Partial | ❌ Blocked |
| Message Tampering | ✅ Succeeds | ❌ Blocked | ❌ Blocked |
| State Corruption | ✅ Succeeds | ⚠️ Harder | ❌ Blocked |
| DoS (No Rate Limit) | ✅ Succeeds | ✅ Succeeds | ❌ Blocked |
Legend: ✅ = Succeeds, ⚠️ = Partially mitigated, ❌ = Blocked
Attack Demonstrations¶
Demo 1: Session Sniffing (Still Works)¶
# demo_sniff_stage2.py
import socket
from scapy.all import sniff, TCP
def capture_sessions():
"""UUID4 sessions still visible without TLS"""
packets = sniff(filter="tcp port 8000", count=100)
sessions = []
for packet in packets:
if packet.haslayer(TCP):
payload = str(packet[TCP].payload)
# Look for UUID4 pattern
import re
uuids = re.findall(
r'[0-9a-f]{8}-[0-9a-f]{4}-4[0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}',
payload
)
sessions.extend(uuids)
return list(set(sessions))
# Run attack
captured = capture_sessions()
print(f"✅ Captured {len(captured)} sessions")
print(f"Sample: {captured[0]}")
Expected Result: Successfully captures UUID4 session IDs
Demo 2: Replay Attack (Still Works)¶
# demo_replay_stage2.py
def replay_attack():
"""HMAC signature doesn't prevent replay"""
# Capture a legitimate signed message
original = capture_message()
# {
# 'type': 'assign_task',
# 'task_id': '123',
# 'worker': 'worker1',
# 'signature': 'abc123...'
# }
# Replay it multiple times
for i in range(10):
response = send_message(original)
print(f"Replay {i}: {response['status']}")
# All succeed - no nonce check!
print("✅ Replay attack successful")
replay_attack()
Expected Result: All replays processed successfully
Demo 3: Brute Force (Still Works)¶
# demo_bruteforce_stage2.py
import time
def brute_force_login():
"""No rate limiting allows brute force"""
passwords = [
'password', 'Password1', '123456',
'qwerty', 'letmein', 'admin'
]
start = time.time()
for password in passwords:
response = try_login('target_user', password)
if response.get('session_id'):
elapsed = time.time() - start
print(f"✅ Found password: {password}")
print(f"Time: {elapsed:.2f}s")
print(f"No throttling detected!")
return
print("Password not in list")
brute_force_login()
Expected Result: Successfully brute forces password
Demo 4: Session Theft (Still Works)¶
# demo_theft_stage2.py
def session_theft():
"""Stolen sessions work from anywhere"""
# Attacker sniffs network
victim_session = sniff_session() # UUID4
print(f"Sniffed session: {victim_session}")
# Use from different machine/IP
# No session binding prevents this!
response = use_session_remotely(
victim_session,
from_ip='10.0.0.99', # Different IP
user_agent='AttackerBot' # Different UA
)
if response.get('status') == 'success':
print("✅ Session theft successful")
print("No binding checks detected!")
session_theft()
Expected Result: Stolen session works remotely
Key Differences from Stage 1¶
| Feature | Stage 1 | Stage 2 | Improvement |
|---|---|---|---|
| Session IDs | Sequential | UUID4 | +95% |
| Authentication | None | Password (bcrypt) | +100% |
| Authorization | None | Basic ownership | +60% |
| Message Integrity | None | HMAC signatures | +80% |
| Session Timeouts | None | Idle timeout | +70% |
| TLS Encryption | None | None | 0% |
| MFA | None | None | 0% |
| Rate Limiting | None | None | 0% |
| Replay Protection | None | None | 0% |
| State Encryption | None | None | 0% |
| Overall Security | 0/10 | 4/10 | +40% |
Conclusion: Much better, but still fails in production
Running the Example¶
Setup¶
cd examples/a2a_task_collab_example/stage2_improved
# Install dependencies
pip install -r requirements.txt
# Generate password hashes (for test users)
python scripts/setup_users.py
# Start coordinator
python server/task_coordinator.py
# In separate terminals:
python server/worker_agent.py --port 8001
python server/worker_agent.py --port 8002
python server/audit_agent.py --port 8003
Try the Attacks¶
# Terminal 1: Start all servers (see above)
# Terminal 2: Run Stage 2 specific attacks
python demos/demo_sniff_stage2.py # Still works!
python demos/demo_replay_stage2.py # Still works!
python demos/demo_bruteforce_stage2.py # Still works!
python demos/demo_theft_stage2.py # Still works!
# Compare with Stage 1 attacks
python demos/demo_hijack_stage1.py # NOW FAILS!
python demos/demo_spoof_stage1.py # NOW FAILS!
What to Observe¶
- ❌ Session guessing now fails (UUID4)
- ❌ Identity spoofing now fails (passwords)
- ❌ Message tampering now fails (HMAC)
- ✅ But session sniffing still works (no TLS)
- ✅ But replay attacks still work (no nonce)
- ✅ But brute force still works (no rate limit)
- ✅ But session theft still works (no binding)
Study Checklist¶
- Compare code with Stage 1
- Understand UUID4 improvements
- Test password authentication
- Verify HMAC signatures work
- Identify 10+ remaining vulnerabilities
- Successfully run attacks that still work
- Understand why partial security fails
- Ready for Stage 3 production patterns
Key Takeaways¶
- UUID4 > Sequential: But still vulnerable without encryption
- Password auth is essential: But insufficient alone (need MFA)
- HMAC prevents tampering: But doesn't prevent replay
- Partial improvements create false confidence: "Better" ≠ "Secure"
- Defense-in-depth required: Single layers fail
- Network encryption is critical: Without TLS, everything visible
- Complete solutions needed: Piecemeal security fails
Next: Stage 3 (Secure)¶
Stage 3 implements production-grade security with comprehensive protections.
Additional protections in Stage 3: - ✅ TLS 1.3 encryption - ✅ 256-bit cryptographically random session IDs - ✅ MFA enforcement (TOTP) - ✅ Full session binding (IP, UA, cert) - ✅ Nonce-based replay protection - ✅ Token bucket rate limiting - ✅ State encryption (AES-256-GCM) - ✅ Comprehensive audit logging - ✅ Full RBAC authorization - ✅ Absolute + idle timeouts
Time to Complete: 8-12 hours
Difficulty: ⭐⭐⭐ Advanced
Prerequisites: Stage 1-2 complete, TLS basics, cryptography fundamentals
Resources¶
Version: 1.0
Last Updated: January 2026
Status: Educational - Not Production Ready