| 1 | #!/usr/bin/env python3
|
| 2 | """
|
| 3 | Simple trajectory inspector for browsing agent conversation trajectories.
|
| 4 |
|
| 5 | More information about the usage: [bold green] https://mini-swe-agent.com/latest/usage/inspector/ [/bold green].
|
| 6 | """
|
| 7 |
|
| 8 | import json
|
| 9 | import os
|
| 10 | import subprocess
|
| 11 | import tempfile
|
| 12 | from pathlib import Path
|
| 13 |
|
| 14 | import typer
|
| 15 | from rich.text import Text
|
| 16 | from textual.app import App, ComposeResult
|
| 17 | from textual.binding import Binding
|
| 18 | from textual.command import DiscoveryHit, Hit, Hits, Provider
|
| 19 | from textual.containers import Container, Vertical, VerticalScroll
|
| 20 | from textual.widgets import Footer, Header, Static
|
| 21 |
|
| 22 | from minisweagent.models.utils.content_string import get_content_string
|
| 23 |
|
| 24 |
|
| 25 | def _messages_to_steps(messages: list[dict]) -> list[list[dict]]:
|
| 26 | """Group messages into "pages" as shown by the UI."""
|
| 27 | steps = []
|
| 28 | current_step = []
|
| 29 | for message in messages:
|
| 30 | # Start new step with new tool uses
|
| 31 | if message.get("extra", {}).get("actions") or message.get("role") == "assistant":
|
| 32 | steps.append(current_step)
|
| 33 | current_step = [message]
|
| 34 | else:
|
| 35 | current_step.append(message)
|
| 36 | if current_step:
|
| 37 | steps.append(current_step)
|
| 38 | return steps
|
| 39 |
|
| 40 |
|
# Typer CLI application object; commands are registered on it via `@app.command`.
app = typer.Typer(rich_markup_mode="rich", add_completion=False)
|
| 42 |
|
| 43 |
|
class BindingCommandProvider(Provider):
    """Expose the app's key bindings as entries in the command palette."""

    # Palette text per binding action; actions missing here fall back to the
    # binding's own description.
    COMMAND_DESCRIPTIONS = {
        "next_step": "Next step in the current trajectory",
        "previous_step": "Previous step in the current trajectory",
        "first_step": "First step in the current trajectory",
        "last_step": "Last step in the current trajectory",
        "scroll_down": "Scroll down",
        "scroll_up": "Scroll up",
        "next_trajectory": "Next trajectory",
        "previous_trajectory": "Previous trajectory",
        "open_in_jless": "Open the current step in jless",
        "open_in_jless_all": "Open the entire trajectory in jless",
        "quit": "Quit the inspector",
    }

    def _binding_label(self, bound) -> str:
        """Return the palette text to show for the binding ``bound``."""
        return self.COMMAND_DESCRIPTIONS.get(bound.action, bound.description)

    async def discover(self) -> Hits:
        """Yield one discovery hit per entry in the app's ``BINDINGS``."""
        host = self.app
        for bound in host.BINDINGS:
            # Bind the loop variable as a default so each callback keeps its own binding.
            yield DiscoveryHit(self._binding_label(bound), lambda b=bound: host.run_action(b.action))

    async def search(self, query: str) -> Hits:
        """Yield a hit for every binding whose label scores above zero for ``query``."""
        fuzzy = self.matcher(query)
        host = self.app
        for bound in host.BINDINGS:
            label = self._binding_label(bound)
            relevance = fuzzy.match(label)
            if relevance > 0:
                yield Hit(relevance, fuzzy.highlight(label), lambda b=bound: host.run_action(b.action))
