#!/usr/bin/env python3
"""
Agent Provenance Chain (APC)

Cryptographic audit trail for autonomous AI agents.
Every action signed. Every decision traceable. Full transparency.
"""

import base64
import hashlib
import json
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, Optional

from cryptography.exceptions import InvalidSignature
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric.ed25519 import (
    Ed25519PrivateKey,
    Ed25519PublicKey,
)

# "previous_hash" value of the first record in every chain.
GENESIS_HASH = "0" * 64

# Fields that are added after hashing and therefore excluded from the digest.
_UNHASHED_FIELDS = ("hash", "signature")


class AgentProvenanceChain:
    """
    Cryptographic audit trail for agent actions.

    Every action is:
    - Timestamped
    - Hashed with SHA-256 and signed with Ed25519
    - Linked to the previous action through its hash (blockchain-style)
    - Appended as one JSON line to the agent's chain file

    The chain file is tamper-evident rather than tamper-proof: edits are
    detected by ``verify_chain_integrity``, not prevented.
    """

    def __init__(self, agent_name: str, key_path: Optional[Path] = None):
        """
        Load (or create) the agent's signing key and locate its chain file.

        Args:
            agent_name: Name used in every record and in the default file names.
            key_path: Location of the PEM private key. Defaults to
                ``~/.apc/<agent_name>.key``. The chain file always lives at
                ``~/.apc/<agent_name>.chain.jsonl``.
        """
        self.agent_name = agent_name
        self.key_path = key_path or Path.home() / ".apc" / f"{agent_name}.key"
        self.chain_path = Path.home() / ".apc" / f"{agent_name}.chain.jsonl"

        self._ensure_directories()
        self.private_key = self._load_or_generate_key()
        self.public_key: Ed25519PublicKey = self.private_key.public_key()
        self.last_hash = self._get_last_hash()

    def _ensure_directories(self):
        """Create the parent directories of the key and chain files."""
        self.key_path.parent.mkdir(parents=True, exist_ok=True)
        self.chain_path.parent.mkdir(parents=True, exist_ok=True)

    def _load_or_generate_key(self) -> Ed25519PrivateKey:
        """
        Load the existing private key or generate and persist a new one.

        Returns:
            The agent's Ed25519 private key.

        Raises:
            TypeError: If the key file holds a key that is not Ed25519.
        """
        if self.key_path.exists():
            with open(self.key_path, "rb") as f:
                key = serialization.load_pem_private_key(f.read(), password=None)
            if not isinstance(key, Ed25519PrivateKey):
                raise TypeError(
                    f"Key at {self.key_path} is not an Ed25519 private key"
                )
            return key

        private_key = Ed25519PrivateKey.generate()
        pem = private_key.private_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PrivateFormat.PKCS8,
            encryption_algorithm=serialization.NoEncryption(),
        )

        # Create the file with owner-only permissions *before* any key
        # material is written, so it is never readable by other users.
        self.key_path.touch(mode=0o600, exist_ok=True)
        with open(self.key_path, "wb") as f:
            f.write(pem)
        # Also covers the case where the file already existed with a wider mode.
        self.key_path.chmod(0o600)

        return private_key

    def _get_last_hash(self) -> str:
        """
        Return the hash of the last record in the chain file.

        Blank lines are ignored. Returns the genesis hash when the file is
        missing or contains no records.
        """
        if not self.chain_path.exists():
            return GENESIS_HASH

        last_record_line = ""
        with open(self.chain_path, "r", encoding="utf-8") as f:
            # Stream the file; only the final non-blank line matters.
            for line in f:
                if line.strip():
                    last_record_line = line

        if not last_record_line:
            return GENESIS_HASH

        return json.loads(last_record_line)["hash"]

    @staticmethod
    def _compute_hash(record: Dict[str, Any]) -> str:
        """
        Return the SHA-256 hex digest of a record's hashed content.

        The digest covers every field except ``hash`` and ``signature``,
        serialized as JSON with sorted keys.
        """
        content = {k: v for k, v in record.items() if k not in _UNHASHED_FIELDS}
        return hashlib.sha256(
            json.dumps(content, sort_keys=True).encode()
        ).hexdigest()

    def sign_action(
        self,
        action_type: str,
        payload: Dict[str, Any],
        context: Optional[Dict[str, Any]] = None,
    ) -> Dict[str, Any]:
        """
        Sign an action and append it to the chain.

        Args:
            action_type: Type of action (e.g., "exec", "write_file", "api_call")
            payload: Action data; must be JSON-serializable.
            context: Optional context (reasoning, session_id, etc.)

        Returns:
            Signed action record, including its "hash" and "signature".

        Raises:
            TypeError: If payload or context cannot be serialized to JSON.
        """
        timestamp = time.time()
        # Aware conversion instead of the deprecated utcfromtimestamp();
        # tzinfo is dropped so the string keeps its "...Z" form.
        utc_time = datetime.fromtimestamp(timestamp, tz=timezone.utc)
        iso_time = utc_time.replace(tzinfo=None).isoformat() + "Z"

        action = {
            "agent": self.agent_name,
            "timestamp": timestamp,
            "iso_time": iso_time,
            "type": action_type,
            "payload": payload,
            "context": context or {},
            "previous_hash": self.last_hash,
        }

        # Hash the record as it will look after being read back from disk
        # (e.g. non-string dict keys become strings), so verification can
        # reproduce the digest exactly.
        action_hash = self._compute_hash(json.loads(json.dumps(action)))
        action["hash"] = action_hash

        # The signature covers the hex digest, which in turn covers the content.
        signature = self.private_key.sign(action_hash.encode())
        action["signature"] = base64.b64encode(signature).decode()

        with open(self.chain_path, "a", encoding="utf-8") as f:
            f.write(json.dumps(action) + "\n")

        self.last_hash = action_hash

        return action

    def get_public_key_pem(self) -> str:
        """Get public key in PEM (SubjectPublicKeyInfo) format for verification."""
        pem = self.public_key.public_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PublicFormat.SubjectPublicKeyInfo,
        )
        return pem.decode()

    def get_chain(self, limit: Optional[int] = None) -> list:
        """
        Retrieve the action chain, oldest record first.

        Args:
            limit: If given and non-zero, only the last ``limit`` lines of the
                file are considered. ``None`` or ``0`` returns everything.

        Returns:
            List of parsed records; blank lines are skipped. Empty list when
            the chain file does not exist.
        """
        if not self.chain_path.exists():
            return []

        with open(self.chain_path, "r", encoding="utf-8") as f:
            lines = f.readlines()

        if limit:
            lines = lines[-limit:]

        return [json.loads(line) for line in lines if line.strip()]

    def verify_chain_integrity(self) -> tuple[bool, Optional[str]]:
        """
        Verify the entire chain is intact and unmodified.

        For every record this checks that it links to the previous record,
        that its stored hash matches its content, and that the hash carries
        a valid signature from this agent's key.

        Returns:
            (is_valid, error_message)
        """
        try:
            chain = self.get_chain()
        except json.JSONDecodeError as e:
            return False, f"Chain file is not valid JSONL: {e}"

        if not chain:
            return True, None

        expected_prev = GENESIS_HASH

        for i, action in enumerate(chain):
            if not isinstance(action, dict):
                return False, f"Malformed record at index {i}: not an object"

            for field in ("previous_hash", "hash", "signature"):
                if not isinstance(action.get(field), str):
                    return False, (
                        f"Malformed record at index {i}: "
                        f"missing or invalid '{field}'"
                    )

            # Check previous hash linkage
            if action["previous_hash"] != expected_prev:
                return False, f"Chain broken at index {i}: hash mismatch"

            # Without this, edited content would pass as long as the stored
            # hash and signature were left untouched.
            if self._compute_hash(action) != action["hash"]:
                return False, (
                    f"Record modified at index {i}: content does not match hash"
                )

            # Verify signature
            try:
                sig_bytes = base64.b64decode(action["signature"])
                self.public_key.verify(sig_bytes, action["hash"].encode())
            except (InvalidSignature, ValueError) as e:
                return False, f"Invalid signature at index {i}: {e}"

            expected_prev = action["hash"]

        return True, None


# Convenience function
def create_agent_chain(agent_name: str) -> AgentProvenanceChain:
    """Create or load an agent provenance chain."""
    return AgentProvenanceChain(agent_name)