| 1 | #!/usr/bin/env python3
|
| 2 | """
|
| 3 | Swarm Provenance Tracker
|
| 4 | A CLI for tracking agent contributions with cryptographic accountability.
|
| 5 | """
|
| 6 |
|
import argparse
import hashlib
import json
import sys
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, List, Optional
|
| 15 |
|
class SwarmTracker:
    """Manage a swarm workspace and its hash-chained provenance log.

    The workspace directory holds three files:

    * ``config.json``      - swarm name, creation time and version
    * ``agents.json``      - registered agents and their contribution counts
    * ``provenance.jsonl`` - one JSON contribution record per line, each
      carrying the hash of the record written before it

    NOTE(review): the leading glyphs in the status messages look like
    mis-decoded UTF-8 (probably emoji / box-drawing characters). They are
    kept as found; restore them from the original source if it is available.
    """

    def __init__(self, workspace_path: str = ".swarm"):
        """Remember the workspace location; nothing is created on disk here."""
        self.workspace = Path(workspace_path)
        self.config_file = self.workspace / "config.json"
        self.log_file = self.workspace / "provenance.jsonl"
        self.agents_file = self.workspace / "agents.json"

    def init(self, name: str):
        """Create the workspace directory and its three files.

        Args:
            name: Human-readable swarm name stored in ``config.json``.

        Returns:
            ``True`` on success, ``False`` if the workspace already exists.
        """
        if self.workspace.exists():
            print(f"ā Swarm workspace already exists at {self.workspace}")
            return False

        self.workspace.mkdir(parents=True)

        config = {
            "name": name,
            "created_at": self._utc_timestamp(),
            "version": "1.0.0",
        }
        self._write_json(self.config_file, config)

        # Start with an empty agents registry and an empty provenance log.
        self._write_json(self.agents_file, {"agents": {}})
        self.log_file.touch()

        print(f"ā Initialized swarm workspace: {name}")
        print(f"š Location: {self.workspace.absolute()}")
        return True

    def add_agent(self, name: str, role: str, key: Optional[str] = None):
        """Register an agent in ``agents.json``.

        Args:
            name: Unique agent name.
            role: Free-form role label stored with the agent.
            key: Key to store as ``public_key``; generated when omitted/empty.

        Returns:
            ``True`` when the agent was added, ``False`` when the workspace is
            missing or the name is already registered.
        """
        if not self.workspace.exists():
            print("ā No swarm workspace found. Run 'swarm.py init' first.")
            return False

        data = self._read_json(self.agents_file)

        if name in data["agents"]:
            print(f"ā Agent '{name}' already registered")
            return False

        agent_key = key or self._generate_agent_key(name)

        data["agents"][name] = {
            "role": role,
            "public_key": agent_key,
            "registered_at": self._utc_timestamp(),
            "contributions": 0,
        }
        self._write_json(self.agents_file, data)

        print(f"ā Registered agent: {name}")
        print(f"š Key: {agent_key[:16]}...")
        return True

    def record_contribution(self, agent: str, file_path: str, message: str, sign: bool = True):
        """Append a contribution record to the provenance log.

        Args:
            agent: Name of a registered agent.
            file_path: Path of the file the contribution refers to (stored
                verbatim; the file itself is not read).
            message: Description of the contribution.
            sign: When true, attach a simulated signature derived from the
                record hash and the agent's stored key.

        Returns:
            ``True`` when the record was written, ``False`` when the workspace
            is missing or the agent is not registered.
        """
        if not self.workspace.exists():
            print("ā No swarm workspace found.")
            return False

        agents_data = self._read_json(self.agents_file)

        if agent not in agents_data["agents"]:
            print(f"ā Agent '{agent}' not registered. Add with 'swarm.py agent add'")
            return False

        # Each record stores the previous record's hash to form a chain.
        contribution = {
            "agent": agent,
            "file": file_path,
            "message": message,
            "timestamp": self._utc_timestamp(),
            "prev_hash": self._get_last_hash(),
        }

        contrib_hash = self._hash_contribution(contribution)
        contribution["hash"] = contrib_hash

        if sign:
            # Simple signature simulation (in production, use real crypto).
            agent_key = agents_data["agents"][agent]["public_key"]
            contribution["signature"] = self._sign(contrib_hash, agent_key)

        with open(self.log_file, 'a', encoding='utf-8') as f:
            f.write(json.dumps(contribution) + '\n')

        agents_data["agents"][agent]["contributions"] += 1
        self._write_json(self.agents_file, agents_data)

        print(f"ā Recorded contribution by {agent}")
        print(f"š {message}")
        print(f"š Hash: {contrib_hash[:16]}...")
        return True

    def show_history(self, limit: int = 20):
        """Print the most recent contributions, oldest of them first.

        Args:
            limit: Maximum number of entries to print. A value of zero or
                less prints the header but no entries.
        """
        if not self.log_file.exists():
            print("ā No provenance log found.")
            return

        contributions = self._read_log()

        if not contributions:
            print("š No contributions recorded yet.")
            return

        print(f"\nš Provenance Chain ({len(contributions)} contributions)\n")

        # ``seq[-0:]`` is the whole list, so a non-positive limit needs its
        # own branch instead of going through the negative slice.
        recent = contributions[-limit:] if limit > 0 else []

        for contrib in recent:
            # Trim to "YYYY-MM-DD HH:MM:SS" for display.
            timestamp = contrib["timestamp"][:19].replace('T', ' ')
            print(f"āā {contrib['agent']} @ {timestamp}")
            print(f"ā š {contrib['file']}")
            print(f"ā š¬ {contrib['message']}")
            print(f"ā š {contrib['hash'][:16]}...")
            if 'signature' in contrib:
                print("ā āļø Signed")
            print("āā")

    def export(self, format: str = "json"):
        """Print the whole provenance log in the requested format.

        Args:
            format: Output format; only ``"json"`` is supported.
        """
        if not self.log_file.exists():
            print("ā No provenance log found.")
            return

        contributions = self._read_log()

        if format == "json":
            print(json.dumps(contributions, indent=2))
        else:
            print("ā Unsupported format. Use: json")

    # Helper methods

    @staticmethod
    def _utc_timestamp() -> str:
        """Return the current UTC time as a naive ISO-8601 string.

        Produces the same format as ``datetime.utcnow().isoformat()`` without
        calling the deprecated ``utcnow``.
        """
        return datetime.now(timezone.utc).replace(tzinfo=None).isoformat()

    @staticmethod
    def _read_json(path: Path) -> Dict:
        """Load and return the JSON document stored at ``path``."""
        with open(path, 'r', encoding='utf-8') as f:
            return json.load(f)

    @staticmethod
    def _write_json(path: Path, payload: Dict) -> None:
        """Write ``payload`` to ``path`` as indented JSON, replacing the file."""
        with open(path, 'w', encoding='utf-8') as f:
            json.dump(payload, f, indent=2)

    def _read_log(self) -> List[Dict]:
        """Return every record in the provenance log, skipping blank lines."""
        with open(self.log_file, 'r', encoding='utf-8') as f:
            return [json.loads(line) for line in f if line.strip()]

    def _generate_agent_key(self, agent_name: str) -> str:
        """Generate a simple key for an agent (demo purposes)."""
        return hashlib.sha256(f"{agent_name}-{time.time()}".encode()).hexdigest()

    def _get_last_hash(self) -> str:
        """Return the hash of the last logged contribution (for chaining).

        Returns ``"genesis"`` when the log is missing or holds no records, and
        ``"unknown"`` when the last record has no ``hash`` field.
        """
        if not self.log_file.exists():
            return "genesis"

        records = self._read_log()
        if not records:
            return "genesis"
        return records[-1].get("hash", "unknown")

    def _hash_contribution(self, contrib: Dict) -> str:
        """Return the SHA-256 hex digest of a contribution's core fields.

        NOTE(review): the fields are concatenated without separators, so
        different field splits (e.g. agent "ab" + file "c" vs. agent "a" +
        file "bc") hash identically. Left unchanged so existing logs keep
        matching; consider a delimited/canonical encoding in a new version.
        """
        data = (
            f"{contrib['agent']}{contrib['file']}{contrib['message']}"
            f"{contrib['timestamp']}{contrib['prev_hash']}"
        )
        return hashlib.sha256(data.encode()).hexdigest()

    def _sign(self, data: str, key: str) -> str:
        """Simple signature simulation (use real crypto in production).

        Returns the first 32 hex characters of SHA-256 over ``data + key``.
        """
        return hashlib.sha256(f"{data}{key}".encode()).hexdigest()[:32]