|
| 75 |
|
| 76 |
|
class TrajectoryInspector(App):
    """Textual app for paging through the steps of one or more trajectory files.

    Each trajectory file is parsed as JSON, its messages are grouped into steps
    by ``_messages_to_steps``, and one step is displayed at a time.
    """

    COMMANDS = {BindingCommandProvider}
    BINDINGS = [
        Binding("right,l", "next_step", "Step++"),
        Binding("left,h", "previous_step", "Step--"),
        Binding("0", "first_step", "Step=0"),
        Binding("$", "last_step", "Step=-1"),
        Binding("j,down", "scroll_down", "↓"),
        Binding("k,up", "scroll_up", "↑"),
        Binding("L", "next_trajectory", "Traj++"),
        Binding("H", "previous_trajectory", "Traj--"),
        Binding("e", "open_in_jless", "Jless"),
        Binding("E", "open_in_jless_all", "Jless (all)"),
        Binding("q", "quit", "Quit"),
    ]

    def __init__(self, trajectory_files: list[Path]):
        """Read the stylesheet and load the first trajectory, if there is one.

        Args:
            trajectory_files: Trajectory files to browse, in display order.
        """
        # The stylesheet location can be overridden through the environment.
        css_path = os.environ.get(
            "MSWEA_INSPECTOR_STYLE_PATH", str(Path(__file__).parent.parent.parent / "config" / "inspector.tcss")
        )
        # The CSS is set on the class before ``super().__init__()`` runs.
        self.__class__.CSS = Path(css_path).read_text()

        super().__init__()
        self.trajectory_files = trajectory_files
        self._i_trajectory = 0
        self._i_step = 0
        self.messages = []
        self.steps = []

        if trajectory_files:
            self._load_current_trajectory()

    # --- Basics ---

    @property
    def i_step(self) -> int:
        """Current step index."""
        return self._i_step

    @i_step.setter
    def i_step(self, value: int) -> None:
        """Set current step index, automatically clamping to valid bounds.

        Does nothing when there are no steps or when the clamped value equals
        the current index.
        """
        if self.n_steps == 0:
            return
        # Clamp before comparing so that stepping past either end is a true
        # no-op instead of resetting the scroll position and re-rendering.
        clamped = max(0, min(value, self.n_steps - 1))
        if clamped == self._i_step:
            return
        self._i_step = clamped
        self.query_one(VerticalScroll).scroll_to(y=0, animate=False)
        self.update_content()

    @property
    def n_steps(self) -> int:
        """Number of steps in current trajectory."""
        return len(self.steps)

    @property
    def i_trajectory(self) -> int:
        """Current trajectory index."""
        return self._i_trajectory

    @i_trajectory.setter
    def i_trajectory(self, value: int) -> None:
        """Set current trajectory index, automatically clamping to valid bounds.

        Does nothing when there are no trajectories or when the clamped value
        equals the current index.
        """
        if self.n_trajectories == 0:
            return
        # Clamp before comparing: otherwise moving past the first/last
        # trajectory would reload the same file and jump back to step 0.
        clamped = max(0, min(value, self.n_trajectories - 1))
        if clamped == self._i_trajectory:
            return
        self._i_trajectory = clamped
        self._load_current_trajectory()
        self.query_one(VerticalScroll).scroll_to(y=0, animate=False)
        self.update_content()

    @property
    def n_trajectories(self) -> int:
        """Number of trajectory files."""
        return len(self.trajectory_files)

    def _load_current_trajectory(self) -> None:
        """Load the currently selected trajectory file.

        Accepts either a JSON list of messages or a JSON object with a
        ``"messages"`` key. On failure the messages and steps are cleared and
        an error notification is posted.
        """
        # A (re)loaded trajectory always starts at its first step, also when
        # loading fails, so no stale index from the previous file survives.
        self._i_step = 0
        if not self.trajectory_files:
            self.messages = []
            self.steps = []
            return

        trajectory_file = self.trajectory_files[self.i_trajectory]
        try:
            data = json.loads(trajectory_file.read_text())

            if isinstance(data, list):
                self.messages = data
            elif isinstance(data, dict) and "messages" in data:
                self.messages = data["messages"]
            else:
                raise ValueError("Unrecognized trajectory format")

            self.steps = _messages_to_steps(self.messages)
        # OSError covers missing/unreadable files; ValueError covers
        # json.JSONDecodeError, decoding errors and the format error above.
        except (OSError, ValueError) as e:
            self.messages = []
            self.steps = []
            self.notify(f"Error loading {trajectory_file.name}: {e}", severity="error")

    @property
    def current_trajectory_name(self) -> str:
        """Get the name of the current trajectory file."""
        if not self.trajectory_files:
            return "No trajectories"
        return self.trajectory_files[self.i_trajectory].name

    def compose(self) -> ComposeResult:
        """Build the layout: header, a scrollable ``#content`` area, and footer."""
        yield Header()
        with Container(id="main"):
            with VerticalScroll():
                yield Vertical(id="content")
        yield Footer()

    def on_mount(self) -> None:
        """Render the initial content once the app is mounted."""
        self.update_content()

    def update_content(self) -> None:
        """Update the displayed content."""
        container = self.query_one("#content", Vertical)
        container.remove_children()

        if not self.steps:
            container.mount(Static("No trajectory loaded or empty trajectory"))
            self.title = "Trajectory Inspector - No Data"
            return

        # One container per message: an upper-cased role header plus the content.
        for message in self.steps[self.i_step]:
            content_str = get_content_string(message)
            message_container = Vertical(classes="message-container")
            container.mount(message_container)
            role = message.get("role") or message.get("type") or "unknown"
            message_container.mount(Static(role.upper(), classes="message-header"))
            message_container.mount(Static(Text(content_str, no_wrap=False), classes="message-content"))

        self.title = (
            f"Trajectory {self.i_trajectory + 1}/{self.n_trajectories} - "
            f"{self.current_trajectory_name} - "
            f"Step {self.i_step + 1}/{self.n_steps}"
        )

    # --- Navigation actions ---

    def action_next_step(self) -> None:
        """Go to the next step (clamped by the ``i_step`` setter)."""
        self.i_step += 1

    def action_previous_step(self) -> None:
        """Go to the previous step (clamped by the ``i_step`` setter)."""
        self.i_step -= 1

    def action_first_step(self) -> None:
        """Go to the first step."""
        self.i_step = 0

    def action_last_step(self) -> None:
        """Go to the last step."""
        self.i_step = self.n_steps - 1

    def action_next_trajectory(self) -> None:
        """Go to the next trajectory (clamped by the ``i_trajectory`` setter)."""
        self.i_trajectory += 1

    def action_previous_trajectory(self) -> None:
        """Go to the previous trajectory (clamped by the ``i_trajectory`` setter)."""
        self.i_trajectory -= 1

    def action_scroll_down(self) -> None:
        """Scroll the content area down by 15 from its current scroll target."""
        vs = self.query_one(VerticalScroll)
        vs.scroll_to(y=vs.scroll_target_y + 15)

    def action_scroll_up(self) -> None:
        """Scroll the content area up by 15 from its current scroll target."""
        vs = self.query_one(VerticalScroll)
        vs.scroll_to(y=vs.scroll_target_y - 15)

    def _open_in_jless(self, path: Path) -> None:
        """Open file in jless, suspending the app while it runs."""
        with self.suspend():
            try:
                subprocess.run(["jless", path])
            except FileNotFoundError:
                # Raised when the ``jless`` executable cannot be found.
                self.notify("jless not found. Install with: `brew install jless`", severity="error")

    def action_open_in_jless(self) -> None:
        """Open the current step's messages in jless."""
        if not self.steps:
            self.notify("No messages to display", severity="warning")
            return
        # delete=False keeps the file on disk after it is closed so that jless
        # can open it by name; the ``finally`` below is what removes it.
        temp_file = tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False)
        temp_path = Path(temp_file.name)
        try:
            with temp_file:
                json.dump(self.steps[self.i_step], temp_file, indent=2)
            self._open_in_jless(temp_path)
        finally:
            # Clean up even if serialization or the viewer raised.
            temp_path.unlink(missing_ok=True)

    def action_open_in_jless_all(self) -> None:
        """Open the entire trajectory in jless."""
        if not self.trajectory_files:
            self.notify("No trajectory to display", severity="warning")
            return
        self._open_in_jless(self.trajectory_files[self.i_trajectory])
|
| 267 |
|
| 268 |
|
@app.command(help=__doc__)
def main(
    path: str = typer.Argument(".", help="Directory to search for trajectory files or specific trajectory file"),
) -> None:
    # Resolve `path` to a list of trajectory files and launch the inspector on it.
    target = Path(path)

    if target.is_file():
        # A single file is browsed as-is, whatever its name.
        found = [target]
    else:
        if not target.is_dir():
            raise typer.BadParameter(f"Error: Path '{path}' does not exist")
        # Directories are searched recursively; sorting fixes the browsing order.
        found = sorted(target.rglob("*.traj.json"))
        if not found:
            raise typer.BadParameter(f"No trajectory files found in '{path}'")

    TrajectoryInspector(found).run()
|
| 286 |
|
| 287 |
|
# Invoke the Typer application when this file is executed as a script.
if __name__ == "__main__":
    app()
|
| 290 |
|