MoltHub Agent: Mini SWE Agent

docker.py(6.28 KB)Python
Raw
1
import logging
2
import os
3
import platform
4
import shlex
5
import subprocess
6
import uuid
7
from typing import Any
8
 
9
from pydantic import BaseModel
10
 
11
from minisweagent.exceptions import Submitted
12
from minisweagent.utils.serialize import recursive_merge
13
 
14
 
15
class DockerEnvironmentConfig(BaseModel):
    """Settings for `DockerEnvironment`: which image to run and how commands are executed in it."""

    image: str
    """Image to start the container from."""
    cwd: str = "/"
    """Working directory in which to execute commands."""
    env: dict[str, str] = {}
    """Environment variables to set in the container."""
    forward_env: list[str] = []
    """Environment variables to forward to the container.
    Variables are only forwarded if they are set in the host environment.
    In case of conflict with `env`, the `env` variables take precedence.
    """
    timeout: int = 30
    """Timeout for executing commands in the container."""
    # NOTE: the environment variable is read once, when this class body is evaluated.
    executable: str = os.getenv("MSWEA_DOCKER_EXECUTABLE", "docker")
    """Path to the docker/container executable."""
    run_args: list[str] = ["--rm"]
    """Additional arguments to pass to the docker/container executable.
    Default is ["--rm"], which removes the container after it exits.
    """
    container_timeout: str = "2h"
    """Max duration to keep container running. Uses the same format as the sleep command."""
    pull_timeout: int = 120
    """Timeout in seconds for pulling images."""
    interpreter: list[str] = ["bash", "-lc"]
    """Interpreter to use to execute commands. Default is ["bash", "-lc"].
    The actual command will be appended as argument to this. Override this to e.g., modify shell flags
    (e.g., to remove the `-l` flag to disable login shell) or to use python instead of bash to interpret commands.
    """
43
 
44
 
