diff --git a/agentrun/agent_runtime/__runtime_async_template.py b/agentrun/agent_runtime/__runtime_async_template.py index 1726a1b..80cd028 100644 --- a/agentrun/agent_runtime/__runtime_async_template.py +++ b/agentrun/agent_runtime/__runtime_async_template.py @@ -43,6 +43,8 @@ class AgentRuntime( delete, update, query, and endpoint/version management. """ + _data_api: Dict[str, AgentRuntimeDataAPI] = {} + @classmethod def __get_client(cls): """获取客户端实例 / Get client instance @@ -462,16 +464,19 @@ async def invoke_openai_async( cfg = Config.with_configs(self._config, kwargs.get("config")) kwargs["config"] = cfg - if not self.__data_api: - self.__data_api: Dict[str, AgentRuntimeDataAPI] = {} + if not self._data_api: + self._data_api: Dict[str, AgentRuntimeDataAPI] = {} - if self.__data_api[agent_runtime_endpoint_name] is None: - self.__data_api[agent_runtime_endpoint_name] = AgentRuntimeDataAPI( + if ( + agent_runtime_endpoint_name not in self._data_api + or self._data_api[agent_runtime_endpoint_name] is None + ): + self._data_api[agent_runtime_endpoint_name] = AgentRuntimeDataAPI( agent_runtime_name=self.agent_runtime_name or "", agent_runtime_endpoint_name=agent_runtime_endpoint_name or "", config=cfg, ) - return await self.__data_api[ + return await self._data_api[ agent_runtime_endpoint_name ].invoke_openai_async(**kwargs) diff --git a/agentrun/agent_runtime/api/__data_async_template.py b/agentrun/agent_runtime/api/__data_async_template.py index b801b1f..5ff2525 100644 --- a/agentrun/agent_runtime/api/__data_async_template.py +++ b/agentrun/agent_runtime/api/__data_async_template.py @@ -37,8 +37,15 @@ async def invoke_openai_async( config = kwargs.get("config", None) cfg = Config.with_configs(self.config, config) - api_base = self.with_path("openai/v1") - _, headers, _ = self.auth(headers=cfg.get_headers()) + api_base = self.with_path("openai/v1", config=cfg) + # Sign the actual request URL (OpenAI client will POST to base + /chat/completions) + chat_completions_url = api_base.rstrip("/") + "/chat/completions" + _, headers, _ = self.auth( + url=chat_completions_url, + headers=cfg.get_headers(), + config=cfg, + method="POST", + ) from httpx import AsyncClient from openai import AsyncOpenAI diff --git a/agentrun/agent_runtime/api/data.py b/agentrun/agent_runtime/api/data.py index f3b5226..465db02 100644 --- a/agentrun/agent_runtime/api/data.py +++ b/agentrun/agent_runtime/api/data.py @@ -48,8 +48,15 @@ async def invoke_openai_async( config = kwargs.get("config", None) cfg = Config.with_configs(self.config, config) - api_base = self.with_path("openai/v1") - _, headers, _ = self.auth(headers=cfg.get_headers()) + api_base = self.with_path("openai/v1", config=cfg) + # Sign the actual request URL (OpenAI client will POST to base + /chat/completions) + chat_completions_url = api_base.rstrip("/") + "/chat/completions" + _, headers, _ = self.auth( + url=chat_completions_url, + headers=cfg.get_headers(), + config=cfg, + method="POST", + ) from httpx import AsyncClient from openai import AsyncOpenAI @@ -77,8 +84,15 @@ def invoke_openai( config = kwargs.get("config", None) cfg = Config.with_configs(self.config, config) - api_base = self.with_path("openai/v1") - _, headers, _ = self.auth(headers=cfg.get_headers()) + api_base = self.with_path("openai/v1", config=cfg) + # Sign the actual request URL (OpenAI client will POST to base + /chat/completions) + chat_completions_url = api_base.rstrip("/") + "/chat/completions" + _, headers, _ = self.auth( + url=chat_completions_url, + headers=cfg.get_headers(), + config=cfg, + method="POST", + ) from httpx import Client from openai import OpenAI diff --git a/agentrun/agent_runtime/runtime.py b/agentrun/agent_runtime/runtime.py index af2dde8..9d177f6 100644 --- a/agentrun/agent_runtime/runtime.py +++ b/agentrun/agent_runtime/runtime.py @@ -53,6 +53,8 @@ class AgentRuntime( delete, update, query, and endpoint/version management. """ + _data_api: Dict[str, AgentRuntimeDataAPI] = {} + @classmethod def __get_client(cls): """获取客户端实例 / Get client instance @@ -867,17 +869,20 @@ async def invoke_openai_async( cfg = Config.with_configs(self._config, kwargs.get("config")) kwargs["config"] = cfg - if not self.__data_api: - self.__data_api: Dict[str, AgentRuntimeDataAPI] = {} + if not self._data_api: + self._data_api: Dict[str, AgentRuntimeDataAPI] = {} - if self.__data_api[agent_runtime_endpoint_name] is None: - self.__data_api[agent_runtime_endpoint_name] = AgentRuntimeDataAPI( + if ( + agent_runtime_endpoint_name in self._data_api + and self._data_api[agent_runtime_endpoint_name] is None + ): + self._data_api[agent_runtime_endpoint_name] = AgentRuntimeDataAPI( agent_runtime_name=self.agent_runtime_name or "", agent_runtime_endpoint_name=agent_runtime_endpoint_name or "", config=cfg, ) - return await self.__data_api[ + return await self._data_api[ agent_runtime_endpoint_name ].invoke_openai_async(**kwargs) @@ -889,16 +894,19 @@ def invoke_openai( cfg = Config.with_configs(self._config, kwargs.get("config")) kwargs["config"] = cfg - if not self.__data_api: - self.__data_api: Dict[str, AgentRuntimeDataAPI] = {} + if not self._data_api: + self._data_api: Dict[str, AgentRuntimeDataAPI] = {} - if self.__data_api[agent_runtime_endpoint_name] is None: - self.__data_api[agent_runtime_endpoint_name] = AgentRuntimeDataAPI( + if ( + agent_runtime_endpoint_name not in self._data_api + or self._data_api[agent_runtime_endpoint_name] is None + ): + self._data_api[agent_runtime_endpoint_name] = AgentRuntimeDataAPI( agent_runtime_name=self.agent_runtime_name or "", agent_runtime_endpoint_name=agent_runtime_endpoint_name or "", config=cfg, ) - return self.__data_api[agent_runtime_endpoint_name].invoke_openai( + return self._data_api[agent_runtime_endpoint_name].invoke_openai( **kwargs ) diff --git a/agentrun/sandbox/api/__aio_data_async_template.py b/agentrun/sandbox/api/__aio_data_async_template.py index b27311a..129ee38 100644 --- a/agentrun/sandbox/api/__aio_data_async_template.py +++ b/agentrun/sandbox/api/__aio_data_async_template.py @@ -103,9 +103,13 @@ def sync_playwright( from .playwright_sync import BrowserPlaywrightSync cfg = Config.with_configs(self.config, config) - _, headers, _ = self.auth(headers=cfg.get_headers(), config=cfg) + + url = self.get_cdp_url(record=record) + url, headers, _ = self.auth( + url=url, headers=cfg.get_headers(), config=cfg + ) return BrowserPlaywrightSync( - self.get_cdp_url(record=record), + url, browser_type=browser_type, headers=headers, ) @@ -119,9 +123,13 @@ def async_playwright( from .playwright_async import BrowserPlaywrightAsync cfg = Config.with_configs(self.config, config) - _, headers, _ = self.auth(headers=cfg.get_headers(), config=cfg) + + url = self.get_cdp_url(record=record) + url, headers, _ = self.auth( + url=url, headers=cfg.get_headers(), config=cfg + ) return BrowserPlaywrightAsync( - self.get_cdp_url(record=record), + url, browser_type=browser_type, headers=headers, ) diff --git a/agentrun/sandbox/api/__browser_data_async_template.py b/agentrun/sandbox/api/__browser_data_async_template.py index 16e884e..cf215b5 100644 --- a/agentrun/sandbox/api/__browser_data_async_template.py +++ b/agentrun/sandbox/api/__browser_data_async_template.py @@ -92,9 +92,13 @@ def sync_playwright( from .playwright_sync import BrowserPlaywrightSync cfg = Config.with_configs(self.config, config) - _, headers, _ = self.auth(headers=cfg.get_headers(), config=cfg) + + url = self.get_cdp_url(record=record) + url, headers, _ = self.auth( + url=url, headers=cfg.get_headers(), config=cfg + ) return BrowserPlaywrightSync( - self.get_cdp_url(record=record), + url, browser_type=browser_type, headers=headers, ) @@ -108,9 +112,13 @@ def async_playwright( from .playwright_async import BrowserPlaywrightAsync cfg = Config.with_configs(self.config, config) - _, headers, _ = self.auth(headers=cfg.get_headers(), config=cfg) + + url = self.get_cdp_url(record=record) + url, headers, _ = self.auth( + url=url, headers=cfg.get_headers(), config=cfg + ) return BrowserPlaywrightAsync( - self.get_cdp_url(record=record), + url, browser_type=browser_type, headers=headers, ) diff --git a/agentrun/sandbox/api/aio_data.py b/agentrun/sandbox/api/aio_data.py index b772e1d..d271db4 100644 --- a/agentrun/sandbox/api/aio_data.py +++ b/agentrun/sandbox/api/aio_data.py @@ -113,9 +113,13 @@ def sync_playwright( from .playwright_sync import BrowserPlaywrightSync cfg = Config.with_configs(self.config, config) - _, headers, _ = self.auth(headers=cfg.get_headers(), config=cfg) + + url = self.get_cdp_url(record=record) + url, headers, _ = self.auth( + url=url, headers=cfg.get_headers(), config=cfg + ) return BrowserPlaywrightSync( - self.get_cdp_url(record=record), + url, browser_type=browser_type, headers=headers, ) @@ -129,9 +133,13 @@ def async_playwright( from .playwright_async import BrowserPlaywrightAsync cfg = Config.with_configs(self.config, config) - _, headers, _ = self.auth(headers=cfg.get_headers(), config=cfg) + + url = self.get_cdp_url(record=record) + url, headers, _ = self.auth( + url=url, headers=cfg.get_headers(), config=cfg + ) return BrowserPlaywrightAsync( - self.get_cdp_url(record=record), + url, browser_type=browser_type, headers=headers, ) diff --git a/agentrun/sandbox/api/browser_data.py b/agentrun/sandbox/api/browser_data.py index 50709f5..2523605 100644 --- a/agentrun/sandbox/api/browser_data.py +++ b/agentrun/sandbox/api/browser_data.py @@ -102,9 +102,13 @@ def sync_playwright( from .playwright_sync import BrowserPlaywrightSync cfg = Config.with_configs(self.config, config) - _, headers, _ = self.auth(headers=cfg.get_headers(), config=cfg) + + url = self.get_cdp_url(record=record) + url, headers, _ = self.auth( + url=url, headers=cfg.get_headers(), config=cfg + ) return BrowserPlaywrightSync( - self.get_cdp_url(record=record), + url, browser_type=browser_type, headers=headers, ) @@ -118,9 +122,13 @@ def async_playwright( from .playwright_async import BrowserPlaywrightAsync cfg = Config.with_configs(self.config, config) - _, headers, _ = self.auth(headers=cfg.get_headers(), config=cfg) + + url = self.get_cdp_url(record=record) + url, headers, _ = self.auth( + url=url, headers=cfg.get_headers(), config=cfg + ) return BrowserPlaywrightAsync( - self.get_cdp_url(record=record), + url, browser_type=browser_type, headers=headers, ) diff --git a/agentrun/toolset/__toolset_async_template.py b/agentrun/toolset/__toolset_async_template.py index aec1d04..7a92754 100644 --- a/agentrun/toolset/__toolset_async_template.py +++ b/agentrun/toolset/__toolset_async_template.py @@ -94,11 +94,17 @@ def _get_openapi_auth_defaults( return headers, query def _get_openapi_base_url(self) -> Optional[str]: - return pydash.get( - self, - "status.outputs.urls.intranet_url", - None, - ) or pydash.get(self, "status.outputs.urls.internet_url", None) + import os + + fc_region = os.getenv("FC_REGION") + if fc_region: + intranet_url: Optional[str] = pydash.get( + self, "status.outputs.urls.intranet_url", None + ) + if intranet_url: + return intranet_url + + return pydash.get(self, "status.outputs.urls.internet_url", None) async def get_async(self, config: Optional[Config] = None): if self.name is None: @@ -154,6 +160,25 @@ async def call_tool_async( logger.debug("invoke tool %s got result %s", name, result) return result + def _get_mcp_url(self) -> str: + """获取 MCP 工具的最佳 URL / Get the best URL for MCP tool + + 优先使用 agentrun-data 代理入口(支持 RAM 签名认证), + 回退到 mcp_server_config.url(直连)。 + Priority: agentrun-data proxy endpoint (with RAM auth) > mcp_server_config.url (direct). + """ + proxy_url = self._get_openapi_base_url() + if proxy_url: + return proxy_url + + mcp_server_config: MCPServerConfig = pydash.get( + self, "status.outputs.mcp_server_config", None + ) + if mcp_server_config and mcp_server_config.url: + return mcp_server_config.url + + raise ValueError("MCP server URL is missing.") + def to_apiset(self, config: Optional[Config] = None): """将 ToolSet 转换为统一的 ApiSet 对象 @@ -168,16 +193,16 @@ def to_apiset(self, config: Optional[Config] = None): mcp_server_config: MCPServerConfig = pydash.get( self, "status.outputs.mcp_server_config", None ) - assert ( - mcp_server_config.url is not None - ), "MCP server URL is missing." - cfg = Config.with_configs( - config, Config(headers=mcp_server_config.headers) + mcp_url = self._get_mcp_url() + + mcp_headers = ( + mcp_server_config.headers if mcp_server_config else None ) + cfg = Config.with_configs(config, Config(headers=mcp_headers)) mcp_client = MCPToolSet( - url=mcp_server_config.url, + url=mcp_url, config=cfg, ) diff --git a/agentrun/toolset/api/mcp.py b/agentrun/toolset/api/mcp.py index d2c382a..ab0d3a1 100644 --- a/agentrun/toolset/api/mcp.py +++ b/agentrun/toolset/api/mcp.py @@ -4,10 +4,88 @@ Handles tool invocations for MCP (Model Context Protocol). """ -from typing import Any, Dict, Optional +from typing import Any, Dict, Generator, Optional +from urllib.parse import urlparse, urlunparse + +import httpx from agentrun.utils.config import Config from agentrun.utils.log import logger +from agentrun.utils.ram_signature import get_agentrun_signed_headers + + +class _AgentrunRamAuth(httpx.Auth): + """httpx Auth handler:为每次请求动态生成 RAM 签名。 + + SSE 场景下同一个 httpx.AsyncClient 会发出 GET(SSE 连接)和 + POST(消息发送)请求,URL / method / body 各不相同,因此必须 + per-request 计算签名,不能在 client 初始化时一次性设置 headers。 + """ + + def __init__( + self, + access_key_id: str, + access_key_secret: str, + region: str, + security_token: Optional[str] = None, + ): + self._ak = access_key_id + self._sk = access_key_secret + self._region = region + self._security_token = security_token + + def auth_flow( + self, request: httpx.Request + ) -> Generator[httpx.Request, httpx.Response, None]: + url = str(request.url) + method = request.method + + body: Optional[bytes] = None + if request.content: + body = request.content + + content_type: Optional[str] = request.headers.get("content-type") + + try: + signed = get_agentrun_signed_headers( + url=url, + method=method, + access_key_id=self._ak, + access_key_secret=self._sk, + security_token=self._security_token, + region=self._region, + product="agentrun", + body=body, + content_type=content_type, + ) + for k, v in signed.items(): + request.headers[k] = v + logger.debug( + "applied RAM signature for MCP %s request to %s", + method, + url[:80] + ("..." if len(url) > 80 else ""), + ) + except ValueError as e: + logger.warning("RAM signing skipped for MCP request: %s", e) + + yield request + + +def _rewrite_to_ram_url(url: str) -> str: + """将 agentrun-data 域名改写为 -ram 端点。""" + parsed = urlparse(url) + parts = parsed.netloc.split(".", 1) + if len(parts) == 2: + ram_netloc = parts[0] + "-ram." + parts[1] + return urlunparse(( + parsed.scheme, + ram_netloc, + parsed.path or "", + parsed.params, + parsed.query, + parsed.fragment, + )) + return url class MCPSession: @@ -16,14 +94,46 @@ def __init__(self, url: str, config: Optional[Config] = None): self.url = url self.config = Config.with_configs(config) + def _build_ram_auth(self, url: str) -> tuple: + """当目标是 agentrun-data 域名时,改写 URL 并返回 httpx Auth handler。 + + Returns: + (rewritten_url, auth_or_none) + """ + parsed = urlparse(url) + if ".agentrun-data." not in (parsed.netloc or ""): + return url, None + + cfg = self.config + ak = cfg.get_access_key_id() + sk = cfg.get_access_key_secret() + if not ak or not sk: + return url, None + + url = _rewrite_to_ram_url(url) + + auth = _AgentrunRamAuth( + access_key_id=ak, + access_key_secret=sk, + region=cfg.get_region_id(), + security_token=cfg.get_security_token() or None, + ) + return url, auth + async def __aenter__(self): from mcp import ClientSession from mcp.client.sse import sse_client timeout = self.config.get_timeout() + headers = self.config.get_headers() + url = self.url + + url, auth = self._build_ram_auth(url) + self.client = sse_client( - url=self.url, - headers=self.config.get_headers(), + url=url, + headers=headers, + auth=auth, timeout=timeout if timeout else 60, ) read, write = await self.client.__aenter__() diff --git a/agentrun/toolset/api/openapi.py b/agentrun/toolset/api/openapi.py index df0198f..5d52b39 100644 --- a/agentrun/toolset/api/openapi.py +++ b/agentrun/toolset/api/openapi.py @@ -7,6 +7,7 @@ from copy import deepcopy import json from typing import Any, Dict, List, Optional, Tuple, Union +from urllib.parse import urlparse, urlunparse import httpx from pydash import get as pg @@ -15,6 +16,7 @@ from agentrun.utils.config import Config from agentrun.utils.log import logger from agentrun.utils.model import BaseModel +from agentrun.utils.ram_signature import get_agentrun_signed_headers from ..model import ToolInfo, ToolSchema @@ -829,10 +831,22 @@ def invoke_tool( request_kwargs, timeout, raise_for_status = self._prepare_request( name, arguments, config ) + with httpx.Client(timeout=timeout) as client: response = client.request(**request_kwargs) if raise_for_status: - response.raise_for_status() + try: + response.raise_for_status() + except httpx.HTTPStatusError: + logger.error( + "OpenAPI tool request failed: status=%s url=%s " + "response_headers=%s response_body=%s", + response.status_code, + response.request.url, + dict(response.headers), + response.text[:2000], + ) + raise return self._format_response(response) async def invoke_tool_async( @@ -848,7 +862,18 @@ async def invoke_tool_async( async with httpx.AsyncClient(timeout=timeout) as client: response = await client.request(**request_kwargs) if raise_for_status: - response.raise_for_status() + try: + response.raise_for_status() + except httpx.HTTPStatusError: + logger.error( + "OpenAPI tool request failed: status=%s url=%s " + "response_headers=%s response_body=%s", + response.status_code, + response.request.url, + dict(response.headers), + response.text[:2000], + ) + raise return self._format_response(response) def _load_schema(self, schema: Any) -> Dict[str, Any]: @@ -1174,8 +1199,66 @@ def _prepare_request( args, ) + self._apply_ram_auth(request_kwargs, combined_config) + return request_kwargs, timeout, raise_for_status + def _apply_ram_auth( + self, + request_kwargs: Dict[str, Any], + config: Optional[Config], + ) -> None: + """当目标是 agentrun-data 域名时,自动注入 RAM 签名鉴权 headers 并改写为 -ram 端点。""" + url = request_kwargs.get("url", "") + parsed = urlparse(url) + if ".agentrun-data." not in (parsed.netloc or ""): + return + + cfg = Config.with_configs(config) + ak = cfg.get_access_key_id() + sk = cfg.get_access_key_secret() + if not ak or not sk: + return + + parts = parsed.netloc.split(".", 1) + if len(parts) == 2: + ram_netloc = parts[0] + "-ram." + parts[1] + url = urlunparse(( + parsed.scheme, + ram_netloc, + parsed.path or "", + parsed.params, + parsed.query, + parsed.fragment, + )) + request_kwargs["url"] = url + + method = request_kwargs.get("method", "GET") + body_bytes: Optional[bytes] = None + json_body = request_kwargs.get("json") + if json_body is not None: + body_bytes = json.dumps(json_body).encode("utf-8") + + try: + signed = get_agentrun_signed_headers( + url=url, + method=method, + access_key_id=ak, + access_key_secret=sk, + security_token=cfg.get_security_token() or None, + region=cfg.get_region_id(), + product="agentrun", + body=body_bytes, + ) + existing_headers: Dict[str, str] = request_kwargs.get("headers", {}) + request_kwargs["headers"] = {**signed, **existing_headers} + logger.debug( + "applied RAM signature for OpenAPI tool request to %s", + url[:80] + "..." if len(url) > 80 else url, + ) + except ValueError as e: + logger.warning("RAM signing skipped for OpenAPI tool: %s", e) + def _format_response(self, response: httpx.Response) -> Dict[str, Any]: try: body = response.json() diff --git a/agentrun/toolset/toolset.py b/agentrun/toolset/toolset.py index bd417fb..5aa2496 100644 --- a/agentrun/toolset/toolset.py +++ b/agentrun/toolset/toolset.py @@ -109,11 +109,17 @@ def _get_openapi_auth_defaults( return headers, query def _get_openapi_base_url(self) -> Optional[str]: - return pydash.get( - self, - "status.outputs.urls.intranet_url", - None, - ) or pydash.get(self, "status.outputs.urls.internet_url", None) + import os + + fc_region = os.getenv("FC_REGION") + if fc_region: + intranet_url: Optional[str] = pydash.get( + self, "status.outputs.urls.intranet_url", None + ) + if intranet_url: + return intranet_url + + return pydash.get(self, "status.outputs.urls.internet_url", None) async def get_async(self, config: Optional[Config] = None): if self.name is None: @@ -221,6 +227,25 @@ def call_tool( logger.debug("invoke tool %s got result %s", name, result) return result + def _get_mcp_url(self) -> str: + """获取 MCP 工具的最佳 URL / Get the best URL for MCP tool + + 优先使用 agentrun-data 代理入口(支持 RAM 签名认证), + 回退到 mcp_server_config.url(直连)。 + Priority: agentrun-data proxy endpoint (with RAM auth) > mcp_server_config.url (direct). + """ + proxy_url = self._get_openapi_base_url() + if proxy_url: + return proxy_url + + mcp_server_config: MCPServerConfig = pydash.get( + self, "status.outputs.mcp_server_config", None + ) + if mcp_server_config and mcp_server_config.url: + return mcp_server_config.url + + raise ValueError("MCP server URL is missing.") + def to_apiset(self, config: Optional[Config] = None): """将 ToolSet 转换为统一的 ApiSet 对象 @@ -235,16 +260,16 @@ def to_apiset(self, config: Optional[Config] = None): mcp_server_config: MCPServerConfig = pydash.get( self, "status.outputs.mcp_server_config", None ) - assert ( - mcp_server_config.url is not None - ), "MCP server URL is missing." - cfg = Config.with_configs( - config, Config(headers=mcp_server_config.headers) + mcp_url = self._get_mcp_url() + + mcp_headers = ( + mcp_server_config.headers if mcp_server_config else None ) + cfg = Config.with_configs(config, Config(headers=mcp_headers)) mcp_client = MCPToolSet( - url=mcp_server_config.url, + url=mcp_url, config=cfg, ) diff --git a/agentrun/utils/__data_api_async_template.py b/agentrun/utils/__data_api_async_template.py index c664f7f..e6279f5 100644 --- a/agentrun/utils/__data_api_async_template.py +++ b/agentrun/utils/__data_api_async_template.py @@ -8,6 +8,7 @@ """ from enum import Enum +import json from typing import Any, Dict, Optional, Union from urllib.parse import parse_qs, urlencode, urlparse, urlunparse @@ -15,8 +16,8 @@ from agentrun.utils.config import Config from agentrun.utils.exception import ClientError -from agentrun.utils.helper import mask_password from agentrun.utils.log import logger +from agentrun.utils.ram_signature import get_agentrun_signed_headers class ResourceType(Enum): @@ -65,25 +66,45 @@ def __init__( self.resource_name = resource_name self.resource_type = resource_type - self.access_token = None self.config = Config.with_configs(config) self.namespace = namespace - if self.config.get_token(): - logger.debug( - "using provided access token from config, %s", - mask_password(self.config.get_token() or ""), - ) - self.access_token = self.config.get_token() + def _use_ram_auth(self, config: Optional[Config] = None) -> bool: + """是否使用 RAM 签名鉴权(配置了 AK/SK 时使用)。""" + cfg = Config.with_configs(self.config, config) + return bool(cfg.get_access_key_id() and cfg.get_access_key_secret()) + + def _get_ram_data_endpoint(self, config: Optional[Config] = None) -> str: + """返回 RAM 鉴权用的 data endpoint(仅当默认 agentrun-data 域名时在 host 前加 -ram)。""" + cfg = Config.with_configs(self.config, config) + base = cfg.get_data_endpoint() + parsed = urlparse(base) + if not parsed.netloc or ".agentrun-data." not in parsed.netloc: + return base + parts = parsed.netloc.split(".", 1) + if len(parts) != 2: + return base + ram_netloc = parts[0] + "-ram." + parts[1] + return urlunparse(( + parsed.scheme, + ram_netloc, + parsed.path or "", + parsed.params, + parsed.query, + parsed.fragment, + )) def get_base_url(self) -> str: """ Get the base URL for API requests. + 当使用 RAM 鉴权时返回 RAM 端点(host 带 -ram)。 Returns: The base URL string """ + if self._use_ram_auth(): + return self._get_ram_data_endpoint() return self.config.get_data_endpoint() def with_path( @@ -166,81 +187,40 @@ def auth( headers: Optional[Dict[str, str]] = None, query: Optional[Dict[str, Any]] = None, config: Optional[Config] = None, + method: str = "GET", + body: Optional[bytes] = None, ) -> tuple[str, Dict[str, str], Optional[Dict[str, Any]]]: """ - Authentication hook for modifying requests before sending. - - This method can be overridden in subclasses to implement custom - authentication logic (e.g., signing requests, adding auth tokens). - - Args: - url: The request URL - headers: The request headers - query: The query parameters - - Returns: - Tuple of (modified_url, modified_headers, modified_query) - - Examples: - Override this method to add custom authentication: - - >>> class AuthedClient(AgentRunDataClient): - ... def auth(self, url, headers, query): - ... # Add auth token to headers - ... headers["Authorization"] = "Bearer token123" - ... # Or add signature to query - ... query = query or {} - ... query["signature"] = self._sign_request(url) - ... return url, headers, query + Authentication: 仅使用 RAM 签名鉴权(Agentrun-Authorization)。需在 config 中配置 AK/SK。 """ cfg = Config.with_configs(self.config, config) - if ( - self.access_token is None - and self.resource_name - and self.resource_type - and not cfg.get_token() - ): + if self._use_ram_auth(cfg): try: - from alibabacloud_agentrun20250910.models import ( - GetAccessTokenRequest, + signed = get_agentrun_signed_headers( + url=url, + method=method, + access_key_id=cfg.get_access_key_id(), + access_key_secret=cfg.get_access_key_secret(), + security_token=cfg.get_security_token() or None, + region=cfg.get_region_id(), + product="agentrun", + body=body, ) - - from .control_api import ControlAPI - - cli = ControlAPI(self.config)._get_client() - input = ( - GetAccessTokenRequest( - resource_id=self.resource_name, - resource_type=self.resource_type.value, - ) - if self.resource_type == ResourceType.Sandbox - else GetAccessTokenRequest( - resource_name=self.resource_name, - resource_type=self.resource_type.value, - ) - ) - - resp = cli.get_access_token(input) - self.access_token = resp.body.data.access_token - - except Exception as e: - logger.warning( - "Failed to get access token for" - f" {self.resource_type}({self.resource_name}): {e}" + headers = { + **signed, + **cfg.get_headers(), + **(headers or {}), + } + logger.debug( + "using RAM signature for data API request to %s", + url[:80] + "..." if len(url) > 80 else url, ) - - logger.debug( - "fetching access token for resource %s of type %s, %s", - self.resource_name, - self.resource_type, - mask_password(self.access_token or ""), - ) - headers = { - "Agentrun-Access-Token": cfg.get_token() or self.access_token or "", - **cfg.get_headers(), - **(headers or {}), - } + except ValueError as e: + logger.warning("RAM signing skipped (missing AK/SK): %s", e) + headers = {**cfg.get_headers(), **(headers or {})} + else: + headers = {**cfg.get_headers(), **(headers or {})} return url, headers, query @@ -267,8 +247,19 @@ def _prepare_request( if headers: req_headers.update(headers) + body_bytes: Optional[bytes] = None + if data is not None: + if isinstance(data, dict): + body_bytes = json.dumps(data).encode("utf-8") + elif isinstance(data, str): + body_bytes = data.encode("utf-8") + else: + body_bytes = str(data).encode("utf-8") + # Apply authentication (may modify URL, headers, and query) - url, req_headers, query = self.auth(url, req_headers, query, config=cfg) + url, req_headers, query = self.auth( + url, req_headers, query, config=cfg, method=method, body=body_bytes + ) # Add query parameters to URL if provided if query: @@ -606,7 +597,9 @@ async def post_file_async( req_headers.update(headers or {}) # Apply authentication (may modify URL, headers, and query) cfg = Config.with_configs(self.config, config) - url, req_headers, query = self.auth(url, req_headers, query, config=cfg) + url, req_headers, query = self.auth( + url, req_headers, query, config=cfg, method="POST" + ) try: with open(local_file_path, "rb") as f: @@ -659,7 +652,9 @@ async def get_file_async( req_headers.update(headers or {}) # Apply authentication (may modify URL, headers, and query) cfg = Config.with_configs(self.config, config) - url, req_headers, query = self.auth(url, req_headers, query, config=cfg) + url, req_headers, query = self.auth( + url, req_headers, query, config=cfg, method="GET" + ) try: async with httpx.AsyncClient( @@ -708,7 +703,9 @@ async def get_video_async( req_headers.update(headers or {}) # Apply authentication (may modify URL, headers, and query) cfg = Config.with_configs(self.config, config) - url, req_headers, query = self.auth(url, req_headers, query, config=cfg) + url, req_headers, query = self.auth( + url, req_headers, query, config=cfg, method="GET" + ) try: async with httpx.AsyncClient( diff --git a/agentrun/utils/data_api.py b/agentrun/utils/data_api.py index ba9b496..ad8326b 100644 --- a/agentrun/utils/data_api.py +++ b/agentrun/utils/data_api.py @@ -18,6 +18,7 @@ """ from enum import Enum +import json from typing import Any, Dict, Optional, Union from urllib.parse import parse_qs, urlencode, urlparse, urlunparse @@ -25,8 +26,8 @@ from agentrun.utils.config import Config from agentrun.utils.exception import ClientError -from agentrun.utils.helper import mask_password from agentrun.utils.log import logger +from agentrun.utils.ram_signature import get_agentrun_signed_headers class ResourceType(Enum): @@ -75,29 +76,56 @@ def __init__( self.resource_name = resource_name self.resource_type = resource_type - self.access_token = None self.config = Config.with_configs(config) self.namespace = namespace - if self.config.get_token(): - logger.debug( - "using provided access token from config, %s", - mask_password(self.config.get_token() or ""), - ) - self.access_token = self.config.get_token() + def _use_ram_auth(self, config: Optional[Config] = None) -> bool: + """是否使用 RAM 签名鉴权(配置了 AK/SK 时使用)。""" + cfg = Config.with_configs(self.config, config) + return bool(cfg.get_access_key_id() and cfg.get_access_key_secret()) + + def _get_ram_data_endpoint(self, config: Optional[Config] = None) -> str: + """返回 RAM 鉴权用的 data endpoint(仅当默认 agentrun-data 域名时在 host 前加 -ram)。""" + cfg = Config.with_configs(self.config, config) + base = cfg.get_data_endpoint() + parsed = urlparse(base) + if not parsed.netloc or ".agentrun-data." not in parsed.netloc: + return base + parts = parsed.netloc.split(".", 1) + if len(parts) != 2: + return base + ram_netloc = parts[0] + "-ram." + parts[1] + return urlunparse(( + parsed.scheme, + ram_netloc, + parsed.path or "", + parsed.params, + parsed.query, + parsed.fragment, + )) - def get_base_url(self) -> str: + def get_base_url(self, config: Optional[Config] = None) -> str: """ Get the base URL for API requests. + 当使用 RAM 鉴权时返回 RAM 端点(host 带 -ram)。 + + Args: + config: 可选,用于计算 base URL 的配置;未传时使用 self.config。 Returns: The base URL string """ - return self.config.get_data_endpoint() + cfg = Config.with_configs(self.config, config) + if self._use_ram_auth(cfg): + return self._get_ram_data_endpoint(cfg) + return cfg.get_data_endpoint() def with_path( - self, path: str, query: Optional[Dict[str, Any]] = None + self, + path: str, + query: Optional[Dict[str, Any]] = None, + config: Optional[Config] = None, ) -> str: """ Construct full URL with the given path and query parameters. @@ -105,6 +133,7 @@ def with_path( Args: path: API path (may include query string) query: Query parameters to add/merge + config: 可选,用于计算 base URL 的配置;未传时使用 self.config。 Returns: Complete URL string with query parameters @@ -124,7 +153,7 @@ def with_path( base_url = "/".join([ part.strip("/") for part in [ - self.get_base_url(), + self.get_base_url(config), self.namespace, path, ] @@ -176,81 +205,42 @@ def auth( headers: Optional[Dict[str, str]] = None, query: Optional[Dict[str, Any]] = None, config: Optional[Config] = None, + method: str = "GET", + body: Optional[bytes] = None, ) -> tuple[str, Dict[str, str], Optional[Dict[str, Any]]]: """ - Authentication hook for modifying requests before sending. - - This method can be overridden in subclasses to implement custom - authentication logic (e.g., signing requests, adding auth tokens). - - Args: - url: The request URL - headers: The request headers - query: The query parameters - - Returns: - Tuple of (modified_url, modified_headers, modified_query) - - Examples: - Override this method to add custom authentication: - - >>> class AuthedClient(AgentRunDataClient): - ... def auth(self, url, headers, query): - ... # Add auth token to headers - ... headers["Authorization"] = "Bearer token123" - ... # Or add signature to query - ... query = query or {} - ... query["signature"] = self._sign_request(url) - ... return url, headers, query + Authentication: 仅使用 RAM 签名鉴权(Agentrun-Authorization)。 + 需在 config 中配置 AK/SK。 """ cfg = Config.with_configs(self.config, config) - if ( - self.access_token is None - and self.resource_name - and self.resource_type - and not cfg.get_token() - ): + if self._use_ram_auth(cfg): try: - from alibabacloud_agentrun20250910.models import ( - GetAccessTokenRequest, + signed = get_agentrun_signed_headers( + url=url, + method=method, + access_key_id=cfg.get_access_key_id(), + access_key_secret=cfg.get_access_key_secret(), + security_token=cfg.get_security_token() or None, + region=cfg.get_region_id(), + product="agentrun", + body=body, ) - - from .control_api import ControlAPI - - cli = ControlAPI(self.config)._get_client() - input = ( - GetAccessTokenRequest( - resource_id=self.resource_name, - resource_type=self.resource_type.value, - ) - if self.resource_type == ResourceType.Sandbox - else GetAccessTokenRequest( - resource_name=self.resource_name, - resource_type=self.resource_type.value, - ) + headers = { + **signed, + **cfg.get_headers(), + **(headers or {}), + } + logger.debug( + "using RAM signature for data API request to %s, token %s", + url[:80] + "..." if len(url) > 80 else url, + signed, ) - - resp = cli.get_access_token(input) - self.access_token = resp.body.data.access_token - - except Exception as e: - logger.warning( - "Failed to get access token for" - f" {self.resource_type}({self.resource_name}): {e}" - ) - - logger.debug( - "fetching access token for resource %s of type %s, %s", - self.resource_name, - self.resource_type, - mask_password(self.access_token or ""), - ) - headers = { - "Agentrun-Access-Token": cfg.get_token() or self.access_token or "", - **cfg.get_headers(), - **(headers or {}), - } + except ValueError as e: + logger.warning("RAM signing skipped (missing AK/SK): %s", e) + headers = {**cfg.get_headers(), **(headers or {})} + else: + headers = {**cfg.get_headers(), **(headers or {})} return url, headers, query @@ -277,8 +267,20 @@ def _prepare_request( if headers: req_headers.update(headers) + # Build body bytes for RAM signing when applicable + body_bytes: Optional[bytes] = None + if data is not None: + if isinstance(data, dict): + body_bytes = json.dumps(data).encode("utf-8") + elif isinstance(data, str): + body_bytes = data.encode("utf-8") + else: + body_bytes = str(data).encode("utf-8") + # Apply authentication (may modify URL, headers, and query) - url, req_headers, query = self.auth(url, req_headers, query, config=cfg) + url, req_headers, query = self.auth( + url, req_headers, query, config=cfg, method=method, body=body_bytes + ) # Add query parameters to URL if provided if query: @@ -858,7 +860,9 @@ async def post_file_async( req_headers.update(headers or {}) # Apply authentication (may modify URL, headers, and query) cfg = Config.with_configs(self.config, config) - url, req_headers, query = self.auth(url, req_headers, query, config=cfg) + url, req_headers, query = self.auth( + url, req_headers, query, config=cfg, method="POST" + ) try: with open(local_file_path, "rb") as f: @@ -920,7 +924,9 @@ def post_file( req_headers.update(headers or {}) # Apply authentication (may modify URL, headers, and query) cfg = Config.with_configs(self.config, config) - url, req_headers, query = self.auth(url, req_headers, query, config=cfg) + url, req_headers, query = self.auth( + url, req_headers, query, config=cfg, method="POST" + ) try: with open(local_file_path, "rb") as f: @@ -971,7 +977,9 @@ async def get_file_async( req_headers.update(headers or {}) # Apply authentication (may modify URL, headers, and query) cfg = Config.with_configs(self.config, config) - url, req_headers, query = self.auth(url, req_headers, query, config=cfg) + url, req_headers, query = self.auth( + url, req_headers, query, config=cfg, method="GET" + ) try: async with httpx.AsyncClient( @@ -1020,7 +1028,9 @@ def get_file( req_headers.update(headers or {}) # Apply authentication (may modify URL, headers, and query) cfg = Config.with_configs(self.config, config) - url, req_headers, query = self.auth(url, req_headers, query, config=cfg) + url, req_headers, query = self.auth( + url, req_headers, query, config=cfg, method="GET" + ) try: with httpx.Client(timeout=self.config.get_timeout()) as client: @@ -1067,7 +1077,9 @@ async def get_video_async( req_headers.update(headers or {}) # Apply authentication (may modify URL, headers, and query) cfg = Config.with_configs(self.config, config) - url, req_headers, query = self.auth(url, req_headers, query, config=cfg) + url, req_headers, query = self.auth( + url, req_headers, query, config=cfg, method="GET" + ) try: async with httpx.AsyncClient( @@ -1116,7 +1128,9 @@ def get_video( req_headers.update(headers or {}) # Apply authentication (may modify URL, headers, and query) cfg = Config.with_configs(self.config, config) - url, req_headers, query = self.auth(url, req_headers, query, config=cfg) + url, req_headers, query = self.auth( + url, req_headers, query, config=cfg, method="GET" + ) try: with httpx.Client(timeout=self.config.get_timeout()) as client: diff --git a/agentrun/utils/ram_signature/README.md b/agentrun/utils/ram_signature/README.md new file mode 100644 index 0000000..dc1ad30 --- /dev/null +++ b/agentrun/utils/ram_signature/README.md @@ -0,0 +1,62 @@ +# AgentRun RAM 签名独立实现 + +本目录为 **独立文件夹实现**,**Python 手写签名逻辑**,无 `alibabacloud_signer_inner` 等外部签名库依赖。 +**Node.js 版本** 位于 [agentrun-sdk-nodejs](../agentrun-sdk-nodejs) 的 `src/utils/ram-signature/` 目录。 + +## 目录结构(Python SDK) + +``` +ram_signature/ +├── README.md # 本说明 +├── __init__.py # Python 包入口,导出 get_agentrun_signed_headers +└── python/ # Python 手写实现 + ├── __init__.py + └── signer.py # AGENTRUN4-HMAC-SHA256 纯手写 +``` + +## 算法说明 + +- **算法名**: `AGENTRUN4-HMAC-SHA256` +- **Payload**: `UNSIGNED-PAYLOAD`(不参与 body 哈希) +- **参与签名的头**: `host`, `x-acs-date`, `x-acs-content-sha256`,可选 `x-acs-security-token` +- **流程**: Canonical Request → StringToSign → HMAC 派生 Key → Signature,与阿里云 OSS V4 / ACS 风格一致 + +## Python 使用 + +在 AgentRun Python SDK 内已通过 `agentrun.utils.ram_signature` 或 `agentrun.ram_signature` 使用: + +```python +from agentrun.ram_signature import get_agentrun_signed_headers + +headers = get_agentrun_signed_headers( + url="https://xxx-ram.agentrun-data.cn-hangzhou.aliyuncs.com/path", + method="GET", + access_key_id="ak", + access_key_secret="sk", + region="cn-hangzhou", + product="agentrun", +) +# headers["Agentrun-Authorization"], headers["x-acs-date"], ... +``` + +## Node.js 使用 + +Node.js 实现位于 **agentrun-sdk-nodejs** 仓库的 `src/utils/ram-signature/` 目录,通过 `@agentrun/sdk` 的 utils 导出: + +```typescript +import { getAgentrunSignedHeaders } from '@agentrun/sdk'; + +const headers = getAgentrunSignedHeaders({ + url: 'https://xxx-ram.agentrun-data.cn-hangzhou.aliyuncs.com/path', + method: 'GET', + accessKeyId: 'ak', + accessKeySecret: 'sk', + region: 'cn-hangzhou', + product: 'agentrun', +}); +// headers['Agentrun-Authorization'], headers['x-acs-date'], ... +``` + +## 与 ram-e2e-test 的对应关系 + +逻辑与 ram-e2e-test/signature_helper.py 一致,用于替代原 `GetAccessToken` + `Agentrun-Access-Token` 的 Data API 鉴权方式;此处为手写实现,不依赖 `alibabacloud_signer_inner`。 diff --git a/agentrun/utils/ram_signature/__init__.py b/agentrun/utils/ram_signature/__init__.py new file mode 100644 index 0000000..0102f3f --- /dev/null +++ b/agentrun/utils/ram_signature/__init__.py @@ -0,0 +1,5 @@ +"""Python 手写实现:AGENTRUN4-HMAC-SHA256,无 alibabacloud_signer_inner 依赖。""" + +from .signer import get_agentrun_signed_headers + +__all__ = ["get_agentrun_signed_headers"] diff --git a/agentrun/utils/ram_signature/signer.py b/agentrun/utils/ram_signature/signer.py new file mode 100644 index 0000000..ce0b126 --- /dev/null +++ b/agentrun/utils/ram_signature/signer.py @@ -0,0 +1,198 @@ +""" +AgentRun RAM 签名 - Python 手写实现(无外部 signer 依赖) + +实现 AGENTRUN4-HMAC-SHA256,与 http-auth-acs / ram-e2e-test(alibabacloud_signer_inner)行为一致: +- UNSIGNED-PAYLOAD +- x-acs-date(ISO8601)、x-acs-content-sha256、host、可选 x-acs-security-token +- Canonical Request 与 http-auth-acs 一致(URI 不编码、Query 空值用 key=、仅签 x-acs-* / host / content-type) +- StringToSign 为 2 行:ALGORITHM + "\\n" + SHA256(CanonicalRequest)(无 timestamp/scope) +""" + +from datetime import datetime, timezone +import hashlib +import hmac +from typing import Optional +from urllib.parse import quote, urlparse + +ALGORITHM = "AGENTRUN4-HMAC-SHA256" +UNSIGNED_PAYLOAD = "UNSIGNED-PAYLOAD" +SCOPE_SUFFIX = "aliyun_v4_request" +KEY_PREFIX = "aliyun_v4" + + +def _build_scope(date: str, region: str, product: str) -> str: + return f"{date}/{region}/{product}/{SCOPE_SUFFIX}" + + +def _percent_encode(value: Optional[str]) -> str: + """与 http-auth-acs 一致:quote(safe='') 后把 %7E 还原为 ~。""" + if value is None: + return "" + return quote(str(value), safe="").replace("%7E", "~") + + +def _canonical_uri(path: str) -> str: + """与 http-auth-acs 一致:path 原样,不 percent-encode。""" + if path is None or path == "": + return "/" + return path + + +def _canonical_query(query_params: dict) -> str: + """与 http-auth-acs 一致:空值为 encoded_key=(带等号)。""" + if not query_params: + return "" + parts = [] + for k in sorted(query_params.keys()): + v = query_params.get(k) + enc_k = _percent_encode(k) + if v is not None and v != "": + parts.append(f"{enc_k}={_percent_encode(v)}") + else: + parts.append(f"{enc_k}=") + return "&".join(parts) + + +def _get_signed_headers(headers: dict) -> list[str]: + """与 http-auth-acs 一致:仅签 x-acs-*、host、content-type,且 value 非 None。""" + out = set() + for key, value in headers.items(): + lower_key = key.lower().strip() + if value is not None and ( + lower_key.startswith("x-acs-") + or lower_key == "host" + or lower_key == "content-type" + ): + out.add(lower_key) + return sorted(out) + + +def _canonical_headers(headers: dict) -> tuple[str, str]: + """与 http-auth-acs 一致:先归一化再按 signed_headers 顺序输出 header:value\\n。""" + new_headers: dict[str, str] = {} + for k, v in headers.items(): + lower_key = k.lower().strip() + if v is not None: + new_headers[lower_key] = str(v).strip() + signed_list = _get_signed_headers(headers) + canonical = "".join(f"{h}:{new_headers[h]}\n" for h in signed_list) + signed_str = ";".join(signed_list) + return canonical, signed_str + + +def _calc_canonical_request( + method: str, + pathname: str, + query_params: dict, + headers: dict, + hashed_payload: str, +) -> str: + method = method.upper() + uri = _canonical_uri(pathname) + query = _canonical_query(query_params) + canon_headers, signed_headers = _canonical_headers(headers) + return f"{method}\n{uri}\n{query}\n{canon_headers}\n{signed_headers}\n{hashed_payload}" + + +def _calc_string_to_sign(canonical_request: str) -> str: + """与 http-auth-acs 一致:2 行 StringToSign = ALGORITHM + \\n + SHA256(CanonicalRequest)。""" + digest = hashlib.sha256(canonical_request.encode("utf-8")).hexdigest() + return f"{ALGORITHM}\n{digest}" + + +def _get_signing_key( + secret: str, date: str, region: str, product: str +) -> bytes: + key = (KEY_PREFIX + secret).encode("utf-8") + k_date = hmac.new(key, date.encode("utf-8"), hashlib.sha256).digest() + k_region = hmac.new(k_date, region.encode("utf-8"), hashlib.sha256).digest() + k_product = hmac.new( + k_region, product.encode("utf-8"), hashlib.sha256 + ).digest() + k_signing = hmac.new( + k_product, SCOPE_SUFFIX.encode("utf-8"), hashlib.sha256 + ).digest() + return k_signing + + +def _calc_signature( + secret: str, + date: str, + region: str, + product: str, + string_to_sign: str, +) -> str: + signing_key = _get_signing_key(secret, date, region, product) + return hmac.new( + signing_key, string_to_sign.encode("utf-8"), hashlib.sha256 + ).hexdigest() + + +def get_agentrun_signed_headers( + url: str, + method: str = "GET", + access_key_id: Optional[str] = None, + access_key_secret: Optional[str] = None, + security_token: Optional[str] = None, + region: str = "cn-hangzhou", + product: str = "agentrun", + body: Optional[bytes] = None, + content_type: Optional[str] = None, + sign_time: Optional[datetime] = None, +) -> dict: + """ + 生成 AgentRun 签名头(手写实现,无外部依赖)。 + 返回包含 Agentrun-Authorization、x-acs-date、x-acs-content-sha256 等的 headers。 + content_type 若提供会参与签名(与 http-auth-acs 一致,SignedHeaders 含 content-type)。 + """ + if not access_key_id or not access_key_secret: + raise ValueError("Access Key ID and Secret are required") + + parsed = urlparse(url) + host = parsed.netloc + pathname = parsed.path or "/" + query_params: dict = {} + if parsed.query: + for pair in parsed.query.split("&"): + if "=" in pair: + k, v = pair.split("=", 1) + query_params[k] = v + + now = sign_time if sign_time is not None else datetime.now(timezone.utc) + if now.tzinfo is None: + now = now.replace(tzinfo=timezone.utc) + timestamp = now.strftime("%Y-%m-%dT%H:%M:%SZ") + date = now.strftime("%Y%m%d") + + headers_for_sign: dict = { + "host": host, + "x-acs-date": timestamp, + "x-acs-content-sha256": UNSIGNED_PAYLOAD, + } + if security_token: + headers_for_sign["x-acs-security-token"] = security_token + if content_type is not None: + headers_for_sign["content-type"] = content_type + + scope = _build_scope(date, region, product) + canonical_request = _calc_canonical_request( + method, + pathname, + query_params, + headers_for_sign, + UNSIGNED_PAYLOAD, + ) + string_to_sign = _calc_string_to_sign(canonical_request) + signature = _calc_signature( + access_key_secret, date, region, product, string_to_sign + ) + + signed_headers_str = ";".join(_get_signed_headers(headers_for_sign)) + auth_value = ( + f"{ALGORITHM} Credential={access_key_id}/{scope}," + f"SignedHeaders={signed_headers_str},Signature={signature}" + ) + + result = dict(headers_for_sign) + result["Agentrun-Authorization"] = auth_value + return result diff --git a/tests/unittests/ram_signature/__init__.py b/tests/unittests/ram_signature/__init__.py new file mode 100644 index 0000000..fd0a499 --- /dev/null +++ b/tests/unittests/ram_signature/__init__.py @@ -0,0 +1 @@ +# Tests for agentrun.ram_signature standalone implementation diff --git a/tests/unittests/ram_signature/test_signer.py b/tests/unittests/ram_signature/test_signer.py new file mode 100644 index 0000000..cb4a913 --- /dev/null +++ b/tests/unittests/ram_signature/test_signer.py @@ -0,0 +1,347 @@ +"""测试独立文件夹内的 RAM 签名手写实现(无 mock)。""" + +from datetime import datetime, timezone +from urllib.parse import urlparse + +import pytest + +from agentrun.utils.ram_signature.signer import ( + _canonical_headers, + _canonical_uri, + _percent_encode, + get_agentrun_signed_headers, +) + + +class TestRamSignatureStandalone: + """对手写实现的直接测试,不依赖 data_api 或 mock。""" + + def test_returns_required_headers(self): + headers = get_agentrun_signed_headers( + url="https://uid-ram.agentrun-data.cn-hangzhou.aliyuncs.com/sandboxes/s1/health", + method="GET", + access_key_id="ak", + access_key_secret="sk", + region="cn-hangzhou", + product="agentrun", + ) + assert "Agentrun-Authorization" in headers + assert "x-acs-date" in headers + assert "x-acs-content-sha256" in headers + assert headers["x-acs-content-sha256"] == "UNSIGNED-PAYLOAD" + assert "host" in headers + assert ( + "uid-ram.agentrun-data.cn-hangzhou.aliyuncs.com" in headers["host"] + ) + + def test_authorization_format(self): + headers = get_agentrun_signed_headers( + url="https://example.agentrun-data.cn-hangzhou.aliyuncs.com/path", + method="GET", + access_key_id="test-ak", + access_key_secret="test-sk", + ) + auth = headers["Agentrun-Authorization"] + assert auth.startswith("AGENTRUN4-HMAC-SHA256 ") + assert "Credential=test-ak/" in auth + assert "SignedHeaders=" in auth + assert "Signature=" in auth + + def test_requires_ak_sk(self): + with pytest.raises(ValueError, match="Access Key ID and Secret"): + get_agentrun_signed_headers( + url="https://x.agentrun-data.cn-hangzhou.aliyuncs.com/", + access_key_id="", + access_key_secret="sk", + ) + with pytest.raises(ValueError, match="Access Key ID and Secret"): + get_agentrun_signed_headers( + url="https://x.agentrun-data.cn-hangzhou.aliyuncs.com/", + access_key_id="ak", + access_key_secret="", + ) + + def test_security_token_in_headers_when_provided(self): + headers = get_agentrun_signed_headers( + url="https://x.agentrun-data.cn-hangzhou.aliyuncs.com/", + access_key_id="ak", + access_key_secret="sk", + security_token="sts-token", + ) + assert "x-acs-security-token" in headers + assert headers["x-acs-security-token"] == "sts-token" + assert "x-acs-security-token" in headers["Agentrun-Authorization"] + + def test_deterministic_with_same_inputs(self): + url = "https://uid-ram.agentrun-data.cn-hangzhou.aliyuncs.com/path?a=1" + opts = dict( + url=url, + method="POST", + access_key_id="ak", + access_key_secret="sk", + region="cn-hangzhou", + product="agentrun", + ) + from datetime import datetime, timezone + + t = datetime(2025, 1, 1, 12, 0, 0, tzinfo=timezone.utc) + h1 = get_agentrun_signed_headers(**opts, sign_time=t) + h2 = get_agentrun_signed_headers(**opts, sign_time=t) + assert h1["Agentrun-Authorization"] == h2["Agentrun-Authorization"] + assert h1["x-acs-date"] == h2["x-acs-date"] + + def test_fixed_params_snapshot_matches_compare_script(self): + """与 scripts/compare_ram_signature.py 使用相同固定参数,签名结果应与快照一致(便于与 ram-e2e-test 对比)。""" + from datetime import datetime, timezone + + FIXED_SIGN_TIME = datetime(2026, 3, 6, 12, 0, 0, tzinfo=timezone.utc) + TEST_URL = "https://1431999136518149-ram.agentrun-data.cn-hangzhou.aliyuncs.com/sandboxes/sbx-xxx/health" + DEFAULT_AK = "LTAI5t5opp3xMWk2B3a4gZVq" + DEFAULT_SK = "secret" + + headers = get_agentrun_signed_headers( + url=TEST_URL, + method="GET", + access_key_id=DEFAULT_AK, + access_key_secret=DEFAULT_SK, + region="cn-hangzhou", + product="agentrun", + sign_time=FIXED_SIGN_TIME, + ) + auth = headers.get("Agentrun-Authorization", "") + sig = ( + auth.split("Signature=")[-1].strip() if "Signature=" in auth else "" + ) + # 快照:与 compare_ram_signature.py 同参数时的输出;若 ram-e2e-test 有 REF_Signature 可与此对比 + EXPECTED_SIGNATURE = ( + "e1479ea1aec37e55f91d82b1ccc48df2feef04184a911f91a3e3fe0e27d02610" + ) + assert sig == EXPECTED_SIGNATURE, ( + f"固定参数下 Signature 与快照不一致: got {sig!r}, expected" + f" {EXPECTED_SIGNATURE!r}. 可与 ram-e2e-test/print_ref_signature.py" + " 输出对比。" + ) + + +# 固定时间 + query/body/header 多场景快照(与 scripts/print_ram_signature_snapshots.py / 官方包 / JS SDK 一致) +# JS/Python 等 SDK 可用相同输入校验签名一致,参见 TestRamSignatureFixedQueryBodyHeader。 +FIXED_SIGN_TIME_QBH = datetime(2026, 3, 6, 12, 0, 0, tzinfo=timezone.utc) +FIXED_AK = "LTAI5t5opp3xMWk2B3a4gZVq" +FIXED_SK = "secret" +BASE_URL_QBH = ( + "https://1431999136518149-ram.agentrun-data.cn-hangzhou.aliyuncs.com" +) +SCENARIO_1_EXPECTED = ( + "2de34f2c0ef3d6f0f6460f994ee3fea8c940241fcc2ff2f008a776f1be9b4dba" +) +SCENARIO_2_EXPECTED = ( + "a0d4e04405ddf83d93cf8b30c6064c1a68a298cf95ca6ff56be04f515b98ddbe" +) +SCENARIO_3_EXPECTED = ( + "fdcde808b0dc7526d8083e681ca9a64728e5c1e67e4c92735dfb2268ecc71fb2" +) +# 场景说明(供 JS SDK 对齐): +# 1) GET {BASE_URL_QBH}/path?foo=bar&zoo= , no body, no content-type -> SCENARIO_1_EXPECTED +# 2) POST {BASE_URL_QBH}/path?foo=bar&zoo= , body=b"", no content-type -> SCENARIO_2_EXPECTED +# 3) POST {BASE_URL_QBH}/path?foo=bar&zoo= , body=b"", content-type=application/json -> SCENARIO_3_EXPECTED +# 时间统一: 2026-03-06T12:00:00Z (date=20260306), region=cn-hangzhou, product=agentrun, UNSIGNED-PAYLOAD + + +def _ref_signature_if_available( + url: str, + method: str, + body: bytes | None, + content_type: str | None, + sign_time: datetime, + ak: str, + sk: str, + region: str, + product: str, +) -> str | None: + """若已安装 alibabacloud_signer_inner,用相同参数生成参考签名,便于与官方包/JS 对齐。""" + try: + from alibabacloud_signer_inner import AcsV4HttpSigner, SignRequest + except ImportError: + return None + + class UnsignedPayloadSigner(AcsV4HttpSigner): + ALGORITHM = "AGENTRUN4-HMAC-SHA256" + + def _hash_payload(self, payload) -> str: + return "UNSIGNED-PAYLOAD" + + parsed = urlparse(url) + host = parsed.netloc + pathname = parsed.path or "/" + query_params = {} + if parsed.query: + for pair in parsed.query.split("&"): + if "=" in pair: + k, v = pair.split("=", 1) + query_params[k] = v + + timestamp = sign_time.strftime("%Y-%m-%dT%H:%M:%SZ") + date = sign_time.strftime("%Y%m%d") + headers = { + "host": host, + "x-acs-date": timestamp, + "x-acs-content-sha256": "UNSIGNED-PAYLOAD", + } + if content_type is not None: + headers["content-type"] = content_type + + payload = body if body is not None else b"" + sign_request = SignRequest( + pathname=pathname, + method=method.upper(), + query_parameters=query_params, + header_parameters=headers, + payload=payload, + access_key_id=ak, + access_key_secret=sk, + security_token=None, + product=product, + region=region, + date=date, + ) + signer = UnsignedPayloadSigner() + auth_value = signer.sign(sign_request) + return ( + auth_value.split("Signature=")[-1].strip() + if "Signature=" in auth_value + else None + ) + + +class TestRamSignatureFixedQueryBodyHeader: + """固定时间 + query/body/header 多场景:与官方包(alibabacloud_signer_inner)及 JS SDK 结果一致。""" + + def _sig( + self, + url: str, + method: str, + body: bytes | None, + content_type: str | None, + ) -> str: + headers = get_agentrun_signed_headers( + url=url, + method=method, + access_key_id=FIXED_AK, + access_key_secret=FIXED_SK, + region="cn-hangzhou", + product="agentrun", + body=body, + content_type=content_type, + sign_time=FIXED_SIGN_TIME_QBH, + ) + auth = headers.get("Agentrun-Authorization", "") + return ( + auth.split("Signature=")[-1].strip() if "Signature=" in auth else "" + ) + + def test_scenario_1_get_query_no_body_no_content_type(self): + """GET + query (foo=bar&zoo=),无 body,无 content-type。""" + url = f"{BASE_URL_QBH}/path?foo=bar&zoo=" + sig = self._sig(url, "GET", None, None) + assert sig == SCENARIO_1_EXPECTED, f"got {sig!r}" + ref = _ref_signature_if_available( + url, + "GET", + None, + None, + FIXED_SIGN_TIME_QBH, + FIXED_AK, + FIXED_SK, + "cn-hangzhou", + "agentrun", + ) + if ref is not None: + assert sig == ref, "SDK 与官方包(ref) 签名应一致" + + def test_scenario_2_post_query_empty_body_no_content_type(self): + """POST + query,body 空,无 content-type。""" + url = f"{BASE_URL_QBH}/path?foo=bar&zoo=" + sig = self._sig(url, "POST", b"", None) + assert sig == SCENARIO_2_EXPECTED, f"got {sig!r}" + ref = _ref_signature_if_available( + url, + "POST", + b"", + None, + FIXED_SIGN_TIME_QBH, + FIXED_AK, + FIXED_SK, + "cn-hangzhou", + "agentrun", + ) + if ref is not None: + assert sig == ref, "SDK 与官方包(ref) 签名应一致" + + def test_scenario_3_post_query_empty_body_content_type_json(self): + """POST + query,body 空,content-type: application/json。""" + url = f"{BASE_URL_QBH}/path?foo=bar&zoo=" + sig = self._sig(url, "POST", b"", "application/json") + assert sig == SCENARIO_3_EXPECTED, f"got {sig!r}" + ref = _ref_signature_if_available( + url, + "POST", + b"", + "application/json", + FIXED_SIGN_TIME_QBH, + FIXED_AK, + FIXED_SK, + "cn-hangzhou", + "agentrun", + ) + if ref is not None: + assert sig == ref, "SDK 与官方包(ref) 签名应一致" + + +class TestSignerHelperFunctions: + """测试签名辅助函数的边界情况""" + + def test_percent_encode_none(self): + """测试 _percent_encode(None) 返回空字符串""" + assert _percent_encode(None) == "" + + def test_percent_encode_tilde(self): + """测试 _percent_encode 正确处理 ~ 字符""" + assert "~" in _percent_encode("a~b") + + def test_canonical_uri_empty(self): + """测试 _canonical_uri 空字符串返回 /""" + assert _canonical_uri("") == "/" + + def test_canonical_uri_none(self): + """测试 _canonical_uri None 返回 /""" + assert _canonical_uri(None) == "/" + + def test_canonical_uri_normal(self): + """测试 _canonical_uri 正常路径""" + assert _canonical_uri("/path/to/resource") == "/path/to/resource" + + def test_canonical_headers_skips_none_values(self): + """测试 _canonical_headers 跳过 value 为 None 的 header""" + headers = { + "host": "example.com", + "x-acs-date": "2026-01-01T00:00:00Z", + "x-acs-skip": None, + } + canon, signed = _canonical_headers(headers) + assert "x-acs-skip" not in signed + assert "host" in signed + + +class TestSignerNaiveDatetime: + """测试 naive datetime(无时区信息)的处理""" + + def test_naive_datetime_gets_utc(self): + """测试 naive datetime 被自动设置为 UTC""" + naive_time = datetime(2026, 1, 1, 12, 0, 0) + headers = get_agentrun_signed_headers( + url="https://x.agentrun-data.cn-hangzhou.aliyuncs.com/path", + access_key_id="ak", + access_key_secret="sk", + sign_time=naive_time, + ) + assert headers["x-acs-date"] == "2026-01-01T12:00:00Z" diff --git a/tests/unittests/toolset/api/test_openapi.py b/tests/unittests/toolset/api/test_openapi.py index 3a60866..bb32eac 100644 --- a/tests/unittests/toolset/api/test_openapi.py +++ b/tests/unittests/toolset/api/test_openapi.py @@ -3,15 +3,18 @@ 测试内容: 1. $ref 解析是否正确展开内部引用 2. Mock 服务端验证请求参数是否符合预期 +3. agentrun-data 域名自动 RAM 签名注入 """ import json +from unittest.mock import patch import httpx import respx from agentrun.toolset.api.openapi import ApiSet, OpenAPI from agentrun.toolset.model import ToolInfo, ToolSchema +from agentrun.utils.config import Config class TestOpenAPIRefResolution: @@ -1210,3 +1213,132 @@ def test_tool_schema(self): assert json_schema["type"] == "object" assert "items" in json_schema["properties"] assert json_schema["properties"]["items"]["type"] == "array" + + +AGENTRUN_DATA_SCHEMA = { + "openapi": "3.0.1", + "info": {"title": "Test", "version": "1.0"}, + "servers": [{ + "url": "https://1431999136518149.agentrun-data.cn-hangzhou.aliyuncs.com/tools/test/" + }], + "paths": { + "/invoke": { + "post": { + "operationId": "test_tool", + "summary": "Test tool", + "requestBody": { + "required": True, + "content": { + "application/json": { + "schema": { + "type": "object", + "properties": { + "query": { + "type": "string", + } + }, + } + } + }, + }, + "responses": {"200": {"description": "OK"}}, + } + } + }, +} + + +class TestOpenAPIRamAuth: + """测试 agentrun-data 域名 OpenAPI 调用时自动注入 RAM 签名""" + + def _make_openapi( + self, + base_url: str | None = None, + config: Config | None = None, + ) -> OpenAPI: + return OpenAPI( + schema=AGENTRUN_DATA_SCHEMA, + base_url=base_url, + config=config, + ) + + def test_ram_auth_applied_for_agentrun_data_url(self): + """agentrun-data 域名应自动注入 RAM 签名 headers 并改写为 -ram 端点""" + cfg = Config( + access_key_id="test-ak", + access_key_secret="test-sk", + region_id="cn-hangzhou", + ) + openapi = self._make_openapi(config=cfg) + request_kwargs, _, _ = openapi._prepare_request( + "test_tool", {"query": "hello"}, cfg + ) + assert "Agentrun-Authorization" in request_kwargs["headers"] + assert "-ram." in request_kwargs["url"] + assert request_kwargs["headers"]["Agentrun-Authorization"].startswith( + "AGENTRUN4-HMAC-SHA256 " + ) + + def test_ram_auth_not_applied_for_non_agentrun_url(self): + """非 agentrun-data 域名不应注入 RAM 签名""" + cfg = Config( + access_key_id="test-ak", + access_key_secret="test-sk", + region_id="cn-hangzhou", + ) + openapi = self._make_openapi( + base_url="https://example.com/api", + config=cfg, + ) + request_kwargs, _, _ = openapi._prepare_request( + "test_tool", {"query": "hello"}, cfg + ) + assert "Agentrun-Authorization" not in request_kwargs["headers"] + assert "-ram." not in request_kwargs["url"] + + def test_ram_auth_skipped_without_ak_sk(self): + """没有 AK/SK 时应跳过 RAM 签名""" + with patch.dict( + "os.environ", + {}, + clear=True, + ): + cfg = Config(region_id="cn-hangzhou") + openapi = self._make_openapi(config=cfg) + request_kwargs, _, _ = openapi._prepare_request( + "test_tool", {"query": "hello"}, cfg + ) + assert "Agentrun-Authorization" not in request_kwargs["headers"] + + def test_ram_auth_url_rewrite(self): + """验证 URL 被正确改写为 -ram 端点""" + cfg = Config( + access_key_id="test-ak", + access_key_secret="test-sk", + region_id="cn-hangzhou", + ) + openapi = self._make_openapi(config=cfg) + request_kwargs, _, _ = openapi._prepare_request( + "test_tool", {"query": "hello"}, cfg + ) + assert request_kwargs["url"].startswith( + "https://1431999136518149-ram.agentrun-data." + ) + + def test_ram_auth_preserves_existing_headers(self): + """RAM 签名不应覆盖用户自定义 headers""" + cfg = Config( + access_key_id="test-ak", + access_key_secret="test-sk", + region_id="cn-hangzhou", + ) + openapi = OpenAPI( + schema=AGENTRUN_DATA_SCHEMA, + headers={"X-Custom": "value"}, + config=cfg, + ) + request_kwargs, _, _ = openapi._prepare_request( + "test_tool", {"query": "hello"}, cfg + ) + assert "Agentrun-Authorization" in request_kwargs["headers"] + assert request_kwargs["headers"]["X-Custom"] == "value" diff --git a/tests/unittests/toolset/test_toolset.py b/tests/unittests/toolset/test_toolset.py index 6d8c6f5..76d2abe 100644 --- a/tests/unittests/toolset/test_toolset.py +++ b/tests/unittests/toolset/test_toolset.py @@ -4,6 +4,8 @@ Tests ToolSet resource class functionality. """ +import os +import unittest.mock from unittest.mock import AsyncMock, MagicMock, patch import pytest @@ -275,8 +277,8 @@ def test_no_urls(self): toolset = ToolSet() assert toolset._get_openapi_base_url() is None - def test_intranet_url_preferred(self): - """测试优先使用内网 URL""" + def test_intranet_url_preferred_on_fc(self): + """测试在 FC 环境下优先使用内网 URL""" toolset = ToolSet( status=ToolSetStatus( outputs=ToolSetStatusOutputs( @@ -287,10 +289,31 @@ def test_intranet_url_preferred(self): ) ) ) - assert toolset._get_openapi_base_url() == "https://internal.example.com" + with unittest.mock.patch.dict(os.environ, {"FC_REGION": "cn-hangzhou"}): + assert ( + toolset._get_openapi_base_url() + == "https://internal.example.com" + ) + + def test_internet_url_when_not_on_fc(self): + """测试非 FC 环境使用公网 URL""" + toolset = ToolSet( + status=ToolSetStatus( + outputs=ToolSetStatusOutputs( + urls=ToolSetStatusOutputsUrls( + internet_url="https://public.example.com", + intranet_url="https://internal.example.com", + ) + ) + ) + ) + with unittest.mock.patch.dict(os.environ, {}, clear=True): + assert ( + toolset._get_openapi_base_url() == "https://public.example.com" + ) def test_internet_url_fallback(self): - """测试公网 URL 作为回退""" + """测试只有公网 URL 时作为回退""" toolset = ToolSet( status=ToolSetStatus( outputs=ToolSetStatusOutputs( @@ -682,5 +705,5 @@ def test_to_apiset_mcp_missing_url(self): ) ), ) - with pytest.raises(AssertionError, match="MCP server URL is missing"): + with pytest.raises(ValueError, match="MCP server URL is missing"): toolset.to_apiset() diff --git a/tests/unittests/utils/test_control_api.py b/tests/unittests/utils/test_control_api.py index 0113d38..efd799f 100644 --- a/tests/unittests/utils/test_control_api.py +++ b/tests/unittests/utils/test_control_api.py @@ -277,3 +277,139 @@ def test_get_devs_client_with_read_timeout(self, mock_client_class): config_arg = call_args[0][0] assert config_arg.connect_timeout == 300 assert config_arg.read_timeout == 60000 + + +class TestControlAPIGetBailianClient: + """测试 ControlAPI._get_bailian_client""" + + @patch("agentrun.utils.control_api.BailianClient") + def test_get_bailian_client_basic(self, mock_client_class): + """测试获取基本百炼客户端""" + config = Config( + access_key_id="ak", + access_key_secret="sk", + region_id="cn-hangzhou", + ) + api = ControlAPI(config=config) + + mock_client = MagicMock() + mock_client_class.return_value = mock_client + + result = api._get_bailian_client() + + assert mock_client_class.called + call_args = mock_client_class.call_args + config_arg = call_args[0][0] + assert config_arg.access_key_id == "ak" + assert config_arg.access_key_secret == "sk" + assert config_arg.region_id == "cn-hangzhou" + + @patch("agentrun.utils.control_api.BailianClient") + def test_get_bailian_client_strips_https_prefix(self, mock_client_class): + """测试获取百炼客户端时去除 https:// 前缀""" + config = Config( + access_key_id="ak", + access_key_secret="sk", + bailian_endpoint="https://bailian.cn-hangzhou.aliyuncs.com", + ) + api = ControlAPI(config=config) + + mock_client = MagicMock() + mock_client_class.return_value = mock_client + + api._get_bailian_client() + + call_args = mock_client_class.call_args + config_arg = call_args[0][0] + assert config_arg.endpoint == "bailian.cn-hangzhou.aliyuncs.com" + + @patch("agentrun.utils.control_api.BailianClient") + def test_get_bailian_client_strips_http_prefix(self, mock_client_class): + """测试获取百炼客户端时去除 http:// 前缀""" + config = Config( + access_key_id="ak", + access_key_secret="sk", + bailian_endpoint="http://bailian.custom.com", + ) + api = ControlAPI(config=config) + + mock_client = MagicMock() + mock_client_class.return_value = mock_client + + api._get_bailian_client() + + call_args = mock_client_class.call_args + config_arg = call_args[0][0] + assert config_arg.endpoint == "bailian.custom.com" + + +class TestControlAPIGetGPDBClient: + """测试 ControlAPI._get_gpdb_client""" + + @patch("agentrun.utils.control_api.GPDBClient") + def test_get_gpdb_client_known_region(self, mock_client_class): + """测试已知 region 使用通用 endpoint""" + config = Config( + access_key_id="ak", + access_key_secret="sk", + region_id="cn-hangzhou", + ) + api = ControlAPI(config=config) + + mock_client = MagicMock() + mock_client_class.return_value = mock_client + + api._get_gpdb_client() + + call_args = mock_client_class.call_args + config_arg = call_args[0][0] + assert config_arg.endpoint == "gpdb.aliyuncs.com" + + @patch("agentrun.utils.control_api.GPDBClient") + def test_get_gpdb_client_unknown_region(self, mock_client_class): + """测试未知 region 使用区域级别 endpoint""" + config = Config( + access_key_id="ak", + access_key_secret="sk", + region_id="us-west-1", + ) + api = ControlAPI(config=config) + + mock_client = MagicMock() + mock_client_class.return_value = mock_client + + api._get_gpdb_client() + + call_args = mock_client_class.call_args + config_arg = call_args[0][0] + assert config_arg.endpoint == "gpdb.us-west-1.aliyuncs.com" + + @patch("agentrun.utils.control_api.GPDBClient") + def test_get_gpdb_client_all_known_regions(self, mock_client_class): + """测试所有已知 region 使用通用 endpoint""" + known_regions = [ + "cn-beijing", + "cn-hangzhou", + "cn-shanghai", + "cn-shenzhen", + "cn-hongkong", + "ap-southeast-1", + ] + for region in known_regions: + config = Config( + access_key_id="ak", + access_key_secret="sk", + region_id=region, + ) + api = ControlAPI(config=config) + + mock_client = MagicMock() + mock_client_class.return_value = mock_client + + api._get_gpdb_client() + + call_args = mock_client_class.call_args + config_arg = call_args[0][0] + assert ( + config_arg.endpoint == "gpdb.aliyuncs.com" + ), f"Region {region} should use gpdb.aliyuncs.com" diff --git a/tests/unittests/utils/test_data_api.py b/tests/unittests/utils/test_data_api.py index 7285bf2..9534147 100644 --- a/tests/unittests/utils/test_data_api.py +++ b/tests/unittests/utils/test_data_api.py @@ -47,21 +47,14 @@ def test_init_basic(self): assert api.resource_name == "test-resource" assert api.resource_type == ResourceType.Runtime assert api.namespace == "agents" - assert api.access_token is None - - def test_init_with_token_in_config(self): - """测试使用 config 中的 token 初始化""" - config = Config(token="my-token", account_id="test-account") - api = DataAPI( - resource_name="test-resource", - resource_type=ResourceType.Runtime, - config=config, - ) - assert api.access_token == "my-token" def test_init_with_custom_namespace(self): """测试自定义 namespace""" - config = Config(account_id="test-account") + config = Config( + account_id="test-account", + access_key_id="", + access_key_secret="", + ) api = DataAPI( resource_name="test-resource", resource_type=ResourceType.Runtime, @@ -165,21 +158,34 @@ def test_path_with_list_query_value(self): class TestDataAPIAuth: - """测试 DataAPI.auth""" + """测试 DataAPI.auth(仅 RAM 签名鉴权)""" - def test_auth_with_existing_token(self): - """测试已有 token 的认证""" - config = Config(token="my-token", account_id="test-account") + def test_auth_without_ak_sk_returns_no_auth_header(self): + """无 AK/SK 时 auth 不添加鉴权头""" + config = Config( + account_id="test-account", + access_key_id="", + access_key_secret="", + ) api = DataAPI( resource_name="test", resource_type=ResourceType.Runtime, config=config, ) url, headers, query = api.auth("https://example.com", {}, None) - assert headers["Agentrun-Access-Token"] == "my-token" - - def test_auth_fetches_token_on_demand(self): - """测试按需获取 token""" + assert "Agentrun-Access-Token" not in headers + assert "Agentrun-Authorization" not in headers + + @patch("agentrun.utils.data_api.get_agentrun_signed_headers") + def test_auth_uses_ram_signature_when_ak_sk_provided( + self, mock_signed_headers + ): + """测试配置了 AK/SK 且无 token 时使用 RAM 签名鉴权""" + mock_signed_headers.return_value = { + "Agentrun-Authorization": "mock-sig", + "x-acs-date": "2025-01-01T00:00:00Z", + "x-acs-content-sha256": "UNSIGNED-PAYLOAD", + } config = Config( access_key_id="ak", access_key_secret="sk", @@ -191,19 +197,22 @@ def test_auth_fetches_token_on_demand(self): config=config, ) - # Mock the token fetch - ControlAPI is imported inside the auth method - with patch("agentrun.utils.control_api.ControlAPI") as mock_control: - mock_client = MagicMock() - mock_response = MagicMock() - mock_response.body.data.access_token = "fetched-token" - mock_client.get_access_token.return_value = mock_response - mock_control.return_value._get_client.return_value = mock_client - - url, headers, query = api.auth("https://example.com", {}, None) - assert api.access_token == "fetched-token" - - def test_auth_handles_fetch_error(self): - """测试获取 token 失败的处理""" + url = "https://test-account-ram.agentrun-data.cn-hangzhou.aliyuncs.com/agents/resources" + url, headers, query = api.auth(url, {}, None, method="GET", body=None) + assert "Agentrun-Authorization" in headers + assert headers["Agentrun-Authorization"] == "mock-sig" + assert "x-acs-date" in headers + assert "x-acs-content-sha256" in headers + assert headers.get("x-acs-content-sha256") == "UNSIGNED-PAYLOAD" + + @patch("agentrun.utils.data_api.get_agentrun_signed_headers") + def test_auth_with_ak_sk_returns_signed_headers(self, mock_signed_headers): + """测试有 AK/SK 时 auth 返回签名头且不抛异常""" + mock_signed_headers.return_value = { + "Agentrun-Authorization": "mock-sig", + "x-acs-date": "2025-01-01T00:00:00Z", + "x-acs-content-sha256": "UNSIGNED-PAYLOAD", + } config = Config( access_key_id="ak", access_key_secret="sk", @@ -215,15 +224,14 @@ def test_auth_handles_fetch_error(self): config=config, ) - # Mock the token fetch to fail - ControlAPI is imported inside the auth method - with patch("agentrun.utils.control_api.ControlAPI") as mock_control: - mock_control.return_value._get_client.side_effect = Exception( - "Failed" - ) - - # 不应该抛出异常 - url, headers, query = api.auth("https://example.com", {}, None) - assert api.access_token is None + url, headers, query = api.auth( + "https://test-account-ram.agentrun-data.cn-hangzhou.aliyuncs.com/path", + {}, + None, + method="GET", + ) + assert "Agentrun-Authorization" in headers + assert headers["Agentrun-Authorization"] == "mock-sig" class TestDataAPIPrepareRequest: @@ -231,7 +239,11 @@ class TestDataAPIPrepareRequest: def test_prepare_request_with_dict_data(self): """测试使用字典数据准备请求""" - config = Config(token="token", account_id="test-account") + config = Config( + account_id="test-account", + access_key_id="", + access_key_secret="", + ) api = DataAPI( resource_name="test", resource_type=ResourceType.Runtime, @@ -246,7 +258,11 @@ def test_prepare_request_with_dict_data(self): def test_prepare_request_with_string_data(self): """测试使用字符串数据准备请求""" - config = Config(token="token", account_id="test-account") + config = Config( + account_id="test-account", + access_key_id="", + access_key_secret="", + ) api = DataAPI( resource_name="test", resource_type=ResourceType.Runtime, @@ -260,7 +276,11 @@ def test_prepare_request_with_string_data(self): def test_prepare_request_with_query(self): """测试带查询参数的请求准备""" - config = Config(token="token", account_id="test-account") + config = Config( + account_id="test-account", + access_key_id="", + access_key_secret="", + ) api = DataAPI( resource_name="test", resource_type=ResourceType.Runtime, @@ -273,7 +293,11 @@ def test_prepare_request_with_query(self): def test_prepare_request_with_list_query(self): """测试带多值列表查询参数的请求准备""" - config = Config(token="token", account_id="test-account") + config = Config( + account_id="test-account", + access_key_id="", + access_key_secret="", + ) api = DataAPI( resource_name="test", resource_type=ResourceType.Runtime, @@ -289,7 +313,11 @@ def test_prepare_request_with_list_query(self): def test_prepare_request_with_non_standard_data(self): """测试使用非 dict/str 类型数据准备请求""" - config = Config(token="token", account_id="test-account") + config = Config( + account_id="test-account", + access_key_id="", + access_key_secret="", + ) api = DataAPI( resource_name="test", resource_type=ResourceType.Runtime, @@ -309,7 +337,11 @@ class TestDataAPIHTTPMethods: @respx.mock def test_get(self): """测试 GET 请求""" - config = Config(token="token", account_id="test-account") + config = Config( + account_id="test-account", + access_key_id="", + access_key_secret="", + ) api = DataAPI( resource_name="test", resource_type=ResourceType.Runtime, @@ -326,7 +358,11 @@ def test_get(self): @respx.mock def test_post(self): """测试 POST 请求""" - config = Config(token="token", account_id="test-account") + config = Config( + account_id="test-account", + access_key_id="", + access_key_secret="", + ) api = DataAPI( resource_name="test", resource_type=ResourceType.Runtime, @@ -343,7 +379,11 @@ def test_post(self): @respx.mock def test_put(self): """测试 PUT 请求""" - config = Config(token="token", account_id="test-account") + config = Config( + account_id="test-account", + access_key_id="", + access_key_secret="", + ) api = DataAPI( resource_name="test", resource_type=ResourceType.Runtime, @@ -360,7 +400,11 @@ def test_put(self): @respx.mock def test_patch(self): """测试 PATCH 请求""" - config = Config(token="token", account_id="test-account") + config = Config( + account_id="test-account", + access_key_id="", + access_key_secret="", + ) api = DataAPI( resource_name="test", resource_type=ResourceType.Runtime, @@ -377,7 +421,11 @@ def test_patch(self): @respx.mock def test_delete(self): """测试 DELETE 请求""" - config = Config(token="token", account_id="test-account") + config = Config( + account_id="test-account", + access_key_id="", + access_key_secret="", + ) api = DataAPI( resource_name="test", resource_type=ResourceType.Runtime, @@ -394,7 +442,11 @@ def test_delete(self): @respx.mock def test_empty_response(self): """测试空响应""" - config = Config(token="token", account_id="test-account") + config = Config( + account_id="test-account", + access_key_id="", + access_key_secret="", + ) api = DataAPI( resource_name="test", resource_type=ResourceType.Runtime, @@ -411,7 +463,11 @@ def test_empty_response(self): @respx.mock def test_bad_gateway_error(self): """测试 502 Bad Gateway 错误""" - config = Config(token="token", account_id="test-account") + config = Config( + account_id="test-account", + access_key_id="", + access_key_secret="", + ) api = DataAPI( resource_name="test", resource_type=ResourceType.Runtime, @@ -433,7 +489,11 @@ def test_bad_gateway_error(self): @respx.mock def test_json_parse_error(self): """测试 JSON 解析错误""" - config = Config(token="token", account_id="test-account") + config = Config( + account_id="test-account", + access_key_id="", + access_key_secret="", + ) api = DataAPI( resource_name="test", resource_type=ResourceType.Runtime, @@ -451,7 +511,11 @@ def test_json_parse_error(self): @respx.mock def test_request_error(self): """测试请求错误""" - config = Config(token="token", account_id="test-account") + config = Config( + account_id="test-account", + access_key_id="", + access_key_secret="", + ) api = DataAPI( resource_name="test", resource_type=ResourceType.Runtime, @@ -474,7 +538,11 @@ class TestDataAPIAsyncMethods: @pytest.mark.asyncio async def test_get_async(self): """测试异步 GET 请求""" - config = Config(token="token", account_id="test-account") + config = Config( + account_id="test-account", + access_key_id="", + access_key_secret="", + ) api = DataAPI( resource_name="test", resource_type=ResourceType.Runtime, @@ -492,7 +560,11 @@ async def test_get_async(self): @pytest.mark.asyncio async def test_post_async(self): """测试异步 POST 请求""" - config = Config(token="token", account_id="test-account") + config = Config( + account_id="test-account", + access_key_id="", + access_key_secret="", + ) api = DataAPI( resource_name="test", resource_type=ResourceType.Runtime, @@ -510,7 +582,11 @@ async def test_post_async(self): @pytest.mark.asyncio async def test_put_async(self): """测试异步 PUT 请求""" - config = Config(token="token", account_id="test-account") + config = Config( + account_id="test-account", + access_key_id="", + access_key_secret="", + ) api = DataAPI( resource_name="test", resource_type=ResourceType.Runtime, @@ -528,7 +604,11 @@ async def test_put_async(self): @pytest.mark.asyncio async def test_patch_async(self): """测试异步 PATCH 请求""" - config = Config(token="token", account_id="test-account") + config = Config( + account_id="test-account", + access_key_id="", + access_key_secret="", + ) api = DataAPI( resource_name="test", resource_type=ResourceType.Runtime, @@ -546,7 +626,11 @@ async def test_patch_async(self): @pytest.mark.asyncio async def test_delete_async(self): """测试异步 DELETE 请求""" - config = Config(token="token", account_id="test-account") + config = Config( + account_id="test-account", + access_key_id="", + access_key_secret="", + ) api = DataAPI( resource_name="test", resource_type=ResourceType.Runtime, @@ -564,7 +648,11 @@ async def test_delete_async(self): @pytest.mark.asyncio async def test_async_empty_response(self): """测试异步空响应""" - config = Config(token="token", account_id="test-account") + config = Config( + account_id="test-account", + access_key_id="", + access_key_secret="", + ) api = DataAPI( resource_name="test", resource_type=ResourceType.Runtime, @@ -582,7 +670,11 @@ async def test_async_empty_response(self): @pytest.mark.asyncio async def test_async_request_error(self): """测试异步请求错误""" - config = Config(token="token", account_id="test-account") + config = Config( + account_id="test-account", + access_key_id="", + access_key_secret="", + ) api = DataAPI( resource_name="test", resource_type=ResourceType.Runtime, @@ -604,7 +696,11 @@ class TestDataAPIFileOperations: @respx.mock def test_post_file(self): """测试同步上传文件""" - config = Config(token="token", account_id="test-account") + config = Config( + account_id="test-account", + access_key_id="", + access_key_secret="", + ) api = DataAPI( resource_name="test", resource_type=ResourceType.Runtime, @@ -629,7 +725,11 @@ def test_post_file(self): @pytest.mark.asyncio async def test_post_file_async(self): """测试异步上传文件""" - config = Config(token="token", account_id="test-account") + config = Config( + account_id="test-account", + access_key_id="", + access_key_secret="", + ) api = DataAPI( resource_name="test", resource_type=ResourceType.Runtime, @@ -655,7 +755,11 @@ async def test_post_file_async(self): @respx.mock def test_get_file(self): """测试同步下载文件""" - config = Config(token="token", account_id="test-account") + config = Config( + account_id="test-account", + access_key_id="", + access_key_secret="", + ) api = DataAPI( resource_name="test", resource_type=ResourceType.Runtime, @@ -685,7 +789,11 @@ def test_get_file(self): @pytest.mark.asyncio async def test_get_file_async(self): """测试异步下载文件""" - config = Config(token="token", account_id="test-account") + config = Config( + account_id="test-account", + access_key_id="", + access_key_secret="", + ) api = DataAPI( resource_name="test", resource_type=ResourceType.Runtime, @@ -711,7 +819,11 @@ async def test_get_file_async(self): @respx.mock def test_get_video(self): """测试同步下载视频""" - config = Config(token="token", account_id="test-account") + config = Config( + account_id="test-account", + access_key_id="", + access_key_secret="", + ) api = DataAPI( resource_name="test", resource_type=ResourceType.Runtime, @@ -736,7 +848,11 @@ def test_get_video(self): @pytest.mark.asyncio async def test_get_video_async(self): """测试异步下载视频""" - config = Config(token="token", account_id="test-account") + config = Config( + account_id="test-account", + access_key_id="", + access_key_secret="", + ) api = DataAPI( resource_name="test", resource_type=ResourceType.Runtime, @@ -760,7 +876,11 @@ async def test_get_video_async(self): @respx.mock def test_post_file_http_error(self): """测试上传文件时的 HTTP 错误""" - config = Config(token="token", account_id="test-account") + config = Config( + account_id="test-account", + access_key_id="", + access_key_secret="", + ) api = DataAPI( resource_name="test", resource_type=ResourceType.Runtime, @@ -784,7 +904,11 @@ def test_post_file_http_error(self): @respx.mock def test_get_file_http_error(self): """测试下载文件时的 HTTP 错误""" - config = Config(token="token", account_id="test-account") + config = Config( + account_id="test-account", + access_key_id="", + access_key_secret="", + ) api = DataAPI( resource_name="test", resource_type=ResourceType.Runtime, @@ -812,7 +936,11 @@ class TestDataAPIHTTPStatusError: @respx.mock def test_http_status_error_with_response_text(self): """测试 HTTPStatusError 带响应文本""" - config = Config(token="token", account_id="test-account") + config = Config( + account_id="test-account", + access_key_id="", + access_key_secret="", + ) api = DataAPI( resource_name="test", resource_type=ResourceType.Runtime, @@ -842,7 +970,11 @@ def test_http_status_error_with_response_text(self): @pytest.mark.asyncio async def test_async_http_status_error(self): """测试异步 HTTPStatusError""" - config = Config(token="token", account_id="test-account") + config = Config( + account_id="test-account", + access_key_id="", + access_key_secret="", + ) api = DataAPI( resource_name="test", resource_type=ResourceType.Runtime, @@ -871,7 +1003,11 @@ async def test_async_http_status_error(self): @pytest.mark.asyncio async def test_async_bad_gateway_error(self): """测试异步 502 Bad Gateway 错误""" - config = Config(token="token", account_id="test-account") + config = Config( + account_id="test-account", + access_key_id="", + access_key_secret="", + ) api = DataAPI( resource_name="test", resource_type=ResourceType.Runtime, @@ -894,7 +1030,11 @@ async def test_async_bad_gateway_error(self): @pytest.mark.asyncio async def test_async_json_parse_error(self): """测试异步 JSON 解析错误""" - config = Config(token="token", account_id="test-account") + config = Config( + account_id="test-account", + access_key_id="", + access_key_secret="", + ) api = DataAPI( resource_name="test", resource_type=ResourceType.Runtime, @@ -917,7 +1057,11 @@ class TestDataAPIFileOperationsErrors: @pytest.mark.asyncio async def test_post_file_async_http_error(self): """测试异步上传文件时的 HTTP 错误""" - config = Config(token="token", account_id="test-account") + config = Config( + account_id="test-account", + access_key_id="", + access_key_secret="", + ) api = DataAPI( resource_name="test", resource_type=ResourceType.Runtime, @@ -944,7 +1088,11 @@ async def test_post_file_async_http_error(self): @pytest.mark.asyncio async def test_get_file_async_http_error(self): """测试异步下载文件时的 HTTP 错误""" - config = Config(token="token", account_id="test-account") + config = Config( + account_id="test-account", + access_key_id="", + access_key_secret="", + ) api = DataAPI( resource_name="test", resource_type=ResourceType.Runtime, @@ -968,7 +1116,11 @@ async def test_get_file_async_http_error(self): @respx.mock def test_get_video_http_error(self): """测试同步下载视频时的 HTTP 错误""" - config = Config(token="token", account_id="test-account") + config = Config( + account_id="test-account", + access_key_id="", + access_key_secret="", + ) api = DataAPI( resource_name="test", resource_type=ResourceType.Runtime, @@ -993,7 +1145,11 @@ def test_get_video_http_error(self): @pytest.mark.asyncio async def test_get_video_async_http_error(self): """测试异步下载视频时的 HTTP 错误""" - config = Config(token="token", account_id="test-account") + config = Config( + account_id="test-account", + access_key_id="", + access_key_secret="", + ) api = DataAPI( resource_name="test", resource_type=ResourceType.Runtime, @@ -1016,10 +1172,18 @@ async def test_get_video_async_http_error(self): class TestDataAPIAuthWithSandbox: - """测试 DataAPI 针对 Sandbox 资源类型的认证""" - - def test_auth_with_sandbox_resource_type(self): - """测试 Sandbox 资源类型使用 resource_id""" + """测试 DataAPI 针对 Sandbox 资源类型的认证(RAM 鉴权下与其它资源类型一致)""" + + @patch("agentrun.utils.data_api.get_agentrun_signed_headers") + def test_auth_with_sandbox_uses_ram_when_ak_sk_provided( + self, mock_signed_headers + ): + """测试 Sandbox 资源类型在配置 AK/SK 时同样使用 RAM 签名""" + mock_signed_headers.return_value = { + "Agentrun-Authorization": "mock-sig", + "x-acs-date": "2025-01-01T00:00:00Z", + "x-acs-content-sha256": "UNSIGNED-PAYLOAD", + } config = Config( access_key_id="ak", access_key_secret="sk", @@ -1031,17 +1195,14 @@ def test_auth_with_sandbox_resource_type(self): config=config, ) - # Mock the token fetch - ControlAPI is imported inside the auth method - with patch("agentrun.utils.control_api.ControlAPI") as mock_control: - mock_client = MagicMock() - mock_response = MagicMock() - mock_response.body.data.access_token = "sandbox-token" - mock_client.get_access_token.return_value = mock_response - mock_control.return_value._get_client.return_value = mock_client - - url, headers, query = api.auth("https://example.com", {}, None) - - # 验证调用使用了 resource_id 而不是 resource_name - call_args = mock_client.get_access_token.call_args - request_obj = call_args[0][0] - assert hasattr(request_obj, "resource_id") + url, headers, query = api.auth( + "https://test-account-ram.agentrun-data.cn-hangzhou.aliyuncs.com/sandboxes/sandbox-123/health", + {}, + None, + method="GET", + ) + assert "Agentrun-Authorization" in headers + assert ( + api.get_base_url().startswith("https://") + and "-ram." in api.get_base_url() + ) diff --git a/tests/unittests/utils/test_exception.py b/tests/unittests/utils/test_exception.py index dc509e8..a08c36c 100644 --- a/tests/unittests/utils/test_exception.py +++ b/tests/unittests/utils/test_exception.py @@ -4,6 +4,7 @@ from agentrun.utils.exception import ( AgentRunError, + BrowserToolError, ClientError, DeleteResourceError, HTTPError, @@ -216,3 +217,29 @@ def test_init_with_message(self): result = str(error) assert "Failed to delete resource" in result assert "Resource is locked" in result + + +class TestBrowserToolError: + """测试 BrowserToolError 异常类""" + + def test_init_without_operation(self): + """测试不带 operation 的初始化""" + error = BrowserToolError("Element not found") + assert str(error) == "Element not found" + assert error.operation is None + + def test_init_with_operation(self): + """测试带 operation 的初始化""" + error = BrowserToolError("Element not found", operation="click") + assert "click" in str(error) + assert "Element not found" in str(error) + assert error.operation == "click" + + +class TestAgentRunErrorEdgeCases: + """测试 AgentRunError 边界情况""" + + def test_init_with_empty_message(self): + """测试空消息的初始化""" + error = AgentRunError("") + assert error.message == ""