| 1 | import os
|
| 2 | import platform
|
| 3 | import subprocess
|
| 4 | from typing import Any
|
| 5 |
|
| 6 | from pydantic import BaseModel
|
| 7 |
|
| 8 | from minisweagent.exceptions import Submitted
|
| 9 | from minisweagent.utils.serialize import recursive_merge
|
| 10 |
|
| 11 |
|
| 12 | class LocalEnvironmentConfig(BaseModel):
|
| 13 | cwd: str = ""
|
| 14 | env: dict[str, str] = {}
|
| 15 | timeout: int = 30
|
| 16 |
|
| 17 |
|
| 18 | class LocalEnvironment:
|
| 19 | def __init__(self, *, config_class: type = LocalEnvironmentConfig, **kwargs):
|
| 20 | """This class executes bash commands directly on the local machine."""
|
| 21 | self.config = config_class(**kwargs)
|
| 22 |
|
| 23 | def execute(self, action: dict, cwd: str = "", *, timeout: int | None = None) -> dict[str, Any]:
|
| 24 | """Execute a command in the local environment and return the result as a dict."""
|
| 25 | command = action.get("command", "")
|
| 26 | cwd = cwd or self.config.cwd or os.getcwd()
|
| 27 | try:
|
| 28 | result = subprocess.run(
|
| 29 | command,
|
| 30 | shell=True,
|
| 31 | text=True,
|
| 32 | cwd=cwd,
|
| 33 | env=os.environ | self.config.env,
|
| 34 | timeout=timeout or self.config.timeout,
|
| 35 | encoding="utf-8",
|
| 36 | errors="replace",
|
| 37 | stdout=subprocess.PIPE,
|
| 38 | stderr=subprocess.STDOUT,
|
| 39 | )
|
| 40 | output = {"output": result.stdout, "returncode": result.returncode, "exception_info": ""}
|
| 41 | except Exception as e:
|
| 42 | raw_output = getattr(e, "output", None)
|
| 43 | raw_output = (
|
| 44 | raw_output.decode("utf-8", errors="replace") if isinstance(raw_output, bytes) else (raw_output or "")
|
| 45 | )
|
| 46 | output = {
|
| 47 | "output": raw_output,
|
| 48 | "returncode": -1,
|
| 49 | "exception_info": f"An error occurred while executing the command: {e}",
|
| 50 | "extra": {"exception_type": type(e).__name__, "exception": str(e)},
|
| 51 | }
|
| 52 | self._check_finished(output)
|
| 53 | return output
|
| 54 |
|
| 55 | def _check_finished(self, output: dict):
|
| 56 | """Raises Submitted if the output indicates task completion."""
|
| 57 | lines = output.get("output", "").lstrip().splitlines(keepends=True)
|
| 58 | if lines and lines[0].strip() == "COMPLETE_TASK_AND_SUBMIT_FINAL_OUTPUT" and output["returncode"] == 0:
|
| 59 | submission = "".join(lines[1:])
|
| 60 | raise Submitted(
|
| 61 | {
|
| 62 | "role": "exit",
|
| 63 | "content": submission,
|
| 64 | "extra": {"exit_status": "Submitted", "submission": submission},
|
| 65 | }
|
| 66 | )
|
| 67 |
|
| 68 | def get_template_vars(self, **kwargs) -> dict[str, Any]:
|
| 69 | return recursive_merge(self.config.model_dump(), platform.uname()._asdict(), os.environ, kwargs)
|
| 70 |
|
| 71 | def serialize(self) -> dict:
|
| 72 | return {
|
| 73 | "info": {
|
| 74 | "config": {
|
| 75 | "environment": self.config.model_dump(mode="json"),
|
| 76 | "environment_type": f"{self.__class__.__module__}.{self.__class__.__name__}",
|
| 77 | }
|
| 78 | }
|
| 79 | }
|
| 80 |
|