|
| 196 |
|
| 197 |
|
def main():
    """Parse the command line and dispatch to the matching SwarmTracker method.

    Sub-commands: ``init``, ``agent add``, ``contribute``, ``history`` and
    ``export``. With no sub-command, or ``agent`` without ``add``, the
    relevant help text is printed and the function returns normally.

    Raises:
        SystemExit: with status 1 when the selected command reports failure by
            returning ``False`` (argparse also exits on invalid arguments).
    """
    parser = argparse.ArgumentParser(
        description="Swarm Provenance Tracker - Track agent contributions with cryptographic accountability"
    )
    subparsers = parser.add_subparsers(dest='command', help='Commands')

    # Init command
    init_parser = subparsers.add_parser('init', help='Initialize swarm workspace')
    init_parser.add_argument('--name', required=True, help='Swarm name')

    # Agent commands
    agent_parser = subparsers.add_parser('agent', help='Manage agents')
    agent_subparsers = agent_parser.add_subparsers(dest='agent_command')

    add_agent_parser = agent_subparsers.add_parser('add', help='Add agent to swarm')
    add_agent_parser.add_argument('--name', required=True, help='Agent name')
    add_agent_parser.add_argument('--role', required=True, help='Agent role')
    add_agent_parser.add_argument('--key', help='Public key (auto-generated if not provided)')

    # Contribute command
    # NOTE(review): record_contribution defaults to sign=True, but the CLI
    # only signs when --sign is passed - confirm which default is intended.
    contrib_parser = subparsers.add_parser('contribute', help='Record a contribution')
    contrib_parser.add_argument('--agent', required=True, help='Agent name')
    contrib_parser.add_argument('--file', required=True, help='File path')
    contrib_parser.add_argument('--message', required=True, help='Contribution message')
    contrib_parser.add_argument('--sign', action='store_true', help='Sign the contribution')

    # History command
    history_parser = subparsers.add_parser('history', help='Show provenance history')
    history_parser.add_argument('--limit', type=int, default=20, help='Number of entries to show')

    # Export command
    export_parser = subparsers.add_parser('export', help='Export provenance data')
    export_parser.add_argument('--format', default='json', help='Export format (json)')

    args = parser.parse_args()

    if not args.command:
        parser.print_help()
        return

    # "agent" on its own used to fall through every branch and do nothing.
    if args.command == 'agent' and not args.agent_command:
        agent_parser.print_help()
        return

    tracker = SwarmTracker()
    outcome = None

    if args.command == 'init':
        outcome = tracker.init(args.name)
    elif args.command == 'agent' and args.agent_command == 'add':
        outcome = tracker.add_agent(args.name, args.role, args.key)
    elif args.command == 'contribute':
        outcome = tracker.record_contribution(args.agent, args.file, args.message, args.sign)
    elif args.command == 'history':
        outcome = tracker.show_history(args.limit)
    elif args.command == 'export':
        outcome = tracker.export(args.format)

    # Commands signal failure by returning False; surface it to the shell.
    if outcome is False:
        sys.exit(1)
|
| 250 |
|
| 251 |
|
# Run the CLI only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|
| 254 |
|