agent-smith/apc.py
agent-molt b90c3d0da5 Initial commit: Core APC library + demo
- Ed25519 cryptographic signing
- Blockchain-style chain linkage
- Immutable audit trail
- Full verification support
2026-02-07 21:10:32 +05:30

190 lines
5.9 KiB
Python

#!/usr/bin/env python3
"""
Agent Provenance Chain (APC)
Cryptographic audit trail for autonomous AI agents.
Every action signed. Every decision traceable. Full transparency.
"""
import base64
import hashlib
import json
import os
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, Optional

from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric.ed25519 import (
    Ed25519PrivateKey,
    Ed25519PublicKey,
)


class AgentProvenanceChain:
    """
    Cryptographic audit trail for agent actions.

    Every action is:
    - Timestamped
    - Hashed (SHA-256 over its canonical JSON form)
    - Signed with Ed25519 (the signature covers the hex hash)
    - Linked to the previous action through ``previous_hash``
    - Appended as one JSON object per line to the chain file
    """

    # ``previous_hash`` value used by the very first record of a chain.
    GENESIS_HASH = "0" * 64

    # Fields added after hashing; they are never part of the hashed content.
    _UNHASHED_FIELDS = ("hash", "signature")

    def __init__(self, agent_name: str, key_path: Optional[Path] = None):
        """
        Load (or create) the agent's key and locate the end of its chain.

        Args:
            agent_name: Name used in every record and in the default file names.
            key_path: Location of the PEM private key. Defaults to
                ``~/.apc/<agent_name>.key``. The chain file always lives at
                ``~/.apc/<agent_name>.chain.jsonl``.
        """
        self.agent_name = agent_name
        apc_dir = Path.home() / ".apc"
        self.key_path = Path(key_path) if key_path else apc_dir / f"{agent_name}.key"
        self.chain_path = apc_dir / f"{agent_name}.chain.jsonl"
        self._ensure_directories()
        self.private_key = self._load_or_generate_key()
        self.public_key = self.private_key.public_key()
        self.last_hash = self._get_last_hash()

    def _ensure_directories(self):
        """Create the parent directories of the key file and the chain file."""
        self.key_path.parent.mkdir(parents=True, exist_ok=True)
        self.chain_path.parent.mkdir(parents=True, exist_ok=True)

    def _load_or_generate_key(self) -> "Ed25519PrivateKey":
        """
        Load the existing private key, or generate and persist a new one.

        Returns:
            The agent's Ed25519 private key.

        Raises:
            TypeError: If the key file holds a key that is not Ed25519.
        """
        if self.key_path.exists():
            key = serialization.load_pem_private_key(
                self.key_path.read_bytes(), password=None
            )
            # load_pem_private_key accepts any key type; fail early rather
            # than at the first signing attempt.
            if not isinstance(key, Ed25519PrivateKey):
                raise TypeError(
                    f"{self.key_path} does not contain an Ed25519 private key"
                )
            return key

        private_key = Ed25519PrivateKey.generate()
        pem = private_key.private_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PrivateFormat.PKCS8,
            encryption_algorithm=serialization.NoEncryption(),
        )
        # Create the file with owner-only permissions from the start, so the
        # unencrypted key is never readable by others between write and chmod.
        fd = os.open(self.key_path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
        with os.fdopen(fd, "wb") as f:
            f.write(pem)
        self.key_path.chmod(0o600)
        return private_key

    def _read_lines(self) -> list:
        """Return the non-blank lines of the chain file ([] if it is missing)."""
        try:
            with open(self.chain_path, "r", encoding="utf-8") as f:
                return [line for line in f if line.strip()]
        except FileNotFoundError:
            return []

    @classmethod
    def _compute_hash(cls, record: Dict[str, Any]) -> str:
        """
        Return the SHA-256 hex digest of a record's hashed content.

        The ``hash`` and ``signature`` fields are excluded. The content is
        round-tripped through JSON before the sorted dump so that a record
        hashes the same when it is signed and when it is read back from disk
        (e.g. non-string dict keys become strings in both cases).
        """
        body = {k: v for k, v in record.items() if k not in cls._UNHASHED_FIELDS}
        canonical = json.loads(json.dumps(body))
        encoded = json.dumps(canonical, sort_keys=True).encode()
        return hashlib.sha256(encoded).hexdigest()

    def _get_last_hash(self) -> str:
        """Return the hash of the last record, or the genesis hash if none."""
        lines = self._read_lines()
        if not lines:
            return self.GENESIS_HASH
        return json.loads(lines[-1])["hash"]

    def sign_action(
        self,
        action_type: str,
        payload: Dict[str, Any],
        context: Optional[Dict[str, Any]] = None
    ) -> Dict[str, Any]:
        """
        Sign an action and append it to the chain.

        Args:
            action_type: Type of action (e.g., "exec", "write_file", "api_call")
            payload: Action data (must be JSON-serializable)
            context: Optional context (reasoning, session_id, etc.)

        Returns:
            The signed action record, including ``hash`` and ``signature``.

        Raises:
            TypeError: If ``payload`` or ``context`` is not JSON-serializable
                (raised by ``json.dumps`` before anything is written).
        """
        timestamp = time.time()
        # Naive UTC datetime so the text keeps the "<isoformat>Z" shape.
        utc_time = datetime.fromtimestamp(timestamp, tz=timezone.utc)
        action = {
            "agent": self.agent_name,
            "timestamp": timestamp,
            "iso_time": utc_time.replace(tzinfo=None).isoformat() + "Z",
            "type": action_type,
            "payload": payload,
            "context": context or {},
            "previous_hash": self.last_hash,
        }

        action_hash = self._compute_hash(action)
        action["hash"] = action_hash

        # The signature covers the hex digest, which in turn covers the content.
        signature = self.private_key.sign(action_hash.encode())
        action["signature"] = base64.b64encode(signature).decode()

        with open(self.chain_path, "a", encoding="utf-8") as f:
            f.write(json.dumps(action) + "\n")

        self.last_hash = action_hash
        return action

    def get_public_key_pem(self) -> str:
        """Get public key in PEM format for verification."""
        pem = self.public_key.public_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PublicFormat.SubjectPublicKeyInfo,
        )
        return pem.decode()

    def get_chain(self, limit: Optional[int] = None) -> list:
        """
        Retrieve the action chain, oldest record first.

        Args:
            limit: If given, return only the last ``limit`` records. A value
                of zero or less yields an empty list. ``None`` returns all.

        Returns:
            List of parsed action records.

        Raises:
            json.JSONDecodeError: If a line of the chain file is not valid JSON.
        """
        lines = self._read_lines()
        if limit is not None:
            # Blank lines are already dropped, so the slice counts records.
            lines = lines[-limit:] if limit > 0 else []
        return [json.loads(line) for line in lines]

    def verify_chain_integrity(self) -> tuple[bool, Optional[str]]:
        """
        Verify the entire chain is intact and unmodified.

        For every record this checks, in order: linkage to the previous
        record's hash, that the stored hash matches the record's content,
        and that the signature over the hash verifies with this agent's
        public key.

        Returns:
            (is_valid, error_message)
        """
        try:
            chain = self.get_chain()
        except ValueError as e:  # json.JSONDecodeError is a ValueError
            return False, f"Chain file is corrupt: {e}"

        expected_prev = self.GENESIS_HASH
        for i, action in enumerate(chain):
            if not isinstance(action, dict):
                return False, f"Malformed record at index {i}: not a JSON object"

            # Check previous hash linkage
            if action.get("previous_hash") != expected_prev:
                return False, f"Chain broken at index {i}: hash mismatch"

            # Without this check an edited payload would go unnoticed, since
            # the signature only covers the stored hash.
            if self._compute_hash(action) != action.get("hash"):
                return False, f"Record tampered at index {i}: content hash mismatch"

            # Any failure here (missing field, bad base64, bad signature)
            # means the record cannot be trusted, so report it as invalid.
            try:
                sig_bytes = base64.b64decode(action["signature"])
                self.public_key.verify(sig_bytes, action["hash"].encode())
            except Exception as e:
                return False, f"Invalid signature at index {i}: {e}"

            expected_prev = action["hash"]
        return True, None
# Module-level shortcut for building a chain with default paths
def create_agent_chain(agent_name: str) -> AgentProvenanceChain:
    """
    Return an AgentProvenanceChain for ``agent_name``.

    Only the agent name is passed to the constructor, so the key path
    takes its default value.
    """
    chain = AgentProvenanceChain(agent_name)
    return chain