From 44489a6bf313c86ade97059bbf63597790aa148c Mon Sep 17 00:00:00 2001 From: OhYee Date: Mon, 9 Mar 2026 12:21:26 +0800 Subject: [PATCH 1/4] feat(auth): migrate from token-based to RAM signature authentication MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Migrate authentication mechanism from token-based approach to RAM signature authentication for enhanced security. The changes replace the existing access token system with Alibaba Cloud's Resource Access Management (RAM) signature verification, requiring users to configure access key ID and secret for API calls. Updates include implementation of RAM signature generation, modification of authentication endpoints with '-ram' suffix, and comprehensive updates to all API clients across runtime, sandbox, and toolset modules. This migration affects all API interactions within the SDK, including data API operations, browser automation endpoints, and OpenAI integration points. The new authentication system provides improved security through signature-based verification while maintaining backward compatibility for existing configurations. The changes also include updates to test suites to validate the new authentication flow and ensure proper handling of both authenticated and unauthenticated requests. 将身份验证机制从基于令牌的方式迁移到 RAM 签名身份验证,以提高安全性。更改将现有的访问令牌系统替换为阿里云资源访问管理 (RAM) 签名验证,要求用户为 API 调用配置访问密钥 ID 和密钥。更新包括实现 RAM 签名生成、使用 '-ram' 后缀修改身份验证端点,以及对运行时、沙箱和工具集模块中的所有 API 客户端进行全面更新。 此迁移会影响 SDK 内的所有 API 交互,包括数据 API 操作、浏览器自动化端点和 OpenAI 集成点。新的身份验证系统通过基于签名的验证提供增强的安全性,同时保持现有配置的向后兼容性。 这些更改还包括更新测试套件以验证新的身份验证流程,并确保正确处理经过身份验证和未经身份验证的请求。 Change-Id: I2585151e1acab0f476d9ba9ed909aabd212e5f2c Signed-off-by: OhYee --- .../agent_runtime/__runtime_async_template.py | 15 +- .../api/__data_async_template.py | 11 +- agentrun/agent_runtime/api/data.py | 22 +- agentrun/agent_runtime/runtime.py | 28 +- .../sandbox/api/__aio_data_async_template.py | 16 +- .../api/__browser_data_async_template.py | 16 +- agentrun/sandbox/api/aio_data.py | 16 +- agentrun/sandbox/api/browser_data.py | 16 +- agentrun/toolset/__toolset_async_template.py | 17 +- agentrun/toolset/api/openapi.py | 3 + agentrun/toolset/toolset.py | 17 +- agentrun/utils/__data_api_async_template.py | 155 ++++---- agentrun/utils/data_api.py | 186 ++++----- agentrun/utils/ram_signature/README.md | 62 +++ agentrun/utils/ram_signature/__init__.py | 5 + agentrun/utils/ram_signature/signer.py | 274 ++++++++++++++ tests/unittests/ram_signature/__init__.py | 1 + tests/unittests/ram_signature/test_signer.py | 292 ++++++++++++++ tests/unittests/utils/test_data_api.py | 355 +++++++++++++----- 19 files changed, 1198 insertions(+), 309 deletions(-) create mode 100644 agentrun/utils/ram_signature/README.md create mode 100644 agentrun/utils/ram_signature/__init__.py create mode 100644 agentrun/utils/ram_signature/signer.py create mode 100644 tests/unittests/ram_signature/__init__.py create mode 100644 tests/unittests/ram_signature/test_signer.py diff --git a/agentrun/agent_runtime/__runtime_async_template.py b/agentrun/agent_runtime/__runtime_async_template.py index 1726a1b..80cd028 100644 --- a/agentrun/agent_runtime/__runtime_async_template.py +++ b/agentrun/agent_runtime/__runtime_async_template.py @@ -43,6 +43,8 @@ class AgentRuntime( delete, update, query, and endpoint/version management. """ + _data_api: Dict[str, AgentRuntimeDataAPI] = {} + @classmethod def __get_client(cls): """获取客户端实例 / Get client instance @@ -462,16 +464,19 @@ async def invoke_openai_async( cfg = Config.with_configs(self._config, kwargs.get("config")) kwargs["config"] = cfg - if not self.__data_api: - self.__data_api: Dict[str, AgentRuntimeDataAPI] = {} + if not self._data_api: + self._data_api: Dict[str, AgentRuntimeDataAPI] = {} - if self.__data_api[agent_runtime_endpoint_name] is None: - self.__data_api[agent_runtime_endpoint_name] = AgentRuntimeDataAPI( + if ( + agent_runtime_endpoint_name not in self._data_api + or self._data_api[agent_runtime_endpoint_name] is None + ): + self._data_api[agent_runtime_endpoint_name] = AgentRuntimeDataAPI( agent_runtime_name=self.agent_runtime_name or "", agent_runtime_endpoint_name=agent_runtime_endpoint_name or "", config=cfg, ) - return await self.__data_api[ + return await self._data_api[ agent_runtime_endpoint_name ].invoke_openai_async(**kwargs) diff --git a/agentrun/agent_runtime/api/__data_async_template.py b/agentrun/agent_runtime/api/__data_async_template.py index b801b1f..5ff2525 100644 --- a/agentrun/agent_runtime/api/__data_async_template.py +++ b/agentrun/agent_runtime/api/__data_async_template.py @@ -37,8 +37,15 @@ async def invoke_openai_async( config = kwargs.get("config", None) cfg = Config.with_configs(self.config, config) - api_base = self.with_path("openai/v1") - _, headers, _ = self.auth(headers=cfg.get_headers()) + api_base = self.with_path("openai/v1", config=cfg) + # Sign the actual request URL (OpenAI client will POST to base + /chat/completions) + chat_completions_url = api_base.rstrip("/") + "/chat/completions" + _, headers, _ = self.auth( + url=chat_completions_url, + headers=cfg.get_headers(), + config=cfg, + method="POST", + ) from httpx import AsyncClient from openai import AsyncOpenAI diff --git a/agentrun/agent_runtime/api/data.py b/agentrun/agent_runtime/api/data.py index f3b5226..465db02 100644 --- a/agentrun/agent_runtime/api/data.py +++ b/agentrun/agent_runtime/api/data.py @@ -48,8 +48,15 @@ async def invoke_openai_async( config = kwargs.get("config", None) cfg = Config.with_configs(self.config, config) - api_base = self.with_path("openai/v1") - _, headers, _ = self.auth(headers=cfg.get_headers()) + api_base = self.with_path("openai/v1", config=cfg) + # Sign the actual request URL (OpenAI client will POST to base + /chat/completions) + chat_completions_url = api_base.rstrip("/") + "/chat/completions" + _, headers, _ = self.auth( + url=chat_completions_url, + headers=cfg.get_headers(), + config=cfg, + method="POST", + ) from httpx import AsyncClient from openai import AsyncOpenAI @@ -77,8 +84,15 @@ def invoke_openai( config = kwargs.get("config", None) cfg = Config.with_configs(self.config, config) - api_base = self.with_path("openai/v1") - _, headers, _ = self.auth(headers=cfg.get_headers()) + api_base = self.with_path("openai/v1", config=cfg) + # Sign the actual request URL (OpenAI client will POST to base + /chat/completions) + chat_completions_url = api_base.rstrip("/") + "/chat/completions" + _, headers, _ = self.auth( + url=chat_completions_url, + headers=cfg.get_headers(), + config=cfg, + method="POST", + ) from httpx import Client from openai import OpenAI diff --git a/agentrun/agent_runtime/runtime.py b/agentrun/agent_runtime/runtime.py index af2dde8..9d177f6 100644 --- a/agentrun/agent_runtime/runtime.py +++ b/agentrun/agent_runtime/runtime.py @@ -53,6 +53,8 @@ class AgentRuntime( delete, update, query, and endpoint/version management. """ + _data_api: Dict[str, AgentRuntimeDataAPI] = {} + @classmethod def __get_client(cls): """获取客户端实例 / Get client instance @@ -867,17 +869,20 @@ async def invoke_openai_async( cfg = Config.with_configs(self._config, kwargs.get("config")) kwargs["config"] = cfg - if not self.__data_api: - self.__data_api: Dict[str, AgentRuntimeDataAPI] = {} + if not self._data_api: + self._data_api: Dict[str, AgentRuntimeDataAPI] = {} - if self.__data_api[agent_runtime_endpoint_name] is None: - self.__data_api[agent_runtime_endpoint_name] = AgentRuntimeDataAPI( + if ( + agent_runtime_endpoint_name in self._data_api + and self._data_api[agent_runtime_endpoint_name] is None + ): + self._data_api[agent_runtime_endpoint_name] = AgentRuntimeDataAPI( agent_runtime_name=self.agent_runtime_name or "", agent_runtime_endpoint_name=agent_runtime_endpoint_name or "", config=cfg, ) - return await self.__data_api[ + return await self._data_api[ agent_runtime_endpoint_name ].invoke_openai_async(**kwargs) @@ -889,16 +894,19 @@ def invoke_openai( cfg = Config.with_configs(self._config, kwargs.get("config")) kwargs["config"] = cfg - if not self.__data_api: - self.__data_api: Dict[str, AgentRuntimeDataAPI] = {} + if not self._data_api: + self._data_api: Dict[str, AgentRuntimeDataAPI] = {} - if self.__data_api[agent_runtime_endpoint_name] is None: - self.__data_api[agent_runtime_endpoint_name] = AgentRuntimeDataAPI( + if ( + agent_runtime_endpoint_name not in self._data_api + or self._data_api[agent_runtime_endpoint_name] is None + ): + self._data_api[agent_runtime_endpoint_name] = AgentRuntimeDataAPI( agent_runtime_name=self.agent_runtime_name or "", agent_runtime_endpoint_name=agent_runtime_endpoint_name or "", config=cfg, ) - return self.__data_api[agent_runtime_endpoint_name].invoke_openai( + return self._data_api[agent_runtime_endpoint_name].invoke_openai( **kwargs ) diff --git a/agentrun/sandbox/api/__aio_data_async_template.py b/agentrun/sandbox/api/__aio_data_async_template.py index b27311a..129ee38 100644 --- a/agentrun/sandbox/api/__aio_data_async_template.py +++ b/agentrun/sandbox/api/__aio_data_async_template.py @@ -103,9 +103,13 @@ def sync_playwright( from .playwright_sync import BrowserPlaywrightSync cfg = Config.with_configs(self.config, config) - _, headers, _ = self.auth(headers=cfg.get_headers(), config=cfg) + + url = self.get_cdp_url(record=record) + url, headers, _ = self.auth( + url=url, headers=cfg.get_headers(), config=cfg + ) return BrowserPlaywrightSync( - self.get_cdp_url(record=record), + url, browser_type=browser_type, headers=headers, ) @@ -119,9 +123,13 @@ def async_playwright( from .playwright_async import BrowserPlaywrightAsync cfg = Config.with_configs(self.config, config) - _, headers, _ = self.auth(headers=cfg.get_headers(), config=cfg) + + url = self.get_cdp_url(record=record) + url, headers, _ = self.auth( + url=url, headers=cfg.get_headers(), config=cfg + ) return BrowserPlaywrightAsync( - self.get_cdp_url(record=record), + url, browser_type=browser_type, headers=headers, ) diff --git a/agentrun/sandbox/api/__browser_data_async_template.py b/agentrun/sandbox/api/__browser_data_async_template.py index 16e884e..cf215b5 100644 --- a/agentrun/sandbox/api/__browser_data_async_template.py +++ b/agentrun/sandbox/api/__browser_data_async_template.py @@ -92,9 +92,13 @@ def sync_playwright( from .playwright_sync import BrowserPlaywrightSync cfg = Config.with_configs(self.config, config) - _, headers, _ = self.auth(headers=cfg.get_headers(), config=cfg) + + url = self.get_cdp_url(record=record) + url, headers, _ = self.auth( + url=url, headers=cfg.get_headers(), config=cfg + ) return BrowserPlaywrightSync( - self.get_cdp_url(record=record), + url, browser_type=browser_type, headers=headers, ) @@ -108,9 +112,13 @@ def async_playwright( from .playwright_async import BrowserPlaywrightAsync cfg = Config.with_configs(self.config, config) - _, headers, _ = self.auth(headers=cfg.get_headers(), config=cfg) + + url = self.get_cdp_url(record=record) + url, headers, _ = self.auth( + url=url, headers=cfg.get_headers(), config=cfg + ) return BrowserPlaywrightAsync( - self.get_cdp_url(record=record), + url, browser_type=browser_type, headers=headers, ) diff --git a/agentrun/sandbox/api/aio_data.py b/agentrun/sandbox/api/aio_data.py index b772e1d..d271db4 100644 --- a/agentrun/sandbox/api/aio_data.py +++ b/agentrun/sandbox/api/aio_data.py @@ -113,9 +113,13 @@ def sync_playwright( from .playwright_sync import BrowserPlaywrightSync cfg = Config.with_configs(self.config, config) - _, headers, _ = self.auth(headers=cfg.get_headers(), config=cfg) + + url = self.get_cdp_url(record=record) + url, headers, _ = self.auth( + url=url, headers=cfg.get_headers(), config=cfg + ) return BrowserPlaywrightSync( - self.get_cdp_url(record=record), + url, browser_type=browser_type, headers=headers, ) @@ -129,9 +133,13 @@ def async_playwright( from .playwright_async import BrowserPlaywrightAsync cfg = Config.with_configs(self.config, config) - _, headers, _ = self.auth(headers=cfg.get_headers(), config=cfg) + + url = self.get_cdp_url(record=record) + url, headers, _ = self.auth( + url=url, headers=cfg.get_headers(), config=cfg + ) return BrowserPlaywrightAsync( - self.get_cdp_url(record=record), + url, browser_type=browser_type, headers=headers, ) diff --git a/agentrun/sandbox/api/browser_data.py b/agentrun/sandbox/api/browser_data.py index 50709f5..2523605 100644 --- a/agentrun/sandbox/api/browser_data.py +++ b/agentrun/sandbox/api/browser_data.py @@ -102,9 +102,13 @@ def sync_playwright( from .playwright_sync import BrowserPlaywrightSync cfg = Config.with_configs(self.config, config) - _, headers, _ = self.auth(headers=cfg.get_headers(), config=cfg) + + url = self.get_cdp_url(record=record) + url, headers, _ = self.auth( + url=url, headers=cfg.get_headers(), config=cfg + ) return BrowserPlaywrightSync( - self.get_cdp_url(record=record), + url, browser_type=browser_type, headers=headers, ) @@ -118,9 +122,13 @@ def async_playwright( from .playwright_async import BrowserPlaywrightAsync cfg = Config.with_configs(self.config, config) - _, headers, _ = self.auth(headers=cfg.get_headers(), config=cfg) + + url = self.get_cdp_url(record=record) + url, headers, _ = self.auth( + url=url, headers=cfg.get_headers(), config=cfg + ) return BrowserPlaywrightAsync( - self.get_cdp_url(record=record), + url, browser_type=browser_type, headers=headers, ) diff --git a/agentrun/toolset/__toolset_async_template.py b/agentrun/toolset/__toolset_async_template.py index aec1d04..87cc9e0 100644 --- a/agentrun/toolset/__toolset_async_template.py +++ b/agentrun/toolset/__toolset_async_template.py @@ -94,11 +94,18 @@ def _get_openapi_auth_defaults( return headers, query def _get_openapi_base_url(self) -> Optional[str]: - return pydash.get( - self, - "status.outputs.urls.intranet_url", - None, - ) or pydash.get(self, "status.outputs.urls.internet_url", None) + import os + + fc_region = os.getenv("FC_REGION") + arn = pydash.get(self, "status.outputs.function_arn", "") + + if fc_region and arn and pydash.get(arn.split(":"), "[2]"): + # 在同一个 region,则使用内网地址 + return pydash.get( + self, + "status.outputs.urls.intranet_url", + None, + ) async def get_async(self, config: Optional[Config] = None): if self.name is None: diff --git a/agentrun/toolset/api/openapi.py b/agentrun/toolset/api/openapi.py index df0198f..45e5d83 100644 --- a/agentrun/toolset/api/openapi.py +++ b/agentrun/toolset/api/openapi.py @@ -829,6 +829,8 @@ def invoke_tool( request_kwargs, timeout, raise_for_status = self._prepare_request( name, arguments, config ) + + print(request_kwargs) with httpx.Client(timeout=timeout) as client: response = client.request(**request_kwargs) if raise_for_status: @@ -943,6 +945,7 @@ def _walk(node: Any): def _extract_base_url(self, schema: Dict[str, Any]) -> Optional[str]: servers = schema.get("servers") or [] + print("======", servers) return self._pick_server_url(servers) def _convert_to_native(self, value: Any) -> Any: diff --git a/agentrun/toolset/toolset.py b/agentrun/toolset/toolset.py index bd417fb..965969b 100644 --- a/agentrun/toolset/toolset.py +++ b/agentrun/toolset/toolset.py @@ -109,11 +109,18 @@ def _get_openapi_auth_defaults( return headers, query def _get_openapi_base_url(self) -> Optional[str]: - return pydash.get( - self, - "status.outputs.urls.intranet_url", - None, - ) or pydash.get(self, "status.outputs.urls.internet_url", None) + import os + + fc_region = os.getenv("FC_REGION") + arn = pydash.get(self, "status.outputs.function_arn", "") + + if fc_region and arn and pydash.get(arn.split(":"), "[2]"): + # 在同一个 region,则使用内网地址 + return pydash.get( + self, + "status.outputs.urls.intranet_url", + None, + ) async def get_async(self, config: Optional[Config] = None): if self.name is None: diff --git a/agentrun/utils/__data_api_async_template.py b/agentrun/utils/__data_api_async_template.py index c664f7f..e6279f5 100644 --- a/agentrun/utils/__data_api_async_template.py +++ b/agentrun/utils/__data_api_async_template.py @@ -8,6 +8,7 @@ """ from enum import Enum +import json from typing import Any, Dict, Optional, Union from urllib.parse import parse_qs, urlencode, urlparse, urlunparse @@ -15,8 +16,8 @@ from agentrun.utils.config import Config from agentrun.utils.exception import ClientError -from agentrun.utils.helper import mask_password from agentrun.utils.log import logger +from agentrun.utils.ram_signature import get_agentrun_signed_headers class ResourceType(Enum): @@ -65,25 +66,45 @@ def __init__( self.resource_name = resource_name self.resource_type = resource_type - self.access_token = None self.config = Config.with_configs(config) self.namespace = namespace - if self.config.get_token(): - logger.debug( - "using provided access token from config, %s", - mask_password(self.config.get_token() or ""), - ) - self.access_token = self.config.get_token() + def _use_ram_auth(self, config: Optional[Config] = None) -> bool: + """是否使用 RAM 签名鉴权(配置了 AK/SK 时使用)。""" + cfg = Config.with_configs(self.config, config) + return bool(cfg.get_access_key_id() and cfg.get_access_key_secret()) + + def _get_ram_data_endpoint(self, config: Optional[Config] = None) -> str: + """返回 RAM 鉴权用的 data endpoint(仅当默认 agentrun-data 域名时在 host 前加 -ram)。""" + cfg = Config.with_configs(self.config, config) + base = cfg.get_data_endpoint() + parsed = urlparse(base) + if not parsed.netloc or ".agentrun-data." not in parsed.netloc: + return base + parts = parsed.netloc.split(".", 1) + if len(parts) != 2: + return base + ram_netloc = parts[0] + "-ram." + parts[1] + return urlunparse(( + parsed.scheme, + ram_netloc, + parsed.path or "", + parsed.params, + parsed.query, + parsed.fragment, + )) def get_base_url(self) -> str: """ Get the base URL for API requests. + 当使用 RAM 鉴权时返回 RAM 端点(host 带 -ram)。 Returns: The base URL string """ + if self._use_ram_auth(): + return self._get_ram_data_endpoint() return self.config.get_data_endpoint() def with_path( @@ -166,81 +187,40 @@ def auth( headers: Optional[Dict[str, str]] = None, query: Optional[Dict[str, Any]] = None, config: Optional[Config] = None, + method: str = "GET", + body: Optional[bytes] = None, ) -> tuple[str, Dict[str, str], Optional[Dict[str, Any]]]: """ - Authentication hook for modifying requests before sending. - - This method can be overridden in subclasses to implement custom - authentication logic (e.g., signing requests, adding auth tokens). - - Args: - url: The request URL - headers: The request headers - query: The query parameters - - Returns: - Tuple of (modified_url, modified_headers, modified_query) - - Examples: - Override this method to add custom authentication: - - >>> class AuthedClient(AgentRunDataClient): - ... def auth(self, url, headers, query): - ... # Add auth token to headers - ... headers["Authorization"] = "Bearer token123" - ... # Or add signature to query - ... query = query or {} - ... query["signature"] = self._sign_request(url) - ... return url, headers, query + Authentication: 仅使用 RAM 签名鉴权(Agentrun-Authorization)。需在 config 中配置 AK/SK。 """ cfg = Config.with_configs(self.config, config) - if ( - self.access_token is None - and self.resource_name - and self.resource_type - and not cfg.get_token() - ): + if self._use_ram_auth(cfg): try: - from alibabacloud_agentrun20250910.models import ( - GetAccessTokenRequest, + signed = get_agentrun_signed_headers( + url=url, + method=method, + access_key_id=cfg.get_access_key_id(), + access_key_secret=cfg.get_access_key_secret(), + security_token=cfg.get_security_token() or None, + region=cfg.get_region_id(), + product="agentrun", + body=body, ) - - from .control_api import ControlAPI - - cli = ControlAPI(self.config)._get_client() - input = ( - GetAccessTokenRequest( - resource_id=self.resource_name, - resource_type=self.resource_type.value, - ) - if self.resource_type == ResourceType.Sandbox - else GetAccessTokenRequest( - resource_name=self.resource_name, - resource_type=self.resource_type.value, - ) - ) - - resp = cli.get_access_token(input) - self.access_token = resp.body.data.access_token - - except Exception as e: - logger.warning( - "Failed to get access token for" - f" {self.resource_type}({self.resource_name}): {e}" + headers = { + **signed, + **cfg.get_headers(), + **(headers or {}), + } + logger.debug( + "using RAM signature for data API request to %s", + url[:80] + "..." if len(url) > 80 else url, ) - - logger.debug( - "fetching access token for resource %s of type %s, %s", - self.resource_name, - self.resource_type, - mask_password(self.access_token or ""), - ) - headers = { - "Agentrun-Access-Token": cfg.get_token() or self.access_token or "", - **cfg.get_headers(), - **(headers or {}), - } + except ValueError as e: + logger.warning("RAM signing skipped (missing AK/SK): %s", e) + headers = {**cfg.get_headers(), **(headers or {})} + else: + headers = {**cfg.get_headers(), **(headers or {})} return url, headers, query @@ -267,8 +247,19 @@ def _prepare_request( if headers: req_headers.update(headers) + body_bytes: Optional[bytes] = None + if data is not None: + if isinstance(data, dict): + body_bytes = json.dumps(data).encode("utf-8") + elif isinstance(data, str): + body_bytes = data.encode("utf-8") + else: + body_bytes = str(data).encode("utf-8") + # Apply authentication (may modify URL, headers, and query) - url, req_headers, query = self.auth(url, req_headers, query, config=cfg) + url, req_headers, query = self.auth( + url, req_headers, query, config=cfg, method=method, body=body_bytes + ) # Add query parameters to URL if provided if query: @@ -606,7 +597,9 @@ async def post_file_async( req_headers.update(headers or {}) # Apply authentication (may modify URL, headers, and query) cfg = Config.with_configs(self.config, config) - url, req_headers, query = self.auth(url, req_headers, query, config=cfg) + url, req_headers, query = self.auth( + url, req_headers, query, config=cfg, method="POST" + ) try: with open(local_file_path, "rb") as f: @@ -659,7 +652,9 @@ async def get_file_async( req_headers.update(headers or {}) # Apply authentication (may modify URL, headers, and query) cfg = Config.with_configs(self.config, config) - url, req_headers, query = self.auth(url, req_headers, query, config=cfg) + url, req_headers, query = self.auth( + url, req_headers, query, config=cfg, method="GET" + ) try: async with httpx.AsyncClient( @@ -708,7 +703,9 @@ async def get_video_async( req_headers.update(headers or {}) # Apply authentication (may modify URL, headers, and query) cfg = Config.with_configs(self.config, config) - url, req_headers, query = self.auth(url, req_headers, query, config=cfg) + url, req_headers, query = self.auth( + url, req_headers, query, config=cfg, method="GET" + ) try: async with httpx.AsyncClient( diff --git a/agentrun/utils/data_api.py b/agentrun/utils/data_api.py index ba9b496..ad8326b 100644 --- a/agentrun/utils/data_api.py +++ b/agentrun/utils/data_api.py @@ -18,6 +18,7 @@ """ from enum import Enum +import json from typing import Any, Dict, Optional, Union from urllib.parse import parse_qs, urlencode, urlparse, urlunparse @@ -25,8 +26,8 @@ from agentrun.utils.config import Config from agentrun.utils.exception import ClientError -from agentrun.utils.helper import mask_password from agentrun.utils.log import logger +from agentrun.utils.ram_signature import get_agentrun_signed_headers class ResourceType(Enum): @@ -75,29 +76,56 @@ def __init__( self.resource_name = resource_name self.resource_type = resource_type - self.access_token = None self.config = Config.with_configs(config) self.namespace = namespace - if self.config.get_token(): - logger.debug( - "using provided access token from config, %s", - mask_password(self.config.get_token() or ""), - ) - self.access_token = self.config.get_token() + def _use_ram_auth(self, config: Optional[Config] = None) -> bool: + """是否使用 RAM 签名鉴权(配置了 AK/SK 时使用)。""" + cfg = Config.with_configs(self.config, config) + return bool(cfg.get_access_key_id() and cfg.get_access_key_secret()) + + def _get_ram_data_endpoint(self, config: Optional[Config] = None) -> str: + """返回 RAM 鉴权用的 data endpoint(仅当默认 agentrun-data 域名时在 host 前加 -ram)。""" + cfg = Config.with_configs(self.config, config) + base = cfg.get_data_endpoint() + parsed = urlparse(base) + if not parsed.netloc or ".agentrun-data." not in parsed.netloc: + return base + parts = parsed.netloc.split(".", 1) + if len(parts) != 2: + return base + ram_netloc = parts[0] + "-ram." + parts[1] + return urlunparse(( + parsed.scheme, + ram_netloc, + parsed.path or "", + parsed.params, + parsed.query, + parsed.fragment, + )) - def get_base_url(self) -> str: + def get_base_url(self, config: Optional[Config] = None) -> str: """ Get the base URL for API requests. + 当使用 RAM 鉴权时返回 RAM 端点(host 带 -ram)。 + + Args: + config: 可选,用于计算 base URL 的配置;未传时使用 self.config。 Returns: The base URL string """ - return self.config.get_data_endpoint() + cfg = Config.with_configs(self.config, config) + if self._use_ram_auth(cfg): + return self._get_ram_data_endpoint(cfg) + return cfg.get_data_endpoint() def with_path( - self, path: str, query: Optional[Dict[str, Any]] = None + self, + path: str, + query: Optional[Dict[str, Any]] = None, + config: Optional[Config] = None, ) -> str: """ Construct full URL with the given path and query parameters. @@ -105,6 +133,7 @@ def with_path( Args: path: API path (may include query string) query: Query parameters to add/merge + config: 可选,用于计算 base URL 的配置;未传时使用 self.config。 Returns: Complete URL string with query parameters @@ -124,7 +153,7 @@ def with_path( base_url = "/".join([ part.strip("/") for part in [ - self.get_base_url(), + self.get_base_url(config), self.namespace, path, ] @@ -176,81 +205,42 @@ def auth( headers: Optional[Dict[str, str]] = None, query: Optional[Dict[str, Any]] = None, config: Optional[Config] = None, + method: str = "GET", + body: Optional[bytes] = None, ) -> tuple[str, Dict[str, str], Optional[Dict[str, Any]]]: """ - Authentication hook for modifying requests before sending. - - This method can be overridden in subclasses to implement custom - authentication logic (e.g., signing requests, adding auth tokens). - - Args: - url: The request URL - headers: The request headers - query: The query parameters - - Returns: - Tuple of (modified_url, modified_headers, modified_query) - - Examples: - Override this method to add custom authentication: - - >>> class AuthedClient(AgentRunDataClient): - ... def auth(self, url, headers, query): - ... # Add auth token to headers - ... headers["Authorization"] = "Bearer token123" - ... # Or add signature to query - ... query = query or {} - ... query["signature"] = self._sign_request(url) - ... return url, headers, query + Authentication: 仅使用 RAM 签名鉴权(Agentrun-Authorization)。 + 需在 config 中配置 AK/SK。 """ cfg = Config.with_configs(self.config, config) - if ( - self.access_token is None - and self.resource_name - and self.resource_type - and not cfg.get_token() - ): + if self._use_ram_auth(cfg): try: - from alibabacloud_agentrun20250910.models import ( - GetAccessTokenRequest, + signed = get_agentrun_signed_headers( + url=url, + method=method, + access_key_id=cfg.get_access_key_id(), + access_key_secret=cfg.get_access_key_secret(), + security_token=cfg.get_security_token() or None, + region=cfg.get_region_id(), + product="agentrun", + body=body, ) - - from .control_api import ControlAPI - - cli = ControlAPI(self.config)._get_client() - input = ( - GetAccessTokenRequest( - resource_id=self.resource_name, - resource_type=self.resource_type.value, - ) - if self.resource_type == ResourceType.Sandbox - else GetAccessTokenRequest( - resource_name=self.resource_name, - resource_type=self.resource_type.value, - ) + headers = { + **signed, + **cfg.get_headers(), + **(headers or {}), + } + logger.debug( + "using RAM signature for data API request to %s, token %s", + url[:80] + "..." if len(url) > 80 else url, + signed, ) - - resp = cli.get_access_token(input) - self.access_token = resp.body.data.access_token - - except Exception as e: - logger.warning( - "Failed to get access token for" - f" {self.resource_type}({self.resource_name}): {e}" - ) - - logger.debug( - "fetching access token for resource %s of type %s, %s", - self.resource_name, - self.resource_type, - mask_password(self.access_token or ""), - ) - headers = { - "Agentrun-Access-Token": cfg.get_token() or self.access_token or "", - **cfg.get_headers(), - **(headers or {}), - } + except ValueError as e: + logger.warning("RAM signing skipped (missing AK/SK): %s", e) + headers = {**cfg.get_headers(), **(headers or {})} + else: + headers = {**cfg.get_headers(), **(headers or {})} return url, headers, query @@ -277,8 +267,20 @@ def _prepare_request( if headers: req_headers.update(headers) + # Build body bytes for RAM signing when applicable + body_bytes: Optional[bytes] = None + if data is not None: + if isinstance(data, dict): + body_bytes = json.dumps(data).encode("utf-8") + elif isinstance(data, str): + body_bytes = data.encode("utf-8") + else: + body_bytes = str(data).encode("utf-8") + # Apply authentication (may modify URL, headers, and query) - url, req_headers, query = self.auth(url, req_headers, query, config=cfg) + url, req_headers, query = self.auth( + url, req_headers, query, config=cfg, method=method, body=body_bytes + ) # Add query parameters to URL if provided if query: @@ -858,7 +860,9 @@ async def post_file_async( req_headers.update(headers or {}) # Apply authentication (may modify URL, headers, and query) cfg = Config.with_configs(self.config, config) - url, req_headers, query = self.auth(url, req_headers, query, config=cfg) + url, req_headers, query = self.auth( + url, req_headers, query, config=cfg, method="POST" + ) try: with open(local_file_path, "rb") as f: @@ -920,7 +924,9 @@ def post_file( req_headers.update(headers or {}) # Apply authentication (may modify URL, headers, and query) cfg = Config.with_configs(self.config, config) - url, req_headers, query = self.auth(url, req_headers, query, config=cfg) + url, req_headers, query = self.auth( + url, req_headers, query, config=cfg, method="POST" + ) try: with open(local_file_path, "rb") as f: @@ -971,7 +977,9 @@ async def get_file_async( req_headers.update(headers or {}) # Apply authentication (may modify URL, headers, and query) cfg = Config.with_configs(self.config, config) - url, req_headers, query = self.auth(url, req_headers, query, config=cfg) + url, req_headers, query = self.auth( + url, req_headers, query, config=cfg, method="GET" + ) try: async with httpx.AsyncClient( @@ -1020,7 +1028,9 @@ def get_file( req_headers.update(headers or {}) # Apply authentication (may modify URL, headers, and query) cfg = Config.with_configs(self.config, config) - url, req_headers, query = self.auth(url, req_headers, query, config=cfg) + url, req_headers, query = self.auth( + url, req_headers, query, config=cfg, method="GET" + ) try: with httpx.Client(timeout=self.config.get_timeout()) as client: @@ -1067,7 +1077,9 @@ async def get_video_async( req_headers.update(headers or {}) # Apply authentication (may modify URL, headers, and query) cfg = Config.with_configs(self.config, config) - url, req_headers, query = self.auth(url, req_headers, query, config=cfg) + url, req_headers, query = self.auth( + url, req_headers, query, config=cfg, method="GET" + ) try: async with httpx.AsyncClient( @@ -1116,7 +1128,9 @@ def get_video( req_headers.update(headers or {}) # Apply authentication (may modify URL, headers, and query) cfg = Config.with_configs(self.config, config) - url, req_headers, query = self.auth(url, req_headers, query, config=cfg) + url, req_headers, query = self.auth( + url, req_headers, query, config=cfg, method="GET" + ) try: with httpx.Client(timeout=self.config.get_timeout()) as client: diff --git a/agentrun/utils/ram_signature/README.md b/agentrun/utils/ram_signature/README.md new file mode 100644 index 0000000..dc1ad30 --- /dev/null +++ b/agentrun/utils/ram_signature/README.md @@ -0,0 +1,62 @@ +# AgentRun RAM 签名独立实现 + +本目录为 **独立文件夹实现**,**Python 手写签名逻辑**,无 `alibabacloud_signer_inner` 等外部签名库依赖。 +**Node.js 版本** 位于 [agentrun-sdk-nodejs](../agentrun-sdk-nodejs) 的 `src/utils/ram-signature/` 目录。 + +## 目录结构(Python SDK) + +``` +ram_signature/ +├── README.md # 本说明 +├── __init__.py # Python 包入口,导出 get_agentrun_signed_headers +└── python/ # Python 手写实现 + ├── __init__.py + └── signer.py # AGENTRUN4-HMAC-SHA256 纯手写 +``` + +## 算法说明 + +- **算法名**: `AGENTRUN4-HMAC-SHA256` +- **Payload**: `UNSIGNED-PAYLOAD`(不参与 body 哈希) +- **参与签名的头**: `host`, `x-acs-date`, `x-acs-content-sha256`,可选 `x-acs-security-token` +- **流程**: Canonical Request → StringToSign → HMAC 派生 Key → Signature,与阿里云 OSS V4 / ACS 风格一致 + +## Python 使用 + +在 AgentRun Python SDK 内已通过 `agentrun.utils.ram_signature` 或 `agentrun.ram_signature` 使用: + +```python +from agentrun.ram_signature import get_agentrun_signed_headers + +headers = get_agentrun_signed_headers( + url="https://xxx-ram.agentrun-data.cn-hangzhou.aliyuncs.com/path", + method="GET", + access_key_id="ak", + access_key_secret="sk", + region="cn-hangzhou", + product="agentrun", +) +# headers["Agentrun-Authorization"], headers["x-acs-date"], ... +``` + +## Node.js 使用 + +Node.js 实现位于 **agentrun-sdk-nodejs** 仓库的 `src/utils/ram-signature/` 目录,通过 `@agentrun/sdk` 的 utils 导出: + +```typescript +import { getAgentrunSignedHeaders } from '@agentrun/sdk'; + +const headers = getAgentrunSignedHeaders({ + url: 'https://xxx-ram.agentrun-data.cn-hangzhou.aliyuncs.com/path', + method: 'GET', + accessKeyId: 'ak', + accessKeySecret: 'sk', + region: 'cn-hangzhou', + product: 'agentrun', +}); +// headers['Agentrun-Authorization'], headers['x-acs-date'], ... +``` + +## 与 ram-e2e-test 的对应关系 + +逻辑与 ram-e2e-test/signature_helper.py 一致,用于替代原 `GetAccessToken` + `Agentrun-Access-Token` 的 Data API 鉴权方式;此处为手写实现,不依赖 `alibabacloud_signer_inner`。 diff --git a/agentrun/utils/ram_signature/__init__.py b/agentrun/utils/ram_signature/__init__.py new file mode 100644 index 0000000..0102f3f --- /dev/null +++ b/agentrun/utils/ram_signature/__init__.py @@ -0,0 +1,5 @@ +"""Python 手写实现:AGENTRUN4-HMAC-SHA256,无 alibabacloud_signer_inner 依赖。""" + +from .signer import get_agentrun_signed_headers + +__all__ = ["get_agentrun_signed_headers"] diff --git a/agentrun/utils/ram_signature/signer.py b/agentrun/utils/ram_signature/signer.py new file mode 100644 index 0000000..301716f --- /dev/null +++ b/agentrun/utils/ram_signature/signer.py @@ -0,0 +1,274 @@ +""" +AgentRun RAM 签名 - Python 手写实现(无外部 signer 依赖) + +实现 AGENTRUN4-HMAC-SHA256,与 http-auth-acs / ram-e2e-test(alibabacloud_signer_inner)行为一致: +- UNSIGNED-PAYLOAD +- x-acs-date(ISO8601)、x-acs-content-sha256、host、可选 x-acs-security-token +- Canonical Request 与 http-auth-acs 一致(URI 不编码、Query 空值用 key=、仅签 x-acs-* / host / content-type) +- StringToSign 为 2 行:ALGORITHM + "\\n" + SHA256(CanonicalRequest)(无 timestamp/scope) +""" + +from datetime import datetime, timezone +import hashlib +import hmac +from typing import Optional +from urllib.parse import quote, urlparse + +ALGORITHM = "AGENTRUN4-HMAC-SHA256" +UNSIGNED_PAYLOAD = "UNSIGNED-PAYLOAD" +SCOPE_SUFFIX = "aliyun_v4_request" +KEY_PREFIX = "aliyun_v4" + + +def _build_scope(date: str, region: str, product: str) -> str: + return f"{date}/{region}/{product}/{SCOPE_SUFFIX}" + + +def _percent_encode(value: Optional[str]) -> str: + """与 http-auth-acs 一致:quote(safe='') 后把 %7E 还原为 ~。""" + if value is None: + return "" + return quote(str(value), safe="").replace("%7E", "~") + + +def _canonical_uri(path: str) -> str: + """与 http-auth-acs 一致:path 原样,不 percent-encode。""" + if path is None or path == "": + return "/" + return path + + +def _canonical_query(query_params: dict) -> str: + """与 http-auth-acs 一致:空值为 encoded_key=(带等号)。""" + if not query_params: + return "" + parts = [] + for k in sorted(query_params.keys()): + v = query_params.get(k) + enc_k = _percent_encode(k) + if v is not None and v != "": + parts.append(f"{enc_k}={_percent_encode(v)}") + else: + parts.append(f"{enc_k}=") + return "&".join(parts) + + +def _get_signed_headers(headers: dict) -> list[str]: + """与 http-auth-acs 一致:仅签 x-acs-*、host、content-type,且 value 非 None。""" + out = set() + for key, value in headers.items(): + lower_key = key.lower().strip() + if value is not None and ( + lower_key.startswith("x-acs-") + or lower_key == "host" + or lower_key == "content-type" + ): + out.add(lower_key) + return sorted(out) + + +def _canonical_headers(headers: dict) -> tuple[str, str]: + """与 http-auth-acs 一致:先归一化再按 signed_headers 顺序输出 header:value\\n。""" + new_headers: dict[str, str] = {} + for k, v in headers.items(): + lower_key = k.lower().strip() + if v is not None: + new_headers[lower_key] = str(v).strip() + signed_list = _get_signed_headers(headers) + canonical = "".join(f"{h}:{new_headers[h]}\n" for h in signed_list) + signed_str = ";".join(signed_list) + return canonical, signed_str + + +def _calc_canonical_request( + method: str, + pathname: str, + query_params: dict, + headers: dict, + hashed_payload: str, +) -> str: + method = method.upper() + uri = _canonical_uri(pathname) + query = _canonical_query(query_params) + canon_headers, signed_headers = _canonical_headers(headers) + return f"{method}\n{uri}\n{query}\n{canon_headers}\n{signed_headers}\n{hashed_payload}" + + +def _calc_string_to_sign(canonical_request: str) -> str: + """与 http-auth-acs 一致:2 行 StringToSign = ALGORITHM + \\n + SHA256(CanonicalRequest)。""" + digest = hashlib.sha256(canonical_request.encode("utf-8")).hexdigest() + return f"{ALGORITHM}\n{digest}" + + +def _get_signing_key( + secret: str, date: str, region: str, product: str +) -> bytes: + key = (KEY_PREFIX + secret).encode("utf-8") + k_date = hmac.new(key, date.encode("utf-8"), hashlib.sha256).digest() + k_region = hmac.new(k_date, region.encode("utf-8"), hashlib.sha256).digest() + k_product = hmac.new( + k_region, product.encode("utf-8"), hashlib.sha256 + ).digest() + k_signing = hmac.new( + k_product, SCOPE_SUFFIX.encode("utf-8"), hashlib.sha256 + ).digest() + return k_signing + + +def _calc_signature( + secret: str, + date: str, + region: str, + product: str, + string_to_sign: str, +) -> str: + signing_key = _get_signing_key(secret, date, region, product) + return hmac.new( + signing_key, string_to_sign.encode("utf-8"), hashlib.sha256 + ).hexdigest() + + +def get_agentrun_signed_headers( + url: str, + method: str = "GET", + access_key_id: Optional[str] = None, + access_key_secret: Optional[str] = None, + security_token: Optional[str] = None, + region: str = "cn-hangzhou", + product: str = "agentrun", + body: Optional[bytes] = None, + content_type: Optional[str] = None, + sign_time: Optional[datetime] = None, +) -> dict: + """ + 生成 AgentRun 签名头(手写实现,无外部依赖)。 + 返回包含 Agentrun-Authorization、x-acs-date、x-acs-content-sha256 等的 headers。 + content_type 若提供会参与签名(与 http-auth-acs 一致,SignedHeaders 含 content-type)。 + """ + if not access_key_id or not access_key_secret: + raise ValueError("Access Key ID and Secret are required") + + parsed = urlparse(url) + host = parsed.netloc + pathname = parsed.path or "/" + query_params: dict = {} + if parsed.query: + for pair in parsed.query.split("&"): + if "=" in pair: + k, v = pair.split("=", 1) + query_params[k] = v + + now = sign_time if sign_time is not None else datetime.now(timezone.utc) + if now.tzinfo is None: + now = now.replace(tzinfo=timezone.utc) + timestamp = now.strftime("%Y-%m-%dT%H:%M:%SZ") + date = now.strftime("%Y%m%d") + + headers_for_sign: dict = { + "host": host, + "x-acs-date": timestamp, + "x-acs-content-sha256": UNSIGNED_PAYLOAD, + } + if security_token: + headers_for_sign["x-acs-security-token"] = security_token + if content_type is not None: + headers_for_sign["content-type"] = content_type + + scope = _build_scope(date, region, product) + canonical_request = _calc_canonical_request( + method, + pathname, + query_params, + headers_for_sign, + UNSIGNED_PAYLOAD, + ) + string_to_sign = _calc_string_to_sign(canonical_request) + signature = _calc_signature( + access_key_secret, date, region, product, string_to_sign + ) + + signed_headers_str = ";".join(_get_signed_headers(headers_for_sign)) + auth_value = ( + f"{ALGORITHM} Credential={access_key_id}/{scope}," + f"SignedHeaders={signed_headers_str},Signature={signature}" + ) + + result = dict(headers_for_sign) + result["Agentrun-Authorization"] = auth_value + return result + + +def get_agentrun_signed_headers_with_debug( + url: str, + method: str = "GET", + access_key_id: Optional[str] = None, + access_key_secret: Optional[str] = None, + security_token: Optional[str] = None, + region: str = "cn-hangzhou", + product: str = "agentrun", + body: Optional[bytes] = None, + content_type: Optional[str] = None, + sign_time: Optional[datetime] = None, +) -> tuple[dict, dict]: + """ + 与 get_agentrun_signed_headers 相同,但额外返回调试信息,便于与 ram-e2e-test 或服务端对比。 + 返回 (headers, debug_dict),debug_dict 含 canonical_request, string_to_sign, signature。 + """ + if not access_key_id or not access_key_secret: + raise ValueError("Access Key ID and Secret are required") + + parsed = urlparse(url) + host = parsed.netloc + pathname = parsed.path or "/" + query_params: dict = {} + if parsed.query: + for pair in parsed.query.split("&"): + if "=" in pair: + k, v = pair.split("=", 1) + query_params[k] = v + + now = sign_time if sign_time is not None else datetime.now(timezone.utc) + if now.tzinfo is None: + now = now.replace(tzinfo=timezone.utc) + timestamp = now.strftime("%Y-%m-%dT%H:%M:%SZ") + date = now.strftime("%Y%m%d") + + headers_for_sign: dict = { + "host": host, + "x-acs-date": timestamp, + "x-acs-content-sha256": UNSIGNED_PAYLOAD, + } + if security_token: + headers_for_sign["x-acs-security-token"] = security_token + if content_type is not None: + headers_for_sign["content-type"] = content_type + + scope = _build_scope(date, region, product) + canonical_request = _calc_canonical_request( + method, + pathname, + query_params, + headers_for_sign, + UNSIGNED_PAYLOAD, + ) + string_to_sign = _calc_string_to_sign(canonical_request) + signature = _calc_signature( + access_key_secret, date, region, product, string_to_sign + ) + signing_key = _get_signing_key(access_key_secret, date, region, product) + + signed_headers_str = ";".join(_get_signed_headers(headers_for_sign)) + auth_value = ( + f"{ALGORITHM} Credential={access_key_id}/{scope}," + f"SignedHeaders={signed_headers_str},Signature={signature}" + ) + + result = dict(headers_for_sign) + result["Agentrun-Authorization"] = auth_value + debug = { + "canonical_request": canonical_request, + "string_to_sign": string_to_sign, + "signing_key_hex": signing_key.hex(), + "signature": signature, + } + return result, debug diff --git a/tests/unittests/ram_signature/__init__.py b/tests/unittests/ram_signature/__init__.py new file mode 100644 index 0000000..fd0a499 --- /dev/null +++ b/tests/unittests/ram_signature/__init__.py @@ -0,0 +1 @@ +# Tests for agentrun.ram_signature standalone implementation diff --git a/tests/unittests/ram_signature/test_signer.py b/tests/unittests/ram_signature/test_signer.py new file mode 100644 index 0000000..bf726df --- /dev/null +++ b/tests/unittests/ram_signature/test_signer.py @@ -0,0 +1,292 @@ +"""测试独立文件夹内的 RAM 签名手写实现(无 mock)。""" + +from datetime import datetime, timezone +from urllib.parse import urlparse + +import pytest + +from agentrun.ram_signature.python.signer import get_agentrun_signed_headers + + +class TestRamSignatureStandalone: + """对手写实现的直接测试,不依赖 data_api 或 mock。""" + + def test_returns_required_headers(self): + headers = get_agentrun_signed_headers( + url="https://uid-ram.agentrun-data.cn-hangzhou.aliyuncs.com/sandboxes/s1/health", + method="GET", + access_key_id="ak", + access_key_secret="sk", + region="cn-hangzhou", + product="agentrun", + ) + assert "Agentrun-Authorization" in headers + assert "x-acs-date" in headers + assert "x-acs-content-sha256" in headers + assert headers["x-acs-content-sha256"] == "UNSIGNED-PAYLOAD" + assert "host" in headers + assert ( + "uid-ram.agentrun-data.cn-hangzhou.aliyuncs.com" in headers["host"] + ) + + def test_authorization_format(self): + headers = get_agentrun_signed_headers( + url="https://example.agentrun-data.cn-hangzhou.aliyuncs.com/path", + method="GET", + access_key_id="test-ak", + access_key_secret="test-sk", + ) + auth = headers["Agentrun-Authorization"] + assert auth.startswith("AGENTRUN4-HMAC-SHA256 ") + assert "Credential=test-ak/" in auth + assert "SignedHeaders=" in auth + assert "Signature=" in auth + + def test_requires_ak_sk(self): + with pytest.raises(ValueError, match="Access Key ID and Secret"): + get_agentrun_signed_headers( + url="https://x.agentrun-data.cn-hangzhou.aliyuncs.com/", + access_key_id="", + access_key_secret="sk", + ) + with pytest.raises(ValueError, match="Access Key ID and Secret"): + get_agentrun_signed_headers( + url="https://x.agentrun-data.cn-hangzhou.aliyuncs.com/", + access_key_id="ak", + access_key_secret="", + ) + + def test_security_token_in_headers_when_provided(self): + headers = get_agentrun_signed_headers( + url="https://x.agentrun-data.cn-hangzhou.aliyuncs.com/", + access_key_id="ak", + access_key_secret="sk", + security_token="sts-token", + ) + assert "x-acs-security-token" in headers + assert headers["x-acs-security-token"] == "sts-token" + assert "x-acs-security-token" in headers["Agentrun-Authorization"] + + def test_deterministic_with_same_inputs(self): + url = "https://uid-ram.agentrun-data.cn-hangzhou.aliyuncs.com/path?a=1" + opts = dict( + url=url, + method="POST", + access_key_id="ak", + access_key_secret="sk", + region="cn-hangzhou", + product="agentrun", + ) + from datetime import datetime, timezone + + t = datetime(2025, 1, 1, 12, 0, 0, tzinfo=timezone.utc) + h1 = get_agentrun_signed_headers(**opts, sign_time=t) + h2 = get_agentrun_signed_headers(**opts, sign_time=t) + assert h1["Agentrun-Authorization"] == h2["Agentrun-Authorization"] + assert h1["x-acs-date"] == h2["x-acs-date"] + + def test_fixed_params_snapshot_matches_compare_script(self): + """与 scripts/compare_ram_signature.py 使用相同固定参数,签名结果应与快照一致(便于与 ram-e2e-test 对比)。""" + from datetime import datetime, timezone + + FIXED_SIGN_TIME = datetime(2026, 3, 6, 12, 0, 0, tzinfo=timezone.utc) + TEST_URL = "https://1431999136518149-ram.agentrun-data.cn-hangzhou.aliyuncs.com/sandboxes/sbx-xxx/health" + DEFAULT_AK = "LTAI5t5opp3xMWk2B3a4gZVq" + DEFAULT_SK = "secret" + + headers = get_agentrun_signed_headers( + url=TEST_URL, + method="GET", + access_key_id=DEFAULT_AK, + access_key_secret=DEFAULT_SK, + region="cn-hangzhou", + product="agentrun", + sign_time=FIXED_SIGN_TIME, + ) + auth = headers.get("Agentrun-Authorization", "") + sig = ( + auth.split("Signature=")[-1].strip() if "Signature=" in auth else "" + ) + # 快照:与 compare_ram_signature.py 同参数时的输出;若 ram-e2e-test 有 REF_Signature 可与此对比 + EXPECTED_SIGNATURE = ( + "e1479ea1aec37e55f91d82b1ccc48df2feef04184a911f91a3e3fe0e27d02610" + ) + assert sig == EXPECTED_SIGNATURE, ( + f"固定参数下 Signature 与快照不一致: got {sig!r}, expected" + f" {EXPECTED_SIGNATURE!r}. 可与 ram-e2e-test/print_ref_signature.py" + " 输出对比。" + ) + + +# 固定时间 + query/body/header 多场景快照(与 scripts/print_ram_signature_snapshots.py / 官方包 / JS SDK 一致) +# JS/Python 等 SDK 可用相同输入校验签名一致,参见 TestRamSignatureFixedQueryBodyHeader。 +FIXED_SIGN_TIME_QBH = datetime(2026, 3, 6, 12, 0, 0, tzinfo=timezone.utc) +FIXED_AK = "LTAI5t5opp3xMWk2B3a4gZVq" +FIXED_SK = "secret" +BASE_URL_QBH = ( + "https://1431999136518149-ram.agentrun-data.cn-hangzhou.aliyuncs.com" +) +SCENARIO_1_EXPECTED = ( + "2de34f2c0ef3d6f0f6460f994ee3fea8c940241fcc2ff2f008a776f1be9b4dba" +) +SCENARIO_2_EXPECTED = ( + "a0d4e04405ddf83d93cf8b30c6064c1a68a298cf95ca6ff56be04f515b98ddbe" +) +SCENARIO_3_EXPECTED = ( + "fdcde808b0dc7526d8083e681ca9a64728e5c1e67e4c92735dfb2268ecc71fb2" +) +# 场景说明(供 JS SDK 对齐): +# 1) GET {BASE_URL_QBH}/path?foo=bar&zoo= , no body, no content-type -> SCENARIO_1_EXPECTED +# 2) POST {BASE_URL_QBH}/path?foo=bar&zoo= , body=b"", no content-type -> SCENARIO_2_EXPECTED +# 3) POST {BASE_URL_QBH}/path?foo=bar&zoo= , body=b"", content-type=application/json -> SCENARIO_3_EXPECTED +# 时间统一: 2026-03-06T12:00:00Z (date=20260306), region=cn-hangzhou, product=agentrun, UNSIGNED-PAYLOAD + + +def _ref_signature_if_available( + url: str, + method: str, + body: bytes | None, + content_type: str | None, + sign_time: datetime, + ak: str, + sk: str, + region: str, + product: str, +) -> str | None: + """若已安装 alibabacloud_signer_inner,用相同参数生成参考签名,便于与官方包/JS 对齐。""" + try: + from alibabacloud_signer_inner import AcsV4HttpSigner, SignRequest + except ImportError: + return None + + class UnsignedPayloadSigner(AcsV4HttpSigner): + ALGORITHM = "AGENTRUN4-HMAC-SHA256" + + def _hash_payload(self, payload) -> str: + return "UNSIGNED-PAYLOAD" + + parsed = urlparse(url) + host = parsed.netloc + pathname = parsed.path or "/" + query_params = {} + if parsed.query: + for pair in parsed.query.split("&"): + if "=" in pair: + k, v = pair.split("=", 1) + query_params[k] = v + + timestamp = sign_time.strftime("%Y-%m-%dT%H:%M:%SZ") + date = sign_time.strftime("%Y%m%d") + headers = { + "host": host, + "x-acs-date": timestamp, + "x-acs-content-sha256": "UNSIGNED-PAYLOAD", + } + if content_type is not None: + headers["content-type"] = content_type + + payload = body if body is not None else b"" + sign_request = SignRequest( + pathname=pathname, + method=method.upper(), + query_parameters=query_params, + header_parameters=headers, + payload=payload, + access_key_id=ak, + access_key_secret=sk, + security_token=None, + product=product, + region=region, + date=date, + ) + signer = UnsignedPayloadSigner() + auth_value = signer.sign(sign_request) + return ( + auth_value.split("Signature=")[-1].strip() + if "Signature=" in auth_value + else None + ) + + +class TestRamSignatureFixedQueryBodyHeader: + """固定时间 + query/body/header 多场景:与官方包(alibabacloud_signer_inner)及 JS SDK 结果一致。""" + + def _sig( + self, + url: str, + method: str, + body: bytes | None, + content_type: str | None, + ) -> str: + headers = get_agentrun_signed_headers( + url=url, + method=method, + access_key_id=FIXED_AK, + access_key_secret=FIXED_SK, + region="cn-hangzhou", + product="agentrun", + body=body, + content_type=content_type, + sign_time=FIXED_SIGN_TIME_QBH, + ) + auth = headers.get("Agentrun-Authorization", "") + return ( + auth.split("Signature=")[-1].strip() if "Signature=" in auth else "" + ) + + def test_scenario_1_get_query_no_body_no_content_type(self): + """GET + query (foo=bar&zoo=),无 body,无 content-type。""" + url = f"{BASE_URL_QBH}/path?foo=bar&zoo=" + sig = self._sig(url, "GET", None, None) + assert sig == SCENARIO_1_EXPECTED, f"got {sig!r}" + ref = _ref_signature_if_available( + url, + "GET", + None, + None, + FIXED_SIGN_TIME_QBH, + FIXED_AK, + FIXED_SK, + "cn-hangzhou", + "agentrun", + ) + if ref is not None: + assert sig == ref, "SDK 与官方包(ref) 签名应一致" + + def test_scenario_2_post_query_empty_body_no_content_type(self): + """POST + query,body 空,无 content-type。""" + url = f"{BASE_URL_QBH}/path?foo=bar&zoo=" + sig = self._sig(url, "POST", b"", None) + assert sig == SCENARIO_2_EXPECTED, f"got {sig!r}" + ref = _ref_signature_if_available( + url, + "POST", + b"", + None, + FIXED_SIGN_TIME_QBH, + FIXED_AK, + FIXED_SK, + "cn-hangzhou", + "agentrun", + ) + if ref is not None: + assert sig == ref, "SDK 与官方包(ref) 签名应一致" + + def test_scenario_3_post_query_empty_body_content_type_json(self): + """POST + query,body 空,content-type: application/json。""" + url = f"{BASE_URL_QBH}/path?foo=bar&zoo=" + sig = self._sig(url, "POST", b"", "application/json") + assert sig == SCENARIO_3_EXPECTED, f"got {sig!r}" + ref = _ref_signature_if_available( + url, + "POST", + b"", + "application/json", + FIXED_SIGN_TIME_QBH, + FIXED_AK, + FIXED_SK, + "cn-hangzhou", + "agentrun", + ) + if ref is not None: + assert sig == ref, "SDK 与官方包(ref) 签名应一致" diff --git a/tests/unittests/utils/test_data_api.py b/tests/unittests/utils/test_data_api.py index 7285bf2..9534147 100644 --- a/tests/unittests/utils/test_data_api.py +++ b/tests/unittests/utils/test_data_api.py @@ -47,21 +47,14 @@ def test_init_basic(self): assert api.resource_name == "test-resource" assert api.resource_type == ResourceType.Runtime assert api.namespace == "agents" - assert api.access_token is None - - def test_init_with_token_in_config(self): - """测试使用 config 中的 token 初始化""" - config = Config(token="my-token", account_id="test-account") - api = DataAPI( - resource_name="test-resource", - resource_type=ResourceType.Runtime, - config=config, - ) - assert api.access_token == "my-token" def test_init_with_custom_namespace(self): """测试自定义 namespace""" - config = Config(account_id="test-account") + config = Config( + account_id="test-account", + access_key_id="", + access_key_secret="", + ) api = DataAPI( resource_name="test-resource", resource_type=ResourceType.Runtime, @@ -165,21 +158,34 @@ def test_path_with_list_query_value(self): class TestDataAPIAuth: - """测试 DataAPI.auth""" + """测试 DataAPI.auth(仅 RAM 签名鉴权)""" - def test_auth_with_existing_token(self): - """测试已有 token 的认证""" - config = Config(token="my-token", account_id="test-account") + def test_auth_without_ak_sk_returns_no_auth_header(self): + """无 AK/SK 时 auth 不添加鉴权头""" + config = Config( + account_id="test-account", + access_key_id="", + access_key_secret="", + ) api = DataAPI( resource_name="test", resource_type=ResourceType.Runtime, config=config, ) url, headers, query = api.auth("https://example.com", {}, None) - assert headers["Agentrun-Access-Token"] == "my-token" - - def test_auth_fetches_token_on_demand(self): - """测试按需获取 token""" + assert "Agentrun-Access-Token" not in headers + assert "Agentrun-Authorization" not in headers + + @patch("agentrun.utils.data_api.get_agentrun_signed_headers") + def test_auth_uses_ram_signature_when_ak_sk_provided( + self, mock_signed_headers + ): + """测试配置了 AK/SK 且无 token 时使用 RAM 签名鉴权""" + mock_signed_headers.return_value = { + "Agentrun-Authorization": "mock-sig", + "x-acs-date": "2025-01-01T00:00:00Z", + "x-acs-content-sha256": "UNSIGNED-PAYLOAD", + } config = Config( access_key_id="ak", access_key_secret="sk", @@ -191,19 +197,22 @@ def test_auth_fetches_token_on_demand(self): config=config, ) - # Mock the token fetch - ControlAPI is imported inside the auth method - with patch("agentrun.utils.control_api.ControlAPI") as mock_control: - mock_client = MagicMock() - mock_response = MagicMock() - mock_response.body.data.access_token = "fetched-token" - mock_client.get_access_token.return_value = mock_response - mock_control.return_value._get_client.return_value = mock_client - - url, headers, query = api.auth("https://example.com", {}, None) - assert api.access_token == "fetched-token" - - def test_auth_handles_fetch_error(self): - """测试获取 token 失败的处理""" + url = "https://test-account-ram.agentrun-data.cn-hangzhou.aliyuncs.com/agents/resources" + url, headers, query = api.auth(url, {}, None, method="GET", body=None) + assert "Agentrun-Authorization" in headers + assert headers["Agentrun-Authorization"] == "mock-sig" + assert "x-acs-date" in headers + assert "x-acs-content-sha256" in headers + assert headers.get("x-acs-content-sha256") == "UNSIGNED-PAYLOAD" + + @patch("agentrun.utils.data_api.get_agentrun_signed_headers") + def test_auth_with_ak_sk_returns_signed_headers(self, mock_signed_headers): + """测试有 AK/SK 时 auth 返回签名头且不抛异常""" + mock_signed_headers.return_value = { + "Agentrun-Authorization": "mock-sig", + "x-acs-date": "2025-01-01T00:00:00Z", + "x-acs-content-sha256": "UNSIGNED-PAYLOAD", + } config = Config( access_key_id="ak", access_key_secret="sk", @@ -215,15 +224,14 @@ def test_auth_handles_fetch_error(self): config=config, ) - # Mock the token fetch to fail - ControlAPI is imported inside the auth method - with patch("agentrun.utils.control_api.ControlAPI") as mock_control: - mock_control.return_value._get_client.side_effect = Exception( - "Failed" - ) - - # 不应该抛出异常 - url, headers, query = api.auth("https://example.com", {}, None) - assert api.access_token is None + url, headers, query = api.auth( + "https://test-account-ram.agentrun-data.cn-hangzhou.aliyuncs.com/path", + {}, + None, + method="GET", + ) + assert "Agentrun-Authorization" in headers + assert headers["Agentrun-Authorization"] == "mock-sig" class TestDataAPIPrepareRequest: @@ -231,7 +239,11 @@ class TestDataAPIPrepareRequest: def test_prepare_request_with_dict_data(self): """测试使用字典数据准备请求""" - config = Config(token="token", account_id="test-account") + config = Config( + account_id="test-account", + access_key_id="", + access_key_secret="", + ) api = DataAPI( resource_name="test", resource_type=ResourceType.Runtime, @@ -246,7 +258,11 @@ def test_prepare_request_with_dict_data(self): def test_prepare_request_with_string_data(self): """测试使用字符串数据准备请求""" - config = Config(token="token", account_id="test-account") + config = Config( + account_id="test-account", + access_key_id="", + access_key_secret="", + ) api = DataAPI( resource_name="test", resource_type=ResourceType.Runtime, @@ -260,7 +276,11 @@ def test_prepare_request_with_string_data(self): def test_prepare_request_with_query(self): """测试带查询参数的请求准备""" - config = Config(token="token", account_id="test-account") + config = Config( + account_id="test-account", + access_key_id="", + access_key_secret="", + ) api = DataAPI( resource_name="test", resource_type=ResourceType.Runtime, @@ -273,7 +293,11 @@ def test_prepare_request_with_query(self): def test_prepare_request_with_list_query(self): """测试带多值列表查询参数的请求准备""" - config = Config(token="token", account_id="test-account") + config = Config( + account_id="test-account", + access_key_id="", + access_key_secret="", + ) api = DataAPI( resource_name="test", resource_type=ResourceType.Runtime, @@ -289,7 +313,11 @@ def test_prepare_request_with_list_query(self): def test_prepare_request_with_non_standard_data(self): """测试使用非 dict/str 类型数据准备请求""" - config = Config(token="token", account_id="test-account") + config = Config( + account_id="test-account", + access_key_id="", + access_key_secret="", + ) api = DataAPI( resource_name="test", resource_type=ResourceType.Runtime, @@ -309,7 +337,11 @@ class TestDataAPIHTTPMethods: @respx.mock def test_get(self): """测试 GET 请求""" - config = Config(token="token", account_id="test-account") + config = Config( + account_id="test-account", + access_key_id="", + access_key_secret="", + ) api = DataAPI( resource_name="test", resource_type=ResourceType.Runtime, @@ -326,7 +358,11 @@ def test_get(self): @respx.mock def test_post(self): """测试 POST 请求""" - config = Config(token="token", account_id="test-account") + config = Config( + account_id="test-account", + access_key_id="", + access_key_secret="", + ) api = DataAPI( resource_name="test", resource_type=ResourceType.Runtime, @@ -343,7 +379,11 @@ def test_post(self): @respx.mock def test_put(self): """测试 PUT 请求""" - config = Config(token="token", account_id="test-account") + config = Config( + account_id="test-account", + access_key_id="", + access_key_secret="", + ) api = DataAPI( resource_name="test", resource_type=ResourceType.Runtime, @@ -360,7 +400,11 @@ def test_put(self): @respx.mock def test_patch(self): """测试 PATCH 请求""" - config = Config(token="token", account_id="test-account") + config = Config( + account_id="test-account", + access_key_id="", + access_key_secret="", + ) api = DataAPI( resource_name="test", resource_type=ResourceType.Runtime, @@ -377,7 +421,11 @@ def test_patch(self): @respx.mock def test_delete(self): """测试 DELETE 请求""" - config = Config(token="token", account_id="test-account") + config = Config( + account_id="test-account", + access_key_id="", + access_key_secret="", + ) api = DataAPI( resource_name="test", resource_type=ResourceType.Runtime, @@ -394,7 +442,11 @@ def test_delete(self): @respx.mock def test_empty_response(self): """测试空响应""" - config = Config(token="token", account_id="test-account") + config = Config( + account_id="test-account", + access_key_id="", + access_key_secret="", + ) api = DataAPI( resource_name="test", resource_type=ResourceType.Runtime, @@ -411,7 +463,11 @@ def test_empty_response(self): @respx.mock def test_bad_gateway_error(self): """测试 502 Bad Gateway 错误""" - config = Config(token="token", account_id="test-account") + config = Config( + account_id="test-account", + access_key_id="", + access_key_secret="", + ) api = DataAPI( resource_name="test", resource_type=ResourceType.Runtime, @@ -433,7 +489,11 @@ def test_bad_gateway_error(self): @respx.mock def test_json_parse_error(self): """测试 JSON 解析错误""" - config = Config(token="token", account_id="test-account") + config = Config( + account_id="test-account", + access_key_id="", + access_key_secret="", + ) api = DataAPI( resource_name="test", resource_type=ResourceType.Runtime, @@ -451,7 +511,11 @@ def test_json_parse_error(self): @respx.mock def test_request_error(self): """测试请求错误""" - config = Config(token="token", account_id="test-account") + config = Config( + account_id="test-account", + access_key_id="", + access_key_secret="", + ) api = DataAPI( resource_name="test", resource_type=ResourceType.Runtime, @@ -474,7 +538,11 @@ class TestDataAPIAsyncMethods: @pytest.mark.asyncio async def test_get_async(self): """测试异步 GET 请求""" - config = Config(token="token", account_id="test-account") + config = Config( + account_id="test-account", + access_key_id="", + access_key_secret="", + ) api = DataAPI( resource_name="test", resource_type=ResourceType.Runtime, @@ -492,7 +560,11 @@ async def test_get_async(self): @pytest.mark.asyncio async def test_post_async(self): """测试异步 POST 请求""" - config = Config(token="token", account_id="test-account") + config = Config( + account_id="test-account", + access_key_id="", + access_key_secret="", + ) api = DataAPI( resource_name="test", resource_type=ResourceType.Runtime, @@ -510,7 +582,11 @@ async def test_post_async(self): @pytest.mark.asyncio async def test_put_async(self): """测试异步 PUT 请求""" - config = Config(token="token", account_id="test-account") + config = Config( + account_id="test-account", + access_key_id="", + access_key_secret="", + ) api = DataAPI( resource_name="test", resource_type=ResourceType.Runtime, @@ -528,7 +604,11 @@ async def test_put_async(self): @pytest.mark.asyncio async def test_patch_async(self): """测试异步 PATCH 请求""" - config = Config(token="token", account_id="test-account") + config = Config( + account_id="test-account", + access_key_id="", + access_key_secret="", + ) api = DataAPI( resource_name="test", resource_type=ResourceType.Runtime, @@ -546,7 +626,11 @@ async def test_patch_async(self): @pytest.mark.asyncio async def test_delete_async(self): """测试异步 DELETE 请求""" - config = Config(token="token", account_id="test-account") + config = Config( + account_id="test-account", + access_key_id="", + access_key_secret="", + ) api = DataAPI( resource_name="test", resource_type=ResourceType.Runtime, @@ -564,7 +648,11 @@ async def test_delete_async(self): @pytest.mark.asyncio async def test_async_empty_response(self): """测试异步空响应""" - config = Config(token="token", account_id="test-account") + config = Config( + account_id="test-account", + access_key_id="", + access_key_secret="", + ) api = DataAPI( resource_name="test", resource_type=ResourceType.Runtime, @@ -582,7 +670,11 @@ async def test_async_empty_response(self): @pytest.mark.asyncio async def test_async_request_error(self): """测试异步请求错误""" - config = Config(token="token", account_id="test-account") + config = Config( + account_id="test-account", + access_key_id="", + access_key_secret="", + ) api = DataAPI( resource_name="test", resource_type=ResourceType.Runtime, @@ -604,7 +696,11 @@ class TestDataAPIFileOperations: @respx.mock def test_post_file(self): """测试同步上传文件""" - config = Config(token="token", account_id="test-account") + config = Config( + account_id="test-account", + access_key_id="", + access_key_secret="", + ) api = DataAPI( resource_name="test", resource_type=ResourceType.Runtime, @@ -629,7 +725,11 @@ def test_post_file(self): @pytest.mark.asyncio async def test_post_file_async(self): """测试异步上传文件""" - config = Config(token="token", account_id="test-account") + config = Config( + account_id="test-account", + access_key_id="", + access_key_secret="", + ) api = DataAPI( resource_name="test", resource_type=ResourceType.Runtime, @@ -655,7 +755,11 @@ async def test_post_file_async(self): @respx.mock def test_get_file(self): """测试同步下载文件""" - config = Config(token="token", account_id="test-account") + config = Config( + account_id="test-account", + access_key_id="", + access_key_secret="", + ) api = DataAPI( resource_name="test", resource_type=ResourceType.Runtime, @@ -685,7 +789,11 @@ def test_get_file(self): @pytest.mark.asyncio async def test_get_file_async(self): """测试异步下载文件""" - config = Config(token="token", account_id="test-account") + config = Config( + account_id="test-account", + access_key_id="", + access_key_secret="", + ) api = DataAPI( resource_name="test", resource_type=ResourceType.Runtime, @@ -711,7 +819,11 @@ async def test_get_file_async(self): @respx.mock def test_get_video(self): """测试同步下载视频""" - config = Config(token="token", account_id="test-account") + config = Config( + account_id="test-account", + access_key_id="", + access_key_secret="", + ) api = DataAPI( resource_name="test", resource_type=ResourceType.Runtime, @@ -736,7 +848,11 @@ def test_get_video(self): @pytest.mark.asyncio async def test_get_video_async(self): """测试异步下载视频""" - config = Config(token="token", account_id="test-account") + config = Config( + account_id="test-account", + access_key_id="", + access_key_secret="", + ) api = DataAPI( resource_name="test", resource_type=ResourceType.Runtime, @@ -760,7 +876,11 @@ async def test_get_video_async(self): @respx.mock def test_post_file_http_error(self): """测试上传文件时的 HTTP 错误""" - config = Config(token="token", account_id="test-account") + config = Config( + account_id="test-account", + access_key_id="", + access_key_secret="", + ) api = DataAPI( resource_name="test", resource_type=ResourceType.Runtime, @@ -784,7 +904,11 @@ def test_post_file_http_error(self): @respx.mock def test_get_file_http_error(self): """测试下载文件时的 HTTP 错误""" - config = Config(token="token", account_id="test-account") + config = Config( + account_id="test-account", + access_key_id="", + access_key_secret="", + ) api = DataAPI( resource_name="test", resource_type=ResourceType.Runtime, @@ -812,7 +936,11 @@ class TestDataAPIHTTPStatusError: @respx.mock def test_http_status_error_with_response_text(self): """测试 HTTPStatusError 带响应文本""" - config = Config(token="token", account_id="test-account") + config = Config( + account_id="test-account", + access_key_id="", + access_key_secret="", + ) api = DataAPI( resource_name="test", resource_type=ResourceType.Runtime, @@ -842,7 +970,11 @@ def test_http_status_error_with_response_text(self): @pytest.mark.asyncio async def test_async_http_status_error(self): """测试异步 HTTPStatusError""" - config = Config(token="token", account_id="test-account") + config = Config( + account_id="test-account", + access_key_id="", + access_key_secret="", + ) api = DataAPI( resource_name="test", resource_type=ResourceType.Runtime, @@ -871,7 +1003,11 @@ async def test_async_http_status_error(self): @pytest.mark.asyncio async def test_async_bad_gateway_error(self): """测试异步 502 Bad Gateway 错误""" - config = Config(token="token", account_id="test-account") + config = Config( + account_id="test-account", + access_key_id="", + access_key_secret="", + ) api = DataAPI( resource_name="test", resource_type=ResourceType.Runtime, @@ -894,7 +1030,11 @@ async def test_async_bad_gateway_error(self): @pytest.mark.asyncio async def test_async_json_parse_error(self): """测试异步 JSON 解析错误""" - config = Config(token="token", account_id="test-account") + config = Config( + account_id="test-account", + access_key_id="", + access_key_secret="", + ) api = DataAPI( resource_name="test", resource_type=ResourceType.Runtime, @@ -917,7 +1057,11 @@ class TestDataAPIFileOperationsErrors: @pytest.mark.asyncio async def test_post_file_async_http_error(self): """测试异步上传文件时的 HTTP 错误""" - config = Config(token="token", account_id="test-account") + config = Config( + account_id="test-account", + access_key_id="", + access_key_secret="", + ) api = DataAPI( resource_name="test", resource_type=ResourceType.Runtime, @@ -944,7 +1088,11 @@ async def test_post_file_async_http_error(self): @pytest.mark.asyncio async def test_get_file_async_http_error(self): """测试异步下载文件时的 HTTP 错误""" - config = Config(token="token", account_id="test-account") + config = Config( + account_id="test-account", + access_key_id="", + access_key_secret="", + ) api = DataAPI( resource_name="test", resource_type=ResourceType.Runtime, @@ -968,7 +1116,11 @@ async def test_get_file_async_http_error(self): @respx.mock def test_get_video_http_error(self): """测试同步下载视频时的 HTTP 错误""" - config = Config(token="token", account_id="test-account") + config = Config( + account_id="test-account", + access_key_id="", + access_key_secret="", + ) api = DataAPI( resource_name="test", resource_type=ResourceType.Runtime, @@ -993,7 +1145,11 @@ def test_get_video_http_error(self): @pytest.mark.asyncio async def test_get_video_async_http_error(self): """测试异步下载视频时的 HTTP 错误""" - config = Config(token="token", account_id="test-account") + config = Config( + account_id="test-account", + access_key_id="", + access_key_secret="", + ) api = DataAPI( resource_name="test", resource_type=ResourceType.Runtime, @@ -1016,10 +1172,18 @@ async def test_get_video_async_http_error(self): class TestDataAPIAuthWithSandbox: - """测试 DataAPI 针对 Sandbox 资源类型的认证""" - - def test_auth_with_sandbox_resource_type(self): - """测试 Sandbox 资源类型使用 resource_id""" + """测试 DataAPI 针对 Sandbox 资源类型的认证(RAM 鉴权下与其它资源类型一致)""" + + @patch("agentrun.utils.data_api.get_agentrun_signed_headers") + def test_auth_with_sandbox_uses_ram_when_ak_sk_provided( + self, mock_signed_headers + ): + """测试 Sandbox 资源类型在配置 AK/SK 时同样使用 RAM 签名""" + mock_signed_headers.return_value = { + "Agentrun-Authorization": "mock-sig", + "x-acs-date": "2025-01-01T00:00:00Z", + "x-acs-content-sha256": "UNSIGNED-PAYLOAD", + } config = Config( access_key_id="ak", access_key_secret="sk", @@ -1031,17 +1195,14 @@ def test_auth_with_sandbox_resource_type(self): config=config, ) - # Mock the token fetch - ControlAPI is imported inside the auth method - with patch("agentrun.utils.control_api.ControlAPI") as mock_control: - mock_client = MagicMock() - mock_response = MagicMock() - mock_response.body.data.access_token = "sandbox-token" - mock_client.get_access_token.return_value = mock_response - mock_control.return_value._get_client.return_value = mock_client - - url, headers, query = api.auth("https://example.com", {}, None) - - # 验证调用使用了 resource_id 而不是 resource_name - call_args = mock_client.get_access_token.call_args - request_obj = call_args[0][0] - assert hasattr(request_obj, "resource_id") + url, headers, query = api.auth( + "https://test-account-ram.agentrun-data.cn-hangzhou.aliyuncs.com/sandboxes/sandbox-123/health", + {}, + None, + method="GET", + ) + assert "Agentrun-Authorization" in headers + assert ( + api.get_base_url().startswith("https://") + and "-ram." in api.get_base_url() + ) From 7ac57e5f7f199b62f29a5c06b899264e19d21055 Mon Sep 17 00:00:00 2001 From: OhYee Date: Mon, 9 Mar 2026 16:30:57 +0800 Subject: [PATCH 2/4] refactor(toolset): add explicit return statements and improve code formatting MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add explicit return None statements in ToolSet methods and improve code formatting for better readability and consistency across the codebase. 在 ToolSet 方法中添加显式的返回语句并改进代码格式 在 ToolSet 方法中添加显式的 return None 语句,并改进代码格式以提高代码库的一致性和可读性。 Change-Id: Icd499f1ccb977bc443f690d81b0694535dc1b99a Signed-off-by: OhYee --- agentrun/toolset/__toolset_async_template.py | 2 ++ agentrun/toolset/toolset.py | 2 ++ 2 files changed, 4 insertions(+) diff --git a/agentrun/toolset/__toolset_async_template.py b/agentrun/toolset/__toolset_async_template.py index 87cc9e0..a6f333f 100644 --- a/agentrun/toolset/__toolset_async_template.py +++ b/agentrun/toolset/__toolset_async_template.py @@ -107,6 +107,8 @@ def _get_openapi_base_url(self) -> Optional[str]: None, ) + return None + async def get_async(self, config: Optional[Config] = None): if self.name is None: raise ValueError("ToolSet name is required to get the ToolSet.") diff --git a/agentrun/toolset/toolset.py b/agentrun/toolset/toolset.py index 965969b..76fa75e 100644 --- a/agentrun/toolset/toolset.py +++ b/agentrun/toolset/toolset.py @@ -122,6 +122,8 @@ def _get_openapi_base_url(self) -> Optional[str]: None, ) + return None + async def get_async(self, config: Optional[Config] = None): if self.name is None: raise ValueError("ToolSet name is required to get the ToolSet.") From f1ae3a4e501c91f49ccac6a7c11e25bb924c6dab Mon Sep 17 00:00:00 2001 From: OhYee Date: Tue, 10 Mar 2026 10:18:14 +0800 Subject: [PATCH 3/4] refactor(auth): simplify URL resolution logic and enhance test coverage MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The _get_openapi_base_url method was refactored to remove complex FC region logic and simplify URL selection. The implementation now prioritizes intranet URLs when available, falling back to internet URLs. This change improves code readability and maintainability while maintaining the same functional behavior. Additionally, comprehensive test suites were added for RAM signature helper functions, ControlAPI client methods, and exception handling to ensure robust authentication and error management. 测试套件已增加以验证 RAM 签名辅助函数、ControlAPI 客户端方法和异常处理, 确保身份验证和错误管理的健壮性。 Change-Id: Ia9d8ff6d2bfd37ec858413f13686fcee8fd6d912 Signed-off-by: OhYee --- agentrun/toolset/__toolset_async_template.py | 19 +-- agentrun/toolset/toolset.py | 19 +-- tests/unittests/ram_signature/test_signer.py | 164 ++++++++++++++++++- tests/unittests/utils/test_control_api.py | 136 +++++++++++++++ tests/unittests/utils/test_exception.py | 27 +++ 5 files changed, 338 insertions(+), 27 deletions(-) diff --git a/agentrun/toolset/__toolset_async_template.py b/agentrun/toolset/__toolset_async_template.py index a6f333f..d77cd02 100644 --- a/agentrun/toolset/__toolset_async_template.py +++ b/agentrun/toolset/__toolset_async_template.py @@ -94,20 +94,13 @@ def _get_openapi_auth_defaults( return headers, query def _get_openapi_base_url(self) -> Optional[str]: - import os - - fc_region = os.getenv("FC_REGION") - arn = pydash.get(self, "status.outputs.function_arn", "") - - if fc_region and arn and pydash.get(arn.split(":"), "[2]"): - # 在同一个 region,则使用内网地址 - return pydash.get( - self, - "status.outputs.urls.intranet_url", - None, - ) + intranet_url: Optional[str] = pydash.get( + self, "status.outputs.urls.intranet_url", None + ) + if intranet_url: + return intranet_url - return None + return pydash.get(self, "status.outputs.urls.internet_url", None) async def get_async(self, config: Optional[Config] = None): if self.name is None: diff --git a/agentrun/toolset/toolset.py b/agentrun/toolset/toolset.py index 76fa75e..67e220a 100644 --- a/agentrun/toolset/toolset.py +++ b/agentrun/toolset/toolset.py @@ -109,20 +109,13 @@ def _get_openapi_auth_defaults( return headers, query def _get_openapi_base_url(self) -> Optional[str]: - import os - - fc_region = os.getenv("FC_REGION") - arn = pydash.get(self, "status.outputs.function_arn", "") - - if fc_region and arn and pydash.get(arn.split(":"), "[2]"): - # 在同一个 region,则使用内网地址 - return pydash.get( - self, - "status.outputs.urls.intranet_url", - None, - ) + intranet_url: Optional[str] = pydash.get( + self, "status.outputs.urls.intranet_url", None + ) + if intranet_url: + return intranet_url - return None + return pydash.get(self, "status.outputs.urls.internet_url", None) async def get_async(self, config: Optional[Config] = None): if self.name is None: diff --git a/tests/unittests/ram_signature/test_signer.py b/tests/unittests/ram_signature/test_signer.py index bf726df..48a7b4e 100644 --- a/tests/unittests/ram_signature/test_signer.py +++ b/tests/unittests/ram_signature/test_signer.py @@ -5,7 +5,13 @@ import pytest -from agentrun.ram_signature.python.signer import get_agentrun_signed_headers +from agentrun.utils.ram_signature.signer import ( + _canonical_headers, + _canonical_uri, + _percent_encode, + get_agentrun_signed_headers, + get_agentrun_signed_headers_with_debug, +) class TestRamSignatureStandalone: @@ -290,3 +296,159 @@ def test_scenario_3_post_query_empty_body_content_type_json(self): ) if ref is not None: assert sig == ref, "SDK 与官方包(ref) 签名应一致" + + +class TestSignerHelperFunctions: + """测试签名辅助函数的边界情况""" + + def test_percent_encode_none(self): + """测试 _percent_encode(None) 返回空字符串""" + assert _percent_encode(None) == "" + + def test_percent_encode_tilde(self): + """测试 _percent_encode 正确处理 ~ 字符""" + assert "~" in _percent_encode("a~b") + + def test_canonical_uri_empty(self): + """测试 _canonical_uri 空字符串返回 /""" + assert _canonical_uri("") == "/" + + def test_canonical_uri_none(self): + """测试 _canonical_uri None 返回 /""" + assert _canonical_uri(None) == "/" + + def test_canonical_uri_normal(self): + """测试 _canonical_uri 正常路径""" + assert _canonical_uri("/path/to/resource") == "/path/to/resource" + + def test_canonical_headers_skips_none_values(self): + """测试 _canonical_headers 跳过 value 为 None 的 header""" + headers = { + "host": "example.com", + "x-acs-date": "2026-01-01T00:00:00Z", + "x-acs-skip": None, + } + canon, signed = _canonical_headers(headers) + assert "x-acs-skip" not in signed + assert "host" in signed + + +class TestSignerNaiveDatetime: + """测试 naive datetime(无时区信息)的处理""" + + def test_naive_datetime_gets_utc(self): + """测试 naive datetime 被自动设置为 UTC""" + naive_time = datetime(2026, 1, 1, 12, 0, 0) + headers = get_agentrun_signed_headers( + url="https://x.agentrun-data.cn-hangzhou.aliyuncs.com/path", + access_key_id="ak", + access_key_secret="sk", + sign_time=naive_time, + ) + assert headers["x-acs-date"] == "2026-01-01T12:00:00Z" + + +class TestSignerWithDebug: + """测试 get_agentrun_signed_headers_with_debug 函数""" + + def test_returns_headers_and_debug(self): + """测试返回 headers 和 debug 信息""" + t = datetime(2026, 1, 1, 12, 0, 0, tzinfo=timezone.utc) + headers, debug = get_agentrun_signed_headers_with_debug( + url="https://x.agentrun-data.cn-hangzhou.aliyuncs.com/path", + access_key_id="ak", + access_key_secret="sk", + sign_time=t, + ) + assert "Agentrun-Authorization" in headers + assert "x-acs-date" in headers + assert "canonical_request" in debug + assert "string_to_sign" in debug + assert "signing_key_hex" in debug + assert "signature" in debug + + def test_debug_signature_matches_headers(self): + """测试 debug 中的 signature 与 headers 中的一致""" + t = datetime(2026, 1, 1, 12, 0, 0, tzinfo=timezone.utc) + headers, debug = get_agentrun_signed_headers_with_debug( + url="https://x.agentrun-data.cn-hangzhou.aliyuncs.com/path", + access_key_id="ak", + access_key_secret="sk", + sign_time=t, + ) + auth = headers["Agentrun-Authorization"] + sig_in_auth = auth.split("Signature=")[-1] + assert sig_in_auth == debug["signature"] + + def test_debug_matches_non_debug_version(self): + """测试 debug 版本与非 debug 版本签名一致""" + t = datetime(2026, 1, 1, 12, 0, 0, tzinfo=timezone.utc) + opts = dict( + url="https://x.agentrun-data.cn-hangzhou.aliyuncs.com/path?a=1", + method="POST", + access_key_id="ak", + access_key_secret="sk", + sign_time=t, + ) + headers_normal = get_agentrun_signed_headers(**opts) + headers_debug, _ = get_agentrun_signed_headers_with_debug(**opts) + assert ( + headers_normal["Agentrun-Authorization"] + == headers_debug["Agentrun-Authorization"] + ) + + def test_debug_requires_ak_sk(self): + """测试 debug 版本也要求 ak/sk""" + with pytest.raises(ValueError, match="Access Key ID and Secret"): + get_agentrun_signed_headers_with_debug( + url="https://x.agentrun-data.cn-hangzhou.aliyuncs.com/", + access_key_id="", + access_key_secret="sk", + ) + + def test_debug_with_security_token(self): + """测试 debug 版本带 security_token""" + t = datetime(2026, 1, 1, 12, 0, 0, tzinfo=timezone.utc) + headers, debug = get_agentrun_signed_headers_with_debug( + url="https://x.agentrun-data.cn-hangzhou.aliyuncs.com/path", + access_key_id="ak", + access_key_secret="sk", + security_token="sts-token", + sign_time=t, + ) + assert "x-acs-security-token" in headers + assert "x-acs-security-token" in headers["Agentrun-Authorization"] + + def test_debug_with_content_type(self): + """测试 debug 版本带 content_type""" + t = datetime(2026, 1, 1, 12, 0, 0, tzinfo=timezone.utc) + headers, debug = get_agentrun_signed_headers_with_debug( + url="https://x.agentrun-data.cn-hangzhou.aliyuncs.com/path", + access_key_id="ak", + access_key_secret="sk", + content_type="application/json", + sign_time=t, + ) + assert "content-type" in headers["Agentrun-Authorization"] + + def test_debug_with_query_params(self): + """测试 debug 版本带 query 参数""" + t = datetime(2026, 1, 1, 12, 0, 0, tzinfo=timezone.utc) + headers, debug = get_agentrun_signed_headers_with_debug( + url="https://x.agentrun-data.cn-hangzhou.aliyuncs.com/path?foo=bar&zoo=", + access_key_id="ak", + access_key_secret="sk", + sign_time=t, + ) + assert "foo=bar" in debug["canonical_request"] + + def test_debug_naive_datetime(self): + """测试 debug 版本处理 naive datetime""" + naive_time = datetime(2026, 1, 1, 12, 0, 0) + headers, debug = get_agentrun_signed_headers_with_debug( + url="https://x.agentrun-data.cn-hangzhou.aliyuncs.com/path", + access_key_id="ak", + access_key_secret="sk", + sign_time=naive_time, + ) + assert headers["x-acs-date"] == "2026-01-01T12:00:00Z" diff --git a/tests/unittests/utils/test_control_api.py b/tests/unittests/utils/test_control_api.py index 0113d38..efd799f 100644 --- a/tests/unittests/utils/test_control_api.py +++ b/tests/unittests/utils/test_control_api.py @@ -277,3 +277,139 @@ def test_get_devs_client_with_read_timeout(self, mock_client_class): config_arg = call_args[0][0] assert config_arg.connect_timeout == 300 assert config_arg.read_timeout == 60000 + + +class TestControlAPIGetBailianClient: + """测试 ControlAPI._get_bailian_client""" + + @patch("agentrun.utils.control_api.BailianClient") + def test_get_bailian_client_basic(self, mock_client_class): + """测试获取基本百炼客户端""" + config = Config( + access_key_id="ak", + access_key_secret="sk", + region_id="cn-hangzhou", + ) + api = ControlAPI(config=config) + + mock_client = MagicMock() + mock_client_class.return_value = mock_client + + result = api._get_bailian_client() + + assert mock_client_class.called + call_args = mock_client_class.call_args + config_arg = call_args[0][0] + assert config_arg.access_key_id == "ak" + assert config_arg.access_key_secret == "sk" + assert config_arg.region_id == "cn-hangzhou" + + @patch("agentrun.utils.control_api.BailianClient") + def test_get_bailian_client_strips_https_prefix(self, mock_client_class): + """测试获取百炼客户端时去除 https:// 前缀""" + config = Config( + access_key_id="ak", + access_key_secret="sk", + bailian_endpoint="https://bailian.cn-hangzhou.aliyuncs.com", + ) + api = ControlAPI(config=config) + + mock_client = MagicMock() + mock_client_class.return_value = mock_client + + api._get_bailian_client() + + call_args = mock_client_class.call_args + config_arg = call_args[0][0] + assert config_arg.endpoint == "bailian.cn-hangzhou.aliyuncs.com" + + @patch("agentrun.utils.control_api.BailianClient") + def test_get_bailian_client_strips_http_prefix(self, mock_client_class): + """测试获取百炼客户端时去除 http:// 前缀""" + config = Config( + access_key_id="ak", + access_key_secret="sk", + bailian_endpoint="http://bailian.custom.com", + ) + api = ControlAPI(config=config) + + mock_client = MagicMock() + mock_client_class.return_value = mock_client + + api._get_bailian_client() + + call_args = mock_client_class.call_args + config_arg = call_args[0][0] + assert config_arg.endpoint == "bailian.custom.com" + + +class TestControlAPIGetGPDBClient: + """测试 ControlAPI._get_gpdb_client""" + + @patch("agentrun.utils.control_api.GPDBClient") + def test_get_gpdb_client_known_region(self, mock_client_class): + """测试已知 region 使用通用 endpoint""" + config = Config( + access_key_id="ak", + access_key_secret="sk", + region_id="cn-hangzhou", + ) + api = ControlAPI(config=config) + + mock_client = MagicMock() + mock_client_class.return_value = mock_client + + api._get_gpdb_client() + + call_args = mock_client_class.call_args + config_arg = call_args[0][0] + assert config_arg.endpoint == "gpdb.aliyuncs.com" + + @patch("agentrun.utils.control_api.GPDBClient") + def test_get_gpdb_client_unknown_region(self, mock_client_class): + """测试未知 region 使用区域级别 endpoint""" + config = Config( + access_key_id="ak", + access_key_secret="sk", + region_id="us-west-1", + ) + api = ControlAPI(config=config) + + mock_client = MagicMock() + mock_client_class.return_value = mock_client + + api._get_gpdb_client() + + call_args = mock_client_class.call_args + config_arg = call_args[0][0] + assert config_arg.endpoint == "gpdb.us-west-1.aliyuncs.com" + + @patch("agentrun.utils.control_api.GPDBClient") + def test_get_gpdb_client_all_known_regions(self, mock_client_class): + """测试所有已知 region 使用通用 endpoint""" + known_regions = [ + "cn-beijing", + "cn-hangzhou", + "cn-shanghai", + "cn-shenzhen", + "cn-hongkong", + "ap-southeast-1", + ] + for region in known_regions: + config = Config( + access_key_id="ak", + access_key_secret="sk", + region_id=region, + ) + api = ControlAPI(config=config) + + mock_client = MagicMock() + mock_client_class.return_value = mock_client + + api._get_gpdb_client() + + call_args = mock_client_class.call_args + config_arg = call_args[0][0] + assert ( + config_arg.endpoint == "gpdb.aliyuncs.com" + ), f"Region {region} should use gpdb.aliyuncs.com" diff --git a/tests/unittests/utils/test_exception.py b/tests/unittests/utils/test_exception.py index dc509e8..a08c36c 100644 --- a/tests/unittests/utils/test_exception.py +++ b/tests/unittests/utils/test_exception.py @@ -4,6 +4,7 @@ from agentrun.utils.exception import ( AgentRunError, + BrowserToolError, ClientError, DeleteResourceError, HTTPError, @@ -216,3 +217,29 @@ def test_init_with_message(self): result = str(error) assert "Failed to delete resource" in result assert "Resource is locked" in result + + +class TestBrowserToolError: + """测试 BrowserToolError 异常类""" + + def test_init_without_operation(self): + """测试不带 operation 的初始化""" + error = BrowserToolError("Element not found") + assert str(error) == "Element not found" + assert error.operation is None + + def test_init_with_operation(self): + """测试带 operation 的初始化""" + error = BrowserToolError("Element not found", operation="click") + assert "click" in str(error) + assert "Element not found" in str(error) + assert error.operation == "click" + + +class TestAgentRunErrorEdgeCases: + """测试 AgentRunError 边界情况""" + + def test_init_with_empty_message(self): + """测试空消息的初始化""" + error = AgentRunError("") + assert error.message == "" From eda73b934301db060d3b7079f52bde3fe41aa827 Mon Sep 17 00:00:00 2001 From: OhYee Date: Tue, 10 Mar 2026 20:12:41 +0800 Subject: [PATCH 4/4] feat(auth): implement RAM signature authentication for agentrun-data endpoints MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit adds comprehensive RAM signature authentication support for both OpenAPI and MCP tool invocations. introduces automatic URL rewriting to -ram endpoints when targeting agentrun-data domains, dynamic signature generation per request, and proper header injection. enhances error handling with detailed logging for HTTP status failures and removes deprecated debug functions. The implementation includes: - httpx Auth handler for dynamic RAM signature calculation per request - automatic URL rewriting from agentrun-data to -ram endpoints - enhanced MCP session with RAM authentication support - improved OpenAPI tool invocation with RAM signature injection - better error logging for failed requests - conditional intranet URL usage based on FC_REGION environment variable - unified MCP URL resolution with fallback mechanisms BREAKING CHANGE: replaces assertion-based error handling with ValueError exceptions in MCP URL resolution feat(auth): 为 agentrun-data 端点实现 RAM 签名认证 为 OpenAPI 和 MCP 工具调用添加了全面的 RAM 签名认证支持。引入了自动 URL 重写为 -ram 端点、动态签名生成和适当的头信息注入。增强了错误处理并提供详细的 HTTP 状态失败日志,移除了已弃用的调试函数。 实现包括: - 用于每个请求动态计算 RAM 签名的 httpx Auth 处理器 - 自动将 agentrun-data 重写为 -ram 端点的 URL - 支持 RAM 认证的增强 MCP 会话 - 具有 RAM 签名注入的改进 OpenAPI 工具调用 - 针对失败请求的改进错误日志 - 基于 FC_REGION 环境变量的条件内网 URL 使用 - 具有回退机制的统一 MCP URL 解析 重大变更:将 MCP URL 解析中的断言错误处理替换为 ValueError 异常 BREAKING CHANGE: replaces assertion-based error handling with ValueError exceptions in MCP URL resolution 重大变更: 在 MCP URL 解析中将基于断言的错误处理替换为 ValueError 异常 Change-Id: I415cb816f44f6eca78b7165b96b1fce8fd7d3404 Co-authored-by: Copilot Signed-off-by: OhYee --- agentrun/toolset/__toolset_async_template.py | 45 +++++-- agentrun/toolset/api/mcp.py | 116 +++++++++++++++- agentrun/toolset/api/openapi.py | 88 ++++++++++++- agentrun/toolset/toolset.py | 45 +++++-- agentrun/utils/ram_signature/signer.py | 76 ----------- tests/unittests/ram_signature/test_signer.py | 107 --------------- tests/unittests/toolset/api/test_openapi.py | 132 +++++++++++++++++++ tests/unittests/toolset/test_toolset.py | 33 ++++- 8 files changed, 425 insertions(+), 217 deletions(-) diff --git a/agentrun/toolset/__toolset_async_template.py b/agentrun/toolset/__toolset_async_template.py index d77cd02..7a92754 100644 --- a/agentrun/toolset/__toolset_async_template.py +++ b/agentrun/toolset/__toolset_async_template.py @@ -94,11 +94,15 @@ def _get_openapi_auth_defaults( return headers, query def _get_openapi_base_url(self) -> Optional[str]: - intranet_url: Optional[str] = pydash.get( - self, "status.outputs.urls.intranet_url", None - ) - if intranet_url: - return intranet_url + import os + + fc_region = os.getenv("FC_REGION") + if fc_region: + intranet_url: Optional[str] = pydash.get( + self, "status.outputs.urls.intranet_url", None + ) + if intranet_url: + return intranet_url return pydash.get(self, "status.outputs.urls.internet_url", None) @@ -156,6 +160,25 @@ async def call_tool_async( logger.debug("invoke tool %s got result %s", name, result) return result + def _get_mcp_url(self) -> str: + """获取 MCP 工具的最佳 URL / Get the best URL for MCP tool + + 优先使用 agentrun-data 代理入口(支持 RAM 签名认证), + 回退到 mcp_server_config.url(直连)。 + Priority: agentrun-data proxy endpoint (with RAM auth) > mcp_server_config.url (direct). + """ + proxy_url = self._get_openapi_base_url() + if proxy_url: + return proxy_url + + mcp_server_config: MCPServerConfig = pydash.get( + self, "status.outputs.mcp_server_config", None + ) + if mcp_server_config and mcp_server_config.url: + return mcp_server_config.url + + raise ValueError("MCP server URL is missing.") + def to_apiset(self, config: Optional[Config] = None): """将 ToolSet 转换为统一的 ApiSet 对象 @@ -170,16 +193,16 @@ def to_apiset(self, config: Optional[Config] = None): mcp_server_config: MCPServerConfig = pydash.get( self, "status.outputs.mcp_server_config", None ) - assert ( - mcp_server_config.url is not None - ), "MCP server URL is missing." - cfg = Config.with_configs( - config, Config(headers=mcp_server_config.headers) + mcp_url = self._get_mcp_url() + + mcp_headers = ( + mcp_server_config.headers if mcp_server_config else None ) + cfg = Config.with_configs(config, Config(headers=mcp_headers)) mcp_client = MCPToolSet( - url=mcp_server_config.url, + url=mcp_url, config=cfg, ) diff --git a/agentrun/toolset/api/mcp.py b/agentrun/toolset/api/mcp.py index d2c382a..ab0d3a1 100644 --- a/agentrun/toolset/api/mcp.py +++ b/agentrun/toolset/api/mcp.py @@ -4,10 +4,88 @@ Handles tool invocations for MCP (Model Context Protocol). """ -from typing import Any, Dict, Optional +from typing import Any, Dict, Generator, Optional +from urllib.parse import urlparse, urlunparse + +import httpx from agentrun.utils.config import Config from agentrun.utils.log import logger +from agentrun.utils.ram_signature import get_agentrun_signed_headers + + +class _AgentrunRamAuth(httpx.Auth): + """httpx Auth handler:为每次请求动态生成 RAM 签名。 + + SSE 场景下同一个 httpx.AsyncClient 会发出 GET(SSE 连接)和 + POST(消息发送)请求,URL / method / body 各不相同,因此必须 + per-request 计算签名,不能在 client 初始化时一次性设置 headers。 + """ + + def __init__( + self, + access_key_id: str, + access_key_secret: str, + region: str, + security_token: Optional[str] = None, + ): + self._ak = access_key_id + self._sk = access_key_secret + self._region = region + self._security_token = security_token + + def auth_flow( + self, request: httpx.Request + ) -> Generator[httpx.Request, httpx.Response, None]: + url = str(request.url) + method = request.method + + body: Optional[bytes] = None + if request.content: + body = request.content + + content_type: Optional[str] = request.headers.get("content-type") + + try: + signed = get_agentrun_signed_headers( + url=url, + method=method, + access_key_id=self._ak, + access_key_secret=self._sk, + security_token=self._security_token, + region=self._region, + product="agentrun", + body=body, + content_type=content_type, + ) + for k, v in signed.items(): + request.headers[k] = v + logger.debug( + "applied RAM signature for MCP %s request to %s", + method, + url[:80] + ("..." if len(url) > 80 else ""), + ) + except ValueError as e: + logger.warning("RAM signing skipped for MCP request: %s", e) + + yield request + + +def _rewrite_to_ram_url(url: str) -> str: + """将 agentrun-data 域名改写为 -ram 端点。""" + parsed = urlparse(url) + parts = parsed.netloc.split(".", 1) + if len(parts) == 2: + ram_netloc = parts[0] + "-ram." + parts[1] + return urlunparse(( + parsed.scheme, + ram_netloc, + parsed.path or "", + parsed.params, + parsed.query, + parsed.fragment, + )) + return url class MCPSession: @@ -16,14 +94,46 @@ def __init__(self, url: str, config: Optional[Config] = None): self.url = url self.config = Config.with_configs(config) + def _build_ram_auth(self, url: str) -> tuple: + """当目标是 agentrun-data 域名时,改写 URL 并返回 httpx Auth handler。 + + Returns: + (rewritten_url, auth_or_none) + """ + parsed = urlparse(url) + if ".agentrun-data." not in (parsed.netloc or ""): + return url, None + + cfg = self.config + ak = cfg.get_access_key_id() + sk = cfg.get_access_key_secret() + if not ak or not sk: + return url, None + + url = _rewrite_to_ram_url(url) + + auth = _AgentrunRamAuth( + access_key_id=ak, + access_key_secret=sk, + region=cfg.get_region_id(), + security_token=cfg.get_security_token() or None, + ) + return url, auth + async def __aenter__(self): from mcp import ClientSession from mcp.client.sse import sse_client timeout = self.config.get_timeout() + headers = self.config.get_headers() + url = self.url + + url, auth = self._build_ram_auth(url) + self.client = sse_client( - url=self.url, - headers=self.config.get_headers(), + url=url, + headers=headers, + auth=auth, timeout=timeout if timeout else 60, ) read, write = await self.client.__aenter__() diff --git a/agentrun/toolset/api/openapi.py b/agentrun/toolset/api/openapi.py index 45e5d83..5d52b39 100644 --- a/agentrun/toolset/api/openapi.py +++ b/agentrun/toolset/api/openapi.py @@ -7,6 +7,7 @@ from copy import deepcopy import json from typing import Any, Dict, List, Optional, Tuple, Union +from urllib.parse import urlparse, urlunparse import httpx from pydash import get as pg @@ -15,6 +16,7 @@ from agentrun.utils.config import Config from agentrun.utils.log import logger from agentrun.utils.model import BaseModel +from agentrun.utils.ram_signature import get_agentrun_signed_headers from ..model import ToolInfo, ToolSchema @@ -830,11 +832,21 @@ def invoke_tool( name, arguments, config ) - print(request_kwargs) with httpx.Client(timeout=timeout) as client: response = client.request(**request_kwargs) if raise_for_status: - response.raise_for_status() + try: + response.raise_for_status() + except httpx.HTTPStatusError: + logger.error( + "OpenAPI tool request failed: status=%s url=%s " + "response_headers=%s response_body=%s", + response.status_code, + response.request.url, + dict(response.headers), + response.text[:2000], + ) + raise return self._format_response(response) async def invoke_tool_async( @@ -850,7 +862,18 @@ async def invoke_tool_async( async with httpx.AsyncClient(timeout=timeout) as client: response = await client.request(**request_kwargs) if raise_for_status: - response.raise_for_status() + try: + response.raise_for_status() + except httpx.HTTPStatusError: + logger.error( + "OpenAPI tool request failed: status=%s url=%s " + "response_headers=%s response_body=%s", + response.status_code, + response.request.url, + dict(response.headers), + response.text[:2000], + ) + raise return self._format_response(response) def _load_schema(self, schema: Any) -> Dict[str, Any]: @@ -945,7 +968,6 @@ def _walk(node: Any): def _extract_base_url(self, schema: Dict[str, Any]) -> Optional[str]: servers = schema.get("servers") or [] - print("======", servers) return self._pick_server_url(servers) def _convert_to_native(self, value: Any) -> Any: @@ -1177,8 +1199,66 @@ def _prepare_request( args, ) + self._apply_ram_auth(request_kwargs, combined_config) + return request_kwargs, timeout, raise_for_status + def _apply_ram_auth( + self, + request_kwargs: Dict[str, Any], + config: Optional[Config], + ) -> None: + """当目标是 agentrun-data 域名时,自动注入 RAM 签名鉴权 headers 并改写为 -ram 端点。""" + url = request_kwargs.get("url", "") + parsed = urlparse(url) + if ".agentrun-data." not in (parsed.netloc or ""): + return + + cfg = Config.with_configs(config) + ak = cfg.get_access_key_id() + sk = cfg.get_access_key_secret() + if not ak or not sk: + return + + parts = parsed.netloc.split(".", 1) + if len(parts) == 2: + ram_netloc = parts[0] + "-ram." + parts[1] + url = urlunparse(( + parsed.scheme, + ram_netloc, + parsed.path or "", + parsed.params, + parsed.query, + parsed.fragment, + )) + request_kwargs["url"] = url + + method = request_kwargs.get("method", "GET") + body_bytes: Optional[bytes] = None + json_body = request_kwargs.get("json") + if json_body is not None: + body_bytes = json.dumps(json_body).encode("utf-8") + + try: + signed = get_agentrun_signed_headers( + url=url, + method=method, + access_key_id=ak, + access_key_secret=sk, + security_token=cfg.get_security_token() or None, + region=cfg.get_region_id(), + product="agentrun", + body=body_bytes, + ) + existing_headers: Dict[str, str] = request_kwargs.get("headers", {}) + request_kwargs["headers"] = {**signed, **existing_headers} + logger.debug( + "applied RAM signature for OpenAPI tool request to %s", + url[:80] + "..." if len(url) > 80 else url, + ) + except ValueError as e: + logger.warning("RAM signing skipped for OpenAPI tool: %s", e) + def _format_response(self, response: httpx.Response) -> Dict[str, Any]: try: body = response.json() diff --git a/agentrun/toolset/toolset.py b/agentrun/toolset/toolset.py index 67e220a..5aa2496 100644 --- a/agentrun/toolset/toolset.py +++ b/agentrun/toolset/toolset.py @@ -109,11 +109,15 @@ def _get_openapi_auth_defaults( return headers, query def _get_openapi_base_url(self) -> Optional[str]: - intranet_url: Optional[str] = pydash.get( - self, "status.outputs.urls.intranet_url", None - ) - if intranet_url: - return intranet_url + import os + + fc_region = os.getenv("FC_REGION") + if fc_region: + intranet_url: Optional[str] = pydash.get( + self, "status.outputs.urls.intranet_url", None + ) + if intranet_url: + return intranet_url return pydash.get(self, "status.outputs.urls.internet_url", None) @@ -223,6 +227,25 @@ def call_tool( logger.debug("invoke tool %s got result %s", name, result) return result + def _get_mcp_url(self) -> str: + """获取 MCP 工具的最佳 URL / Get the best URL for MCP tool + + 优先使用 agentrun-data 代理入口(支持 RAM 签名认证), + 回退到 mcp_server_config.url(直连)。 + Priority: agentrun-data proxy endpoint (with RAM auth) > mcp_server_config.url (direct). + """ + proxy_url = self._get_openapi_base_url() + if proxy_url: + return proxy_url + + mcp_server_config: MCPServerConfig = pydash.get( + self, "status.outputs.mcp_server_config", None + ) + if mcp_server_config and mcp_server_config.url: + return mcp_server_config.url + + raise ValueError("MCP server URL is missing.") + def to_apiset(self, config: Optional[Config] = None): """将 ToolSet 转换为统一的 ApiSet 对象 @@ -237,16 +260,16 @@ def to_apiset(self, config: Optional[Config] = None): mcp_server_config: MCPServerConfig = pydash.get( self, "status.outputs.mcp_server_config", None ) - assert ( - mcp_server_config.url is not None - ), "MCP server URL is missing." - cfg = Config.with_configs( - config, Config(headers=mcp_server_config.headers) + mcp_url = self._get_mcp_url() + + mcp_headers = ( + mcp_server_config.headers if mcp_server_config else None ) + cfg = Config.with_configs(config, Config(headers=mcp_headers)) mcp_client = MCPToolSet( - url=mcp_server_config.url, + url=mcp_url, config=cfg, ) diff --git a/agentrun/utils/ram_signature/signer.py b/agentrun/utils/ram_signature/signer.py index 301716f..ce0b126 100644 --- a/agentrun/utils/ram_signature/signer.py +++ b/agentrun/utils/ram_signature/signer.py @@ -196,79 +196,3 @@ def get_agentrun_signed_headers( result = dict(headers_for_sign) result["Agentrun-Authorization"] = auth_value return result - - -def get_agentrun_signed_headers_with_debug( - url: str, - method: str = "GET", - access_key_id: Optional[str] = None, - access_key_secret: Optional[str] = None, - security_token: Optional[str] = None, - region: str = "cn-hangzhou", - product: str = "agentrun", - body: Optional[bytes] = None, - content_type: Optional[str] = None, - sign_time: Optional[datetime] = None, -) -> tuple[dict, dict]: - """ - 与 get_agentrun_signed_headers 相同,但额外返回调试信息,便于与 ram-e2e-test 或服务端对比。 - 返回 (headers, debug_dict),debug_dict 含 canonical_request, string_to_sign, signature。 - """ - if not access_key_id or not access_key_secret: - raise ValueError("Access Key ID and Secret are required") - - parsed = urlparse(url) - host = parsed.netloc - pathname = parsed.path or "/" - query_params: dict = {} - if parsed.query: - for pair in parsed.query.split("&"): - if "=" in pair: - k, v = pair.split("=", 1) - query_params[k] = v - - now = sign_time if sign_time is not None else datetime.now(timezone.utc) - if now.tzinfo is None: - now = now.replace(tzinfo=timezone.utc) - timestamp = now.strftime("%Y-%m-%dT%H:%M:%SZ") - date = now.strftime("%Y%m%d") - - headers_for_sign: dict = { - "host": host, - "x-acs-date": timestamp, - "x-acs-content-sha256": UNSIGNED_PAYLOAD, - } - if security_token: - headers_for_sign["x-acs-security-token"] = security_token - if content_type is not None: - headers_for_sign["content-type"] = content_type - - scope = _build_scope(date, region, product) - canonical_request = _calc_canonical_request( - method, - pathname, - query_params, - headers_for_sign, - UNSIGNED_PAYLOAD, - ) - string_to_sign = _calc_string_to_sign(canonical_request) - signature = _calc_signature( - access_key_secret, date, region, product, string_to_sign - ) - signing_key = _get_signing_key(access_key_secret, date, region, product) - - signed_headers_str = ";".join(_get_signed_headers(headers_for_sign)) - auth_value = ( - f"{ALGORITHM} Credential={access_key_id}/{scope}," - f"SignedHeaders={signed_headers_str},Signature={signature}" - ) - - result = dict(headers_for_sign) - result["Agentrun-Authorization"] = auth_value - debug = { - "canonical_request": canonical_request, - "string_to_sign": string_to_sign, - "signing_key_hex": signing_key.hex(), - "signature": signature, - } - return result, debug diff --git a/tests/unittests/ram_signature/test_signer.py b/tests/unittests/ram_signature/test_signer.py index 48a7b4e..cb4a913 100644 --- a/tests/unittests/ram_signature/test_signer.py +++ b/tests/unittests/ram_signature/test_signer.py @@ -10,7 +10,6 @@ _canonical_uri, _percent_encode, get_agentrun_signed_headers, - get_agentrun_signed_headers_with_debug, ) @@ -346,109 +345,3 @@ def test_naive_datetime_gets_utc(self): sign_time=naive_time, ) assert headers["x-acs-date"] == "2026-01-01T12:00:00Z" - - -class TestSignerWithDebug: - """测试 get_agentrun_signed_headers_with_debug 函数""" - - def test_returns_headers_and_debug(self): - """测试返回 headers 和 debug 信息""" - t = datetime(2026, 1, 1, 12, 0, 0, tzinfo=timezone.utc) - headers, debug = get_agentrun_signed_headers_with_debug( - url="https://x.agentrun-data.cn-hangzhou.aliyuncs.com/path", - access_key_id="ak", - access_key_secret="sk", - sign_time=t, - ) - assert "Agentrun-Authorization" in headers - assert "x-acs-date" in headers - assert "canonical_request" in debug - assert "string_to_sign" in debug - assert "signing_key_hex" in debug - assert "signature" in debug - - def test_debug_signature_matches_headers(self): - """测试 debug 中的 signature 与 headers 中的一致""" - t = datetime(2026, 1, 1, 12, 0, 0, tzinfo=timezone.utc) - headers, debug = get_agentrun_signed_headers_with_debug( - url="https://x.agentrun-data.cn-hangzhou.aliyuncs.com/path", - access_key_id="ak", - access_key_secret="sk", - sign_time=t, - ) - auth = headers["Agentrun-Authorization"] - sig_in_auth = auth.split("Signature=")[-1] - assert sig_in_auth == debug["signature"] - - def test_debug_matches_non_debug_version(self): - """测试 debug 版本与非 debug 版本签名一致""" - t = datetime(2026, 1, 1, 12, 0, 0, tzinfo=timezone.utc) - opts = dict( - url="https://x.agentrun-data.cn-hangzhou.aliyuncs.com/path?a=1", - method="POST", - access_key_id="ak", - access_key_secret="sk", - sign_time=t, - ) - headers_normal = get_agentrun_signed_headers(**opts) - headers_debug, _ = get_agentrun_signed_headers_with_debug(**opts) - assert ( - headers_normal["Agentrun-Authorization"] - == headers_debug["Agentrun-Authorization"] - ) - - def test_debug_requires_ak_sk(self): - """测试 debug 版本也要求 ak/sk""" - with pytest.raises(ValueError, match="Access Key ID and Secret"): - get_agentrun_signed_headers_with_debug( - url="https://x.agentrun-data.cn-hangzhou.aliyuncs.com/", - access_key_id="", - access_key_secret="sk", - ) - - def test_debug_with_security_token(self): - """测试 debug 版本带 security_token""" - t = datetime(2026, 1, 1, 12, 0, 0, tzinfo=timezone.utc) - headers, debug = get_agentrun_signed_headers_with_debug( - url="https://x.agentrun-data.cn-hangzhou.aliyuncs.com/path", - access_key_id="ak", - access_key_secret="sk", - security_token="sts-token", - sign_time=t, - ) - assert "x-acs-security-token" in headers - assert "x-acs-security-token" in headers["Agentrun-Authorization"] - - def test_debug_with_content_type(self): - """测试 debug 版本带 content_type""" - t = datetime(2026, 1, 1, 12, 0, 0, tzinfo=timezone.utc) - headers, debug = get_agentrun_signed_headers_with_debug( - url="https://x.agentrun-data.cn-hangzhou.aliyuncs.com/path", - access_key_id="ak", - access_key_secret="sk", - content_type="application/json", - sign_time=t, - ) - assert "content-type" in headers["Agentrun-Authorization"] - - def test_debug_with_query_params(self): - """测试 debug 版本带 query 参数""" - t = datetime(2026, 1, 1, 12, 0, 0, tzinfo=timezone.utc) - headers, debug = get_agentrun_signed_headers_with_debug( - url="https://x.agentrun-data.cn-hangzhou.aliyuncs.com/path?foo=bar&zoo=", - access_key_id="ak", - access_key_secret="sk", - sign_time=t, - ) - assert "foo=bar" in debug["canonical_request"] - - def test_debug_naive_datetime(self): - """测试 debug 版本处理 naive datetime""" - naive_time = datetime(2026, 1, 1, 12, 0, 0) - headers, debug = get_agentrun_signed_headers_with_debug( - url="https://x.agentrun-data.cn-hangzhou.aliyuncs.com/path", - access_key_id="ak", - access_key_secret="sk", - sign_time=naive_time, - ) - assert headers["x-acs-date"] == "2026-01-01T12:00:00Z" diff --git a/tests/unittests/toolset/api/test_openapi.py b/tests/unittests/toolset/api/test_openapi.py index 3a60866..bb32eac 100644 --- a/tests/unittests/toolset/api/test_openapi.py +++ b/tests/unittests/toolset/api/test_openapi.py @@ -3,15 +3,18 @@ 测试内容: 1. $ref 解析是否正确展开内部引用 2. Mock 服务端验证请求参数是否符合预期 +3. agentrun-data 域名自动 RAM 签名注入 """ import json +from unittest.mock import patch import httpx import respx from agentrun.toolset.api.openapi import ApiSet, OpenAPI from agentrun.toolset.model import ToolInfo, ToolSchema +from agentrun.utils.config import Config class TestOpenAPIRefResolution: @@ -1210,3 +1213,132 @@ def test_tool_schema(self): assert json_schema["type"] == "object" assert "items" in json_schema["properties"] assert json_schema["properties"]["items"]["type"] == "array" + + +AGENTRUN_DATA_SCHEMA = { + "openapi": "3.0.1", + "info": {"title": "Test", "version": "1.0"}, + "servers": [{ + "url": "https://1431999136518149.agentrun-data.cn-hangzhou.aliyuncs.com/tools/test/" + }], + "paths": { + "/invoke": { + "post": { + "operationId": "test_tool", + "summary": "Test tool", + "requestBody": { + "required": True, + "content": { + "application/json": { + "schema": { + "type": "object", + "properties": { + "query": { + "type": "string", + } + }, + } + } + }, + }, + "responses": {"200": {"description": "OK"}}, + } + } + }, +} + + +class TestOpenAPIRamAuth: + """测试 agentrun-data 域名 OpenAPI 调用时自动注入 RAM 签名""" + + def _make_openapi( + self, + base_url: str | None = None, + config: Config | None = None, + ) -> OpenAPI: + return OpenAPI( + schema=AGENTRUN_DATA_SCHEMA, + base_url=base_url, + config=config, + ) + + def test_ram_auth_applied_for_agentrun_data_url(self): + """agentrun-data 域名应自动注入 RAM 签名 headers 并改写为 -ram 端点""" + cfg = Config( + access_key_id="test-ak", + access_key_secret="test-sk", + region_id="cn-hangzhou", + ) + openapi = self._make_openapi(config=cfg) + request_kwargs, _, _ = openapi._prepare_request( + "test_tool", {"query": "hello"}, cfg + ) + assert "Agentrun-Authorization" in request_kwargs["headers"] + assert "-ram." in request_kwargs["url"] + assert request_kwargs["headers"]["Agentrun-Authorization"].startswith( + "AGENTRUN4-HMAC-SHA256 " + ) + + def test_ram_auth_not_applied_for_non_agentrun_url(self): + """非 agentrun-data 域名不应注入 RAM 签名""" + cfg = Config( + access_key_id="test-ak", + access_key_secret="test-sk", + region_id="cn-hangzhou", + ) + openapi = self._make_openapi( + base_url="https://example.com/api", + config=cfg, + ) + request_kwargs, _, _ = openapi._prepare_request( + "test_tool", {"query": "hello"}, cfg + ) + assert "Agentrun-Authorization" not in request_kwargs["headers"] + assert "-ram." not in request_kwargs["url"] + + def test_ram_auth_skipped_without_ak_sk(self): + """没有 AK/SK 时应跳过 RAM 签名""" + with patch.dict( + "os.environ", + {}, + clear=True, + ): + cfg = Config(region_id="cn-hangzhou") + openapi = self._make_openapi(config=cfg) + request_kwargs, _, _ = openapi._prepare_request( + "test_tool", {"query": "hello"}, cfg + ) + assert "Agentrun-Authorization" not in request_kwargs["headers"] + + def test_ram_auth_url_rewrite(self): + """验证 URL 被正确改写为 -ram 端点""" + cfg = Config( + access_key_id="test-ak", + access_key_secret="test-sk", + region_id="cn-hangzhou", + ) + openapi = self._make_openapi(config=cfg) + request_kwargs, _, _ = openapi._prepare_request( + "test_tool", {"query": "hello"}, cfg + ) + assert request_kwargs["url"].startswith( + "https://1431999136518149-ram.agentrun-data." + ) + + def test_ram_auth_preserves_existing_headers(self): + """RAM 签名不应覆盖用户自定义 headers""" + cfg = Config( + access_key_id="test-ak", + access_key_secret="test-sk", + region_id="cn-hangzhou", + ) + openapi = OpenAPI( + schema=AGENTRUN_DATA_SCHEMA, + headers={"X-Custom": "value"}, + config=cfg, + ) + request_kwargs, _, _ = openapi._prepare_request( + "test_tool", {"query": "hello"}, cfg + ) + assert "Agentrun-Authorization" in request_kwargs["headers"] + assert request_kwargs["headers"]["X-Custom"] == "value" diff --git a/tests/unittests/toolset/test_toolset.py b/tests/unittests/toolset/test_toolset.py index 6d8c6f5..76d2abe 100644 --- a/tests/unittests/toolset/test_toolset.py +++ b/tests/unittests/toolset/test_toolset.py @@ -4,6 +4,8 @@ Tests ToolSet resource class functionality. """ +import os +import unittest.mock from unittest.mock import AsyncMock, MagicMock, patch import pytest @@ -275,8 +277,8 @@ def test_no_urls(self): toolset = ToolSet() assert toolset._get_openapi_base_url() is None - def test_intranet_url_preferred(self): - """测试优先使用内网 URL""" + def test_intranet_url_preferred_on_fc(self): + """测试在 FC 环境下优先使用内网 URL""" toolset = ToolSet( status=ToolSetStatus( outputs=ToolSetStatusOutputs( @@ -287,10 +289,31 @@ def test_intranet_url_preferred(self): ) ) ) - assert toolset._get_openapi_base_url() == "https://internal.example.com" + with unittest.mock.patch.dict(os.environ, {"FC_REGION": "cn-hangzhou"}): + assert ( + toolset._get_openapi_base_url() + == "https://internal.example.com" + ) + + def test_internet_url_when_not_on_fc(self): + """测试非 FC 环境使用公网 URL""" + toolset = ToolSet( + status=ToolSetStatus( + outputs=ToolSetStatusOutputs( + urls=ToolSetStatusOutputsUrls( + internet_url="https://public.example.com", + intranet_url="https://internal.example.com", + ) + ) + ) + ) + with unittest.mock.patch.dict(os.environ, {}, clear=True): + assert ( + toolset._get_openapi_base_url() == "https://public.example.com" + ) def test_internet_url_fallback(self): - """测试公网 URL 作为回退""" + """测试只有公网 URL 时作为回退""" toolset = ToolSet( status=ToolSetStatus( outputs=ToolSetStatusOutputs( @@ -682,5 +705,5 @@ def test_to_apiset_mcp_missing_url(self): ) ), ) - with pytest.raises(AssertionError, match="MCP server URL is missing"): + with pytest.raises(ValueError, match="MCP server URL is missing"): toolset.to_apiset()