#!/usr/bin/env python3 """ Swarm Provenance Tracker A CLI for tracking agent contributions with cryptographic accountability. """ import argparse import json import hashlib import time from datetime import datetime from pathlib import Path from typing import Dict, List, Optional import sys class SwarmTracker: """Manages swarm workspace and provenance tracking.""" def __init__(self, workspace_path: str = ".swarm"): self.workspace = Path(workspace_path) self.config_file = self.workspace / "config.json" self.log_file = self.workspace / "provenance.jsonl" self.agents_file = self.workspace / "agents.json" def init(self, name: str): """Initialize a new swarm workspace.""" if self.workspace.exists(): print(f"āŒ Swarm workspace already exists at {self.workspace}") return False self.workspace.mkdir(parents=True) config = { "name": name, "created_at": datetime.utcnow().isoformat(), "version": "1.0.0" } with open(self.config_file, 'w') as f: json.dump(config, f, indent=2) # Initialize empty agents registry with open(self.agents_file, 'w') as f: json.dump({"agents": {}}, f, indent=2) # Create empty provenance log self.log_file.touch() print(f"āœ… Initialized swarm workspace: {name}") print(f"šŸ“ Location: {self.workspace.absolute()}") return True def add_agent(self, name: str, role: str, key: Optional[str] = None): """Register an agent in the swarm.""" if not self.workspace.exists(): print("āŒ No swarm workspace found. Run 'swarm.py init' first.") return False with open(self.agents_file, 'r') as f: data = json.load(f) if name in data["agents"]: print(f"āŒ Agent '{name}' already registered") return False agent_key = key or self._generate_agent_key(name) data["agents"][name] = { "role": role, "public_key": agent_key, "registered_at": datetime.utcnow().isoformat(), "contributions": 0 } with open(self.agents_file, 'w') as f: json.dump(data, f, indent=2) print(f"āœ… Registered agent: {name}") print(f"šŸ”‘ Key: {agent_key[:16]}...") return True def record_contribution(self, agent: str, file_path: str, message: str, sign: bool = True): """Record a contribution to the provenance log.""" if not self.workspace.exists(): print("āŒ No swarm workspace found.") return False # Load agents to verify with open(self.agents_file, 'r') as f: agents_data = json.load(f) if agent not in agents_data["agents"]: print(f"āŒ Agent '{agent}' not registered. Add with 'swarm.py agent add'") return False # Get previous hash for chaining prev_hash = self._get_last_hash() # Create contribution record contribution = { "agent": agent, "file": file_path, "message": message, "timestamp": datetime.utcnow().isoformat(), "prev_hash": prev_hash } # Calculate hash of this contribution contrib_hash = self._hash_contribution(contribution) contribution["hash"] = contrib_hash if sign: # Simple signature simulation (in production, use real crypto) agent_key = agents_data["agents"][agent]["public_key"] contribution["signature"] = self._sign(contrib_hash, agent_key) # Append to log with open(self.log_file, 'a') as f: f.write(json.dumps(contribution) + '\n') # Update agent contribution count agents_data["agents"][agent]["contributions"] += 1 with open(self.agents_file, 'w') as f: json.dump(agents_data, f, indent=2) print(f"āœ… Recorded contribution by {agent}") print(f"šŸ“ {message}") print(f"šŸ”— Hash: {contrib_hash[:16]}...") return True def show_history(self, limit: int = 20): """Display provenance history.""" if not self.log_file.exists(): print("āŒ No provenance log found.") return with open(self.log_file, 'r') as f: contributions = [json.loads(line) for line in f] if not contributions: print("šŸ“­ No contributions recorded yet.") return print(f"\nšŸ”— Provenance Chain ({len(contributions)} contributions)\n") for contrib in contributions[-limit:]: timestamp = contrib["timestamp"][:19].replace('T', ' ') print(f"ā”Œā”€ {contrib['agent']} @ {timestamp}") print(f"│ šŸ“„ {contrib['file']}") print(f"│ šŸ’¬ {contrib['message']}") print(f"│ šŸ”— {contrib['hash'][:16]}...") if 'signature' in contrib: print(f"│ āœļø Signed") print("└─") def export(self, format: str = "json"): """Export provenance data.""" if not self.log_file.exists(): print("āŒ No provenance log found.") return with open(self.log_file, 'r') as f: contributions = [json.loads(line) for line in f] if format == "json": print(json.dumps(contributions, indent=2)) else: print("āŒ Unsupported format. Use: json") # Helper methods def _generate_agent_key(self, agent_name: str) -> str: """Generate a simple key for an agent (demo purposes).""" return hashlib.sha256(f"{agent_name}-{time.time()}".encode()).hexdigest() def _get_last_hash(self) -> str: """Get the hash of the last contribution (for chaining).""" if not self.log_file.exists() or self.log_file.stat().st_size == 0: return "genesis" with open(self.log_file, 'r') as f: lines = f.readlines() if lines: last = json.loads(lines[-1]) return last.get("hash", "unknown") return "genesis" def _hash_contribution(self, contrib: Dict) -> str: """Calculate hash of a contribution.""" data = f"{contrib['agent']}{contrib['file']}{contrib['message']}{contrib['timestamp']}{contrib['prev_hash']}" return hashlib.sha256(data.encode()).hexdigest() def _sign(self, data: str, key: str) -> str: """Simple signature simulation (use real crypto in production).""" return hashlib.sha256(f"{data}{key}".encode()).hexdigest()[:32] def main(): parser = argparse.ArgumentParser( description="Swarm Provenance Tracker - Track agent contributions with cryptographic accountability" ) subparsers = parser.add_subparsers(dest='command', help='Commands') # Init command init_parser = subparsers.add_parser('init', help='Initialize swarm workspace') init_parser.add_argument('--name', required=True, help='Swarm name') # Agent commands agent_parser = subparsers.add_parser('agent', help='Manage agents') agent_subparsers = agent_parser.add_subparsers(dest='agent_command') add_agent_parser = agent_subparsers.add_parser('add', help='Add agent to swarm') add_agent_parser.add_argument('--name', required=True, help='Agent name') add_agent_parser.add_argument('--role', required=True, help='Agent role') add_agent_parser.add_argument('--key', help='Public key (auto-generated if not provided)') # Contribute command contrib_parser = subparsers.add_parser('contribute', help='Record a contribution') contrib_parser.add_argument('--agent', required=True, help='Agent name') contrib_parser.add_argument('--file', required=True, help='File path') contrib_parser.add_argument('--message', required=True, help='Contribution message') contrib_parser.add_argument('--sign', action='store_true', help='Sign the contribution') # History command history_parser = subparsers.add_parser('history', help='Show provenance history') history_parser.add_argument('--limit', type=int, default=20, help='Number of entries to show') # Export command export_parser = subparsers.add_parser('export', help='Export provenance data') export_parser.add_argument('--format', default='json', help='Export format (json)') args = parser.parse_args() if not args.command: parser.print_help() return tracker = SwarmTracker() if args.command == 'init': tracker.init(args.name) elif args.command == 'agent' and args.agent_command == 'add': tracker.add_agent(args.name, args.role, args.key) elif args.command == 'contribute': tracker.record_contribution(args.agent, args.file, args.message, args.sign) elif args.command == 'history': tracker.show_history(args.limit) elif args.command == 'export': tracker.export(args.format) if __name__ == "__main__": main()