| 1 | import json
|
| 2 | import logging
|
| 3 | import os
|
| 4 | import time
|
| 5 | from pathlib import Path
|
| 6 | from typing import Any, Literal
|
| 7 |
|
| 8 | import litellm
|
| 9 | from pydantic import BaseModel
|
| 10 |
|
| 11 | from minisweagent.models import GLOBAL_MODEL_STATS
|
| 12 | from minisweagent.models.utils.actions_toolcall_response import (
|
| 13 | BASH_TOOL_RESPONSE_API,
|
| 14 | format_toolcall_observation_messages,
|
| 15 | parse_toolcall_actions_response,
|
| 16 | )
|
| 17 | from minisweagent.models.utils.retry import retry
|
| 18 |
|
| 19 | logger = logging.getLogger("portkey_response_model")
|
| 20 |
|
| 21 | try:
|
| 22 | from portkey_ai import Portkey
|
| 23 | except ImportError:
|
| 24 | raise ImportError(
|
| 25 | "The portkey-ai package is required to use PortkeyResponseAPIModel. Please install it with: pip install portkey-ai"
|
| 26 | )
|
| 27 |
|
| 28 |
|
| 29 | class PortkeyResponseAPIModelConfig(BaseModel):
|
| 30 | model_name: str
|
| 31 | model_kwargs: dict[str, Any] = {}
|
| 32 | litellm_model_registry: Path | str | None = os.getenv("LITELLM_MODEL_REGISTRY_PATH")
|
| 33 | litellm_model_name_override: str = ""
|
| 34 | cost_tracking: Literal["default", "ignore_errors"] = os.getenv("MSWEA_COST_TRACKING", "default")
|
| 35 | format_error_template: str = "{{ error }}"
|
| 36 | observation_template: str = (
|
| 37 | "{% if output.exception_info %}<exception>{{output.exception_info}}</exception>\n{% endif %}"
|
| 38 | "<returncode>{{output.returncode}}</returncode>\n<output>\n{{output.output}}</output>"
|
| 39 | )
|
| 40 | multimodal_regex: str = ""
|
| 41 |
|
| 42 |
|
| 43 | class PortkeyResponseAPIModel:
|
| 44 | """Portkey model using the Responses API with native tool calling.
|
| 45 |
|
| 46 | Note: This implementation is stateless - each request must include
|
| 47 | the full conversation history. previous_response_id is not used.
|
| 48 | """
|
| 49 |
|
| 50 | abort_exceptions: list[type[Exception]] = [KeyboardInterrupt, TypeError, ValueError]
|
| 51 |
|
| 52 | def __init__(self, **kwargs):
|
| 53 | self.config = PortkeyResponseAPIModelConfig(**kwargs)
|
| 54 | if self.config.litellm_model_registry and Path(self.config.litellm_model_registry).is_file():
|
| 55 | litellm.utils.register_model(json.loads(Path(self.config.litellm_model_registry).read_text()))
|
| 56 |
|
| 57 | self._api_key = os.getenv("PORTKEY_API_KEY")
|
| 58 | if not self._api_key:
|
| 59 | raise ValueError(
|
| 60 | "Portkey API key is required. Set it via the "
|
| 61 | "PORTKEY_API_KEY environment variable. You can permanently set it with "
|
| 62 | "`mini-extra config set PORTKEY_API_KEY YOUR_KEY`."
|
| 63 | )
|
| 64 |
|
| 65 | virtual_key = os.getenv("PORTKEY_VIRTUAL_KEY")
|
| 66 | client_kwargs = {"api_key": self._api_key}
|
| 67 | if virtual_key:
|
| 68 | client_kwargs["virtual_key"] = virtual_key
|
| 69 |
|
| 70 | self.client = Portkey(**client_kwargs)
|
| 71 |
|
| 72 | def _query(self, messages: list[dict[str, str]], **kwargs):
|
| 73 | return self.client.responses.create(
|
| 74 | model=self.config.model_name,
|
| 75 | input=messages,
|
| 76 | tools=[BASH_TOOL_RESPONSE_API],
|
| 77 | **(self.config.model_kwargs | kwargs),
|
| 78 | )
|
| 79 |
|
| 80 | def _prepare_messages_for_api(self, messages: list[dict]) -> list[dict]:
|
| 81 | """Prepare messages for Portkey's stateless Responses API.
|
| 82 |
|
| 83 | Flattens response objects into their output items.
|
| 84 | """
|
| 85 | result = []
|
| 86 | for msg in messages:
|
| 87 | if msg.get("object") == "response":
|
| 88 | for item in msg.get("output", []):
|
| 89 | result.append({k: v for k, v in item.items() if k != "extra"})
|
| 90 | else:
|
| 91 | result.append({k: v for k, v in msg.items() if k != "extra"})
|
| 92 | return result
|
| 93 |
|
| 94 | def query(self, messages: list[dict[str, str]], **kwargs) -> dict:
|
| 95 | for attempt in retry(logger=logger, abort_exceptions=self.abort_exceptions):
|
| 96 | with attempt:
|
| 97 | response = self._query(self._prepare_messages_for_api(messages), **kwargs)
|
| 98 | cost_output = self._calculate_cost(response)
|
| 99 | GLOBAL_MODEL_STATS.add(cost_output["cost"])
|
| 100 | message = response.model_dump() if hasattr(response, "model_dump") else dict(response)
|
| 101 | message["extra"] = {
|
| 102 | "actions": self._parse_actions(response),
|
| 103 | **cost_output,
|
| 104 | "timestamp": time.time(),
|
| 105 | }
|
| 106 | return message
|
| 107 |
|
| 108 | def _parse_actions(self, response) -> list[dict]:
|
| 109 | """Parse tool calls from the response API response."""
|
| 110 | output = response.output if hasattr(response, "output") else response.get("output", [])
|
| 111 | return parse_toolcall_actions_response(output, format_error_template=self.config.format_error_template)
|
| 112 |
|
| 113 | def _calculate_cost(self, response) -> dict[str, float]:
|
| 114 | try:
|
| 115 | cost = litellm.cost_calculator.completion_cost(
|
| 116 | response, model=self.config.litellm_model_name_override or self.config.model_name
|
| 117 | )
|
| 118 | assert cost > 0.0, f"Cost is not positive: {cost}"
|
| 119 | except Exception as e:
|
| 120 | if self.config.cost_tracking != "ignore_errors":
|
| 121 | raise RuntimeError(
|
| 122 | f"Error calculating cost for model {self.config.model_name}: {e}. "
|
| 123 | "You can ignore this issue from your config file with cost_tracking: 'ignore_errors' or "
|
| 124 | "globally with export MSWEA_COST_TRACKING='ignore_errors' to ignore this error. "
|
| 125 | ) from e
|
| 126 | cost = 0.0
|
| 127 | return {"cost": cost}
|
| 128 |
|
| 129 | def format_message(self, **kwargs) -> dict:
|
| 130 | role = kwargs.get("role", "user")
|
| 131 | content = kwargs.get("content", "")
|
| 132 | extra = kwargs.get("extra")
|
| 133 | content_items = [{"type": "input_text", "text": content}] if isinstance(content, str) else content
|
| 134 | msg = {"type": "message", "role": role, "content": content_items}
|
| 135 | if extra:
|
| 136 | msg["extra"] = extra
|
| 137 | return msg
|
| 138 |
|
| 139 | def format_observation_messages(
|
| 140 | self, message: dict, outputs: list[dict], template_vars: dict | None = None
|
| 141 | ) -> list[dict]:
|
| 142 | """Format execution outputs into tool result messages."""
|
| 143 | actions = message.get("extra", {}).get("actions", [])
|
| 144 | return format_toolcall_observation_messages(
|
| 145 | actions=actions,
|
| 146 | outputs=outputs,
|
| 147 | observation_template=self.config.observation_template,
|
| 148 | template_vars=template_vars,
|
| 149 | multimodal_regex=self.config.multimodal_regex,
|
| 150 | )
|
| 151 |
|
| 152 | def get_template_vars(self, **kwargs) -> dict:
|
| 153 | return self.config.model_dump()
|
| 154 |
|
| 155 | def serialize(self) -> dict:
|
| 156 | return {
|
| 157 | "info": {
|
| 158 | "config": {
|
| 159 | "model": self.config.model_dump(mode="json"),
|
| 160 | "model_type": f"{self.__class__.__module__}.{self.__class__.__name__}",
|
| 161 | },
|
| 162 | }
|
| 163 | }
|
| 164 |
|