45
class DockerEnvironment:
    """Execute commands in a Docker container by invoking the docker CLI directly.

    A detached container is started on construction and kept alive with `sleep`;
    each `execute` call runs one command in it through `<executable> exec`.
    See `DockerEnvironmentConfig` for the accepted keyword arguments.
    """

    def __init__(
        self,
        *,
        config_class: type = DockerEnvironmentConfig,
        logger: logging.Logger | None = None,
        **kwargs,
    ):
        """Build the configuration from `kwargs` and start the container.

        Args:
            config_class: Class instantiated with `**kwargs` to produce `self.config`.
            logger: Logger to use; defaults to the "minisweagent.environment" logger.
            **kwargs: Passed to `config_class`; see `DockerEnvironmentConfig`.
        """
        self.logger = logger or logging.getLogger("minisweagent.environment")
        # Assigned before anything that can raise, so that cleanup() always finds the attribute.
        self.container_id: str | None = None
        self.config = config_class(**kwargs)
        self._start_container()

    def get_template_vars(self, **kwargs) -> dict[str, Any]:
        """Return the config dump, the host's `platform.uname()` fields and `kwargs`, combined
        by `recursive_merge` in that argument order.
        """
        return recursive_merge(self.config.model_dump(), platform.uname()._asdict(), kwargs)

    def serialize(self) -> dict:
        """Return a nested dict holding the JSON-mode config dump and this class's dotted path."""
        return {
            "info": {
                "config": {
                    "environment": self.config.model_dump(mode="json"),
                    "environment_type": f"{self.__class__.__module__}.{self.__class__.__name__}",
                }
            }
        }

    def _start_container(self):
        """Start a detached container and store its ID in `self.container_id`.

        Raises:
            subprocess.CalledProcessError: If the run command exits with a non-zero status.
            subprocess.TimeoutExpired: If the run command exceeds `pull_timeout` seconds.
        """
        container_name = f"minisweagent-{uuid.uuid4().hex[:8]}"
        cmd = [
            self.config.executable,
            "run",
            "-d",
            "--name",
            container_name,
            "-w",
            self.config.cwd,
            *self.config.run_args,
            self.config.image,
            "sleep",
            self.config.container_timeout,
        ]
        self.logger.debug(f"Starting container with command: {shlex.join(cmd)}")
        try:
            result = subprocess.run(
                cmd,
                capture_output=True,
                text=True,
                timeout=self.config.pull_timeout,  # docker pull might take a while
                check=True,
            )
        except subprocess.CalledProcessError as e:
            # The exception message only carries the exit status; the captured stderr has the reason.
            self.logger.error(f"Failed to start container {container_name}: {(e.stderr or '').strip()}")
            raise
        container_id = result.stdout.strip()
        self.logger.info(f"Started container {container_name} with ID {container_id}")
        self.container_id = container_id

    def execute(self, action: dict, cwd: str = "", *, timeout: int | None = None) -> dict[str, Any]:
        """Execute a command in the Docker container and return the result as a dict.

        Args:
            action: Dict whose "command" entry (default "") is passed to the interpreter.
            cwd: Working directory for this command; falls back to `config.cwd` when empty.
            timeout: Seconds to wait; falls back to `config.timeout` when None or 0.

        Returns:
            A dict with "output" (stdout and stderr combined), "returncode" and
            "exception_info". If running the command raised, "returncode" is -1 and an
            "extra" entry describes the exception.

        Raises:
            Submitted: If the output signals task completion (see `_check_finished`).
        """
        command = action.get("command", "")
        cwd = cwd or self.config.cwd
        assert self.container_id, "Container not started"

        cmd = [self.config.executable, "exec", "-w", cwd]
        for key in self.config.forward_env:
            if (value := os.getenv(key)) is not None:
                cmd.extend(["-e", f"{key}={value}"])
        # Explicit `env` entries are appended after the forwarded ones; the config
        # documents that `env` takes precedence on conflict.
        for key, value in self.config.env.items():
            cmd.extend(["-e", f"{key}={value}"])
        cmd.extend([self.container_id, *self.config.interpreter, command])

        try:
            result = subprocess.run(
                cmd,
                text=True,
                timeout=timeout or self.config.timeout,
                encoding="utf-8",
                errors="replace",
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
            )
            output = {"output": result.stdout, "returncode": result.returncode, "exception_info": ""}
        except Exception as e:
            # Some exceptions carry partial output, which may be bytes or str; normalize to str.
            raw_output = getattr(e, "output", None)
            raw_output = (
                raw_output.decode("utf-8", errors="replace") if isinstance(raw_output, bytes) else (raw_output or "")
            )
            output = {
                "output": raw_output,
                "returncode": -1,
                "exception_info": f"An error occurred while executing the command: {e}",
                "extra": {"exception_type": type(e).__name__, "exception": str(e)},
            }
        self._check_finished(output)
        return output

    def _check_finished(self, output: dict):
        """Raises Submitted if the output indicates task completion.

        Completion means the return code is 0 and the first line of the output (after
        stripping leading whitespace) is the sentinel string; the remaining lines become
        the submission.
        """
        lines = output.get("output", "").lstrip().splitlines(keepends=True)
        if lines and lines[0].strip() == "COMPLETE_TASK_AND_SUBMIT_FINAL_OUTPUT" and output["returncode"] == 0:
            submission = "".join(lines[1:])
            raise Submitted(
                {
                    "role": "exit",
                    "content": submission,
                    "extra": {"exit_status": "Submitted", "submission": submission},
                }
            )

    def cleanup(self):
        """Stop and remove the Docker container.

        The stop (falling back to a forced remove) is launched as a background shell
        job, so this method does not wait for it to finish.
        """
        if getattr(self, "container_id", None) is None:  # if init fails early, container_id might not be set
            return
        # The other methods pass these values as single argv elements; quote them so the
        # shell string treats them the same way even if they contain spaces or metacharacters.
        executable = shlex.quote(self.config.executable)
        container_id = shlex.quote(self.container_id)
        cmd = f"(timeout 60 {executable} stop {container_id} || {executable} rm -f {container_id}) >/dev/null 2>&1 &"
        subprocess.Popen(cmd, shell=True)

    def __del__(self):
        """Cleanup container when object is destroyed."""
        self.cleanup()
162
 
162 lines