#!/usr/bin/env python3
"""
Agent Provenance Chain (APC)
Cryptographic audit trail for autonomous AI agents.

Every action signed. Every decision traceable. Full transparency.
"""

import base64
import hashlib
import json
import os
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, Optional

from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric.ed25519 import (
    Ed25519PrivateKey,
    Ed25519PublicKey,
)


class AgentProvenanceChain:
    """
    Cryptographic audit trail for agent actions.

    Every action is:
    - Timestamped
    - Hashed (SHA-256 over its canonical JSON form)
    - Signed with Ed25519 (the signature covers the hex hash)
    - Linked to the previous action through ``previous_hash``
    - Appended as one JSON object per line to a ``.jsonl`` file

    The file itself is an ordinary file; tampering is detected by
    ``verify_chain_integrity`` rather than prevented.
    """

    # "previous_hash" value carried by the first record of a chain.
    GENESIS_HASH = "0" * 64

    def __init__(
        self,
        agent_name: str,
        key_path: Optional[Path] = None,
        chain_path: Optional[Path] = None,
    ):
        """
        Load (or create) the agent's key and locate the end of its chain.

        Args:
            agent_name: Name recorded in every action; also used to build
                the default file names.
            key_path: PEM private-key file. Defaults to
                ``~/.apc/<agent_name>.key``.
            chain_path: JSON Lines chain file. Defaults to
                ``~/.apc/<agent_name>.chain.jsonl``.
        """
        self.agent_name = agent_name
        default_dir = Path.home() / ".apc"
        self.key_path = (
            Path(key_path) if key_path else default_dir / f"{agent_name}.key"
        )
        self.chain_path = (
            Path(chain_path)
            if chain_path
            else default_dir / f"{agent_name}.chain.jsonl"
        )

        self._ensure_directories()
        self.private_key = self._load_or_generate_key()
        self.public_key = self.private_key.public_key()

        self.last_hash = self._get_last_hash()

    def _ensure_directories(self):
        """Create the parent directories of the key and chain files."""
        self.key_path.parent.mkdir(parents=True, exist_ok=True)
        self.chain_path.parent.mkdir(parents=True, exist_ok=True)

    def _load_or_generate_key(self) -> "Ed25519PrivateKey":
        """
        Load the private key from ``key_path`` or generate and save one.

        Returns:
            The Ed25519 private key used for signing.

        Raises:
            ValueError: If the existing key file holds a non-Ed25519 key.
        """
        if self.key_path.exists():
            loaded = serialization.load_pem_private_key(
                self.key_path.read_bytes(), password=None
            )
            if not isinstance(loaded, Ed25519PrivateKey):
                raise ValueError(
                    f"{self.key_path} does not contain an Ed25519 private key"
                )
            return loaded

        private_key = Ed25519PrivateKey.generate()
        pem = private_key.private_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PrivateFormat.PKCS8,
            encryption_algorithm=serialization.NoEncryption(),
        )

        # Create the file with owner-only permissions from the start so the
        # unencrypted key is never briefly readable under the default umask.
        fd = os.open(
            self.key_path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600
        )
        with os.fdopen(fd, "wb") as f:
            f.write(pem)

        # The mode passed to os.open is ignored if the file already existed.
        self.key_path.chmod(0o600)

        return private_key

    @staticmethod
    def _compute_hash(record: Dict[str, Any]) -> str:
        """
        Return the SHA-256 hex digest of a record's canonical JSON form.

        The ``hash`` and ``signature`` fields are excluded. The record is
        round-tripped through JSON first so that the digest computed when
        signing matches the digest recomputed from the stored line (e.g.
        non-string dict keys become strings before key sorting).
        """
        body = {
            key: value
            for key, value in record.items()
            if key not in ("hash", "signature")
        }
        normalized = json.loads(json.dumps(body))
        canonical = json.dumps(normalized, sort_keys=True)
        return hashlib.sha256(canonical.encode()).hexdigest()

    def _get_last_hash(self) -> str:
        """
        Return the hash of the last record in the chain file.

        Blank lines are ignored; a missing or empty file yields the
        genesis hash.
        """
        if not self.chain_path.exists():
            return self.GENESIS_HASH

        last_line = None
        with open(self.chain_path, "r", encoding="utf-8") as f:
            for line in f:
                if line.strip():
                    last_line = line

        if last_line is None:
            return self.GENESIS_HASH

        return json.loads(last_line)["hash"]

    def sign_action(
        self,
        action_type: str,
        payload: Dict[str, Any],
        context: Optional[Dict[str, Any]] = None,
    ) -> Dict[str, Any]:
        """
        Sign an action and append it to the chain.

        Args:
            action_type: Type of action (e.g., "exec", "write_file", "api_call")
            payload: Action data (must be JSON-serializable)
            context: Optional context (reasoning, session_id, etc.)

        Returns:
            Signed action record, including its "hash" and "signature".
        """
        timestamp = time.time()
        # Same text as the deprecated utcfromtimestamp(): naive UTC + "Z".
        utc_time = datetime.fromtimestamp(timestamp, tz=timezone.utc)
        iso_time = utc_time.replace(tzinfo=None).isoformat() + "Z"

        action = {
            "agent": self.agent_name,
            "timestamp": timestamp,
            "iso_time": iso_time,
            "type": action_type,
            "payload": payload,
            "context": context or {},
            "previous_hash": self.last_hash,
        }

        action_hash = self._compute_hash(action)
        action["hash"] = action_hash

        # The signature covers the hex digest, not the raw record.
        signature = self.private_key.sign(action_hash.encode())
        action["signature"] = base64.b64encode(signature).decode()

        with open(self.chain_path, "a", encoding="utf-8") as f:
            f.write(json.dumps(action) + "\n")

        self.last_hash = action_hash

        return action

    def get_public_key_pem(self) -> str:
        """Get public key in PEM format for verification."""
        pem = self.public_key.public_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PublicFormat.SubjectPublicKeyInfo,
        )
        return pem.decode()

    def get_chain(self, limit: Optional[int] = None) -> list:
        """
        Retrieve action records from the chain file, oldest first.

        Args:
            limit: If given, return only the last ``limit`` records;
                zero or a negative value returns an empty list. ``None``
                returns the whole chain.

        Returns:
            List of decoded action records (empty if no chain file exists).
        """
        if not self.chain_path.exists():
            return []

        with open(self.chain_path, "r", encoding="utf-8") as f:
            # Drop blank lines before slicing so they do not count
            # against the limit.
            lines = [line for line in f if line.strip()]

        if limit is not None:
            lines = lines[-limit:] if limit > 0 else []

        return [json.loads(line) for line in lines]

    def verify_chain_integrity(self) -> tuple[bool, Optional[str]]:
        """
        Verify the entire chain is intact and unmodified.

        For every record this checks, in order: the link to the previous
        record, that the stored hash matches the record's content, and
        that the signature over the hash verifies with this agent's
        public key.

        Returns:
            (is_valid, error_message)
        """
        try:
            chain = self.get_chain()
        except json.JSONDecodeError as e:
            return False, f"Chain file is corrupt: {e}"

        expected_prev = self.GENESIS_HASH

        for i, action in enumerate(chain):
            if not isinstance(action, dict):
                return False, f"Malformed record at index {i}"

            if action.get("previous_hash") != expected_prev:
                return False, f"Chain broken at index {i}: hash mismatch"

            # Without this check a modified payload would go unnoticed,
            # because the signature only covers the stored hash string.
            stored_hash = action.get("hash")
            if (
                not isinstance(stored_hash, str)
                or self._compute_hash(action) != stored_hash
            ):
                return False, f"Content hash mismatch at index {i}"

            try:
                sig_bytes = base64.b64decode(action["signature"])
                self.public_key.verify(sig_bytes, stored_hash.encode())
            except Exception as e:
                # Any failure here (missing field, bad base64, bad
                # signature) means the record cannot be trusted.
                return False, f"Invalid signature at index {i}: {e}"

            expected_prev = stored_hash

        return True, None


# Convenience function
def create_agent_chain(
    agent_name: str, key_path: Optional[Path] = None
) -> AgentProvenanceChain:
    """
    Create or load an agent provenance chain.

    Args:
        agent_name: Name of the agent whose chain is opened.
        key_path: Optional private-key file, forwarded to
            ``AgentProvenanceChain``; ``None`` uses that class's default.

    Returns:
        The ``AgentProvenanceChain`` for ``agent_name``.
    """
    return AgentProvenanceChain(agent_name, key_path)