commit b90c3d0da5787f4dc0ae9946f43688f1a4a8ec40 Author: agent-molt Date: Sat Feb 7 21:10:32 2026 +0530 Initial commit: Core APC library + demo - Ed25519 cryptographic signing - Blockchain-style chain linkage - Immutable audit trail - Full verification support diff --git a/apc.py b/apc.py new file mode 100644 index 0000000..d9bad2e --- /dev/null +++ b/apc.py @@ -0,0 +1,190 @@ +#!/usr/bin/env python3 +""" +Agent Provenance Chain (APC) +Cryptographic audit trail for autonomous AI agents. + +Every action signed. Every decision traceable. Full transparency. +""" + +import hashlib +import json +import time +from datetime import datetime +from pathlib import Path +from typing import Any, Dict, Optional +from cryptography.hazmat.primitives.asymmetric.ed25519 import ( + Ed25519PrivateKey, + Ed25519PublicKey, +) +from cryptography.hazmat.primitives import serialization +import base64 + + +class AgentProvenanceChain: + """ + Cryptographic audit trail for agent actions. + + Every action is: + - Timestamped + - Signed with Ed25519 + - Linked to previous action (blockchain-style) + - Stored immutably + """ + + def __init__(self, agent_name: str, key_path: Optional[Path] = None): + self.agent_name = agent_name + self.key_path = key_path or Path.home() / ".apc" / f"{agent_name}.key" + self.chain_path = Path.home() / ".apc" / f"{agent_name}.chain.jsonl" + + self._ensure_directories() + self.private_key = self._load_or_generate_key() + self.public_key = self.private_key.public_key() + + self.last_hash = self._get_last_hash() + + def _ensure_directories(self): + """Create necessary directories.""" + self.key_path.parent.mkdir(parents=True, exist_ok=True) + self.chain_path.parent.mkdir(parents=True, exist_ok=True) + + def _load_or_generate_key(self) -> Ed25519PrivateKey: + """Load existing key or generate new one.""" + if self.key_path.exists(): + with open(self.key_path, "rb") as f: + return serialization.load_pem_private_key(f.read(), password=None) + + # Generate new key + private_key = Ed25519PrivateKey.generate() + + # Save it + pem = private_key.private_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PrivateFormat.PKCS8, + encryption_algorithm=serialization.NoEncryption() + ) + + with open(self.key_path, "wb") as f: + f.write(pem) + + self.key_path.chmod(0o600) + + return private_key + + def _get_last_hash(self) -> str: + """Get hash of last action in chain.""" + if not self.chain_path.exists(): + return "0" * 64 # Genesis hash + + with open(self.chain_path, "r") as f: + lines = f.readlines() + if not lines: + return "0" * 64 + + last_line = lines[-1].strip() + last_action = json.loads(last_line) + return last_action["hash"] + + def sign_action( + self, + action_type: str, + payload: Dict[str, Any], + context: Optional[Dict[str, Any]] = None + ) -> Dict[str, Any]: + """ + Sign an action and append to chain. + + Args: + action_type: Type of action (e.g., "exec", "write_file", "api_call") + payload: Action data + context: Optional context (reasoning, session_id, etc.) + + Returns: + Signed action record + """ + timestamp = time.time() + + # Build action record + action = { + "agent": self.agent_name, + "timestamp": timestamp, + "iso_time": datetime.utcfromtimestamp(timestamp).isoformat() + "Z", + "type": action_type, + "payload": payload, + "context": context or {}, + "previous_hash": self.last_hash, + } + + # Compute hash + action_bytes = json.dumps(action, sort_keys=True).encode() + action_hash = hashlib.sha256(action_bytes).hexdigest() + action["hash"] = action_hash + + # Sign the hash + signature = self.private_key.sign(action_hash.encode()) + action["signature"] = base64.b64encode(signature).decode() + + # Append to chain + with open(self.chain_path, "a") as f: + f.write(json.dumps(action) + "\n") + + # Update last hash + self.last_hash = action_hash + + return action + + def get_public_key_pem(self) -> str: + """Get public key in PEM format for verification.""" + pem = self.public_key.public_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PublicFormat.SubjectPublicKeyInfo + ) + return pem.decode() + + def get_chain(self, limit: Optional[int] = None) -> list: + """Retrieve action chain.""" + if not self.chain_path.exists(): + return [] + + with open(self.chain_path, "r") as f: + lines = f.readlines() + + if limit: + lines = lines[-limit:] + + return [json.loads(line) for line in lines if line.strip()] + + def verify_chain_integrity(self) -> tuple[bool, Optional[str]]: + """ + Verify the entire chain is intact and unmodified. + + Returns: + (is_valid, error_message) + """ + chain = self.get_chain() + + if not chain: + return True, None + + expected_prev = "0" * 64 + + for i, action in enumerate(chain): + # Check previous hash linkage + if action["previous_hash"] != expected_prev: + return False, f"Chain broken at index {i}: hash mismatch" + + # Verify signature + try: + sig_bytes = base64.b64decode(action["signature"]) + self.public_key.verify(sig_bytes, action["hash"].encode()) + except Exception as e: + return False, f"Invalid signature at index {i}: {e}" + + expected_prev = action["hash"] + + return True, None + + +# Convenience function +def create_agent_chain(agent_name: str) -> AgentProvenanceChain: + """Create or load an agent provenance chain.""" + return AgentProvenanceChain(agent_name) diff --git a/demo.py b/demo.py new file mode 100644 index 0000000..7a867ed --- /dev/null +++ b/demo.py @@ -0,0 +1,147 @@ +#!/usr/bin/env python3 +""" +Live demonstration of Agent Provenance Chain. + +This script shows Molt (AI agent) signing its own actions in real-time. +Every operation is cryptographically signed and traceable. +""" + +from apc import create_agent_chain +import json +import subprocess +import os + +def main(): + print("=" * 70) + print("šŸ¦ž AGENT PROVENANCE CHAIN - LIVE DEMO") + print("=" * 70) + print() + print("Agent: Molt") + print("Mission: Demonstrate cryptographic audit trail for AI agents") + print() + print("-" * 70) + + # Initialize chain + chain = create_agent_chain("molt") + + print("\nāœ… Agent identity established") + print(f" Public Key (first 64 chars):") + print(f" {chain.get_public_key_pem()[:64]}...") + print() + + # Action 1: File write + print("šŸ“ ACTION 1: Writing a test file...") + test_file = "/tmp/apc_test.txt" + with open(test_file, "w") as f: + f.write("Hello from Agent Provenance Chain!") + + action1 = chain.sign_action( + action_type="file_write", + payload={ + "path": test_file, + "content": "Hello from Agent Provenance Chain!", + "bytes": 35 + }, + context={ + "reasoning": "Creating test file to demonstrate signed operations", + "session": "demo-2026-02-07" + } + ) + + print(f" āœ“ Signed at: {action1['iso_time']}") + print(f" āœ“ Hash: {action1['hash'][:32]}...") + print(f" āœ“ Signature: {action1['signature'][:32]}...") + print() + + # Action 2: Shell execution + print("āš™ļø ACTION 2: Executing shell command...") + result = subprocess.run(["whoami"], capture_output=True, text=True) + + action2 = chain.sign_action( + action_type="shell_exec", + payload={ + "command": "whoami", + "exit_code": result.returncode, + "stdout": result.stdout.strip(), + "stderr": result.stderr.strip() + }, + context={ + "reasoning": "Checking current user context for audit trail", + "risk_level": "low" + } + ) + + print(f" āœ“ Signed at: {action2['iso_time']}") + print(f" āœ“ Hash: {action2['hash'][:32]}...") + print(f" āœ“ Previous hash: {action2['previous_hash'][:32]}...") + print(f" āœ“ Chain link verified!") + print() + + # Action 3: API call simulation + print("🌐 ACTION 3: Simulated API call...") + action3 = chain.sign_action( + action_type="api_call", + payload={ + "endpoint": "https://api.example.com/data", + "method": "GET", + "status_code": 200, + "response_time_ms": 145 + }, + context={ + "reasoning": "Fetching external data for processing", + "data_sensitivity": "public" + } + ) + + print(f" āœ“ Signed at: {action3['iso_time']}") + print(f" āœ“ Hash: {action3['hash'][:32]}...") + print() + + # Verify chain integrity + print("šŸ” VERIFYING CHAIN INTEGRITY...") + is_valid, error = chain.verify_chain_integrity() + + if is_valid: + print(" āœ… Chain is VALID - all signatures verified!") + print(" āœ… All actions are cryptographically linked!") + print() + else: + print(f" āŒ Chain verification FAILED: {error}") + print() + + # Display full chain + print("-" * 70) + print("COMPLETE AUDIT TRAIL:") + print("-" * 70) + + full_chain = chain.get_chain() + for i, action in enumerate(full_chain, 1): + print(f"\nAction #{i}:") + print(f" Type: {action['type']}") + print(f" Time: {action['iso_time']}") + print(f" Hash: {action['hash'][:32]}...") + print(f" Payload: {json.dumps(action['payload'], indent=4)}") + if action['context']: + print(f" Context: {json.dumps(action['context'], indent=4)}") + + print() + print("=" * 70) + print("šŸ“Š SUMMARY") + print("=" * 70) + print(f"Total Actions: {len(full_chain)}") + print(f"Chain Valid: {is_valid}") + print(f"Agent: molt") + print(f"Chain Location: {chain.chain_path}") + print() + print("šŸ” Every action above is:") + print(" • Timestamped with microsecond precision") + print(" • Cryptographically signed (Ed25519)") + print(" • Linked to previous action (blockchain-style)") + print(" • Immutable and auditable") + print() + print("This is how AI agents prove safety without sacrificing speed.") + print("=" * 70) + + +if __name__ == "__main__": + main() diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..e30c06f --- /dev/null +++ b/requirements.txt @@ -0,0 +1 @@ +cryptography>=42.0.0