What happened / 发生了什么

authropic调用方式解析代码会报“工具调用参数 JSON 解析失败”
Reproduce / 如何复现?
编写代码场景
AstrBot version, deployment method (e.g., Windows Docker Desktop deployment), provider used, and messaging platform used. / AstrBot 版本、部署方式(如 Windows Docker Desktop 部署)、使用的提供商、使用的消息平台适配器
astrbot 4.22.0,linux
OS
Windows
Logs / 报错日志
[2026-03-24 02:16:19.212] [Core] [WARN] [v4.22.0] [sources.anthropic_source:446]: 工具调用参数 JSON 解析失败: {'id': 'call_function_9z1fl0grtups_1', 'name': 'astrbot_execute_shell', 'input': {}, 'input_json': '{"command": "cat > /root/astrbot_plugin_agent_wechat/init.py << 'PYTHON'\n\"\"\"\nAstrBot Plugin: agent-wechat\n通过agent-wechat接入个人微信\n\"\"\"\nfrom .agent_wechat_adapter import AgentWeChatAdapter\nfrom .agent_wechat_client import AgentWeChatClient\n\n__all__ = [\"AgentWeChatAdapter\", \"AgentWeChatClient\"]\nPYTHON\n\ncat > /root/astrbot_plugin_agent_wechat/agent_wechat_client.py << 'PYTHON'\n\"\"\"\nagent-wechat REST API 客户端\n参考: https://github.com/thisnick/agent-wechat/tree/main/packages/shared\\n\\"\\"\\"\\nimport asyncio\nimport base64\nimport json\nfrom typing import Optional, Callable, Any\nfrom dataclasses import dataclass\nimport httpx\nfrom astrbot import logger\n\n\n@dataclass\nclass Chat:\n id: str\n username: str\n name: str\n unreadCount: int\n lastMsgLocalId: Optional[int] = None\n\n\n@dataclass\nclass Message:\n localId: int\n type: int\n content: str\n sender: Optional[str] = None\n senderName: Optional[str] = None\n isSelf: bool = False\n isMentioned: bool = False\n timestamp: int = 0\n reply: Optional[dict] = None\n\n\n@dataclass\nclass AuthStatus:\n status: str # \"logged_in\", \"logged_out\", \"app_not_running\", \"unknown\"\n loggedInUser: Optional[str] = None\n\n\n@dataclass\nclass SendResult:\n success: bool\n error: Optional[str] = None\n\n\nclass AgentWeChatClient:\n \"\"\"agent-wechat API 客户端\"\"\"\n \n def init(self, base_url: str, token: str):\n self.base_url = base_url.rstrip('/')\n self.token = token\n self.headers = {\n \"Content-Type\": \"application/json\",\n \"Authorization\": f\"Bearer {token}\"\n }\n self._ws_task: Optional[asyncio.Task] = None\n self._ws_close_event: Optional[asyncio.Event] = None\n \n async def _get(self, path: str) -> dict:\n async with httpx.AsyncClient() as client:\n resp = await client.get(f\"{self.base_url}{path}\", headers=self.headers)\n resp.raise_for_status()\n return resp.json()\n \n async def _post(self, path: str, data: Optional[dict] = None) -> dict:\n async with httpx.AsyncClient() as client:\n resp = await client.post(f\"{self.base_url}{path}\", json=data, headers=self.headers)\n resp.raise_for_status()\n return resp.json()\n \n # === 状态接口 ===\n async def auth_status(self) -> AuthStatus:\n data = await self._get(\"/api/status/auth\")\n return AuthStatus(\n status=data.get(\"status\", \"unknown\"),\n loggedInUser=data.get(\"loggedInUser\")\n )\n \n async def login(self) -> dict:\n return await self._post(\"/api/status/login\")\n \n async def logout(self) -> dict:\n return await self._post(\"/api/status/logout\")\n \n # === 聊天列表 ===\n async def list_chats(self, limit: int = 50) -> list[Chat]:\n data = await self._get(f\"/api/chats?limit={limit}\")\n chats = []\n for item in data.get(\"chats\", []):\n chats.append(Chat(\n id=item.get(\"id\", item.get(\"username\", \"\")),\n username=item.get(\"username\", item.get(\"id\", \"\")),\n name=item.get(\"name\", \"\"),\n unreadCount=item.get(\"unreadCount\", 0),\n lastMsgLocalId=item.get(\"lastMsgLocalId\")\n ))\n return chats\n \n async def open_chat(self, chat_id: str, clear_unreads: bool = True) -> dict:\n return await self._post(f\"/api/chats/{chat_id}/open?clearUnreads={clear_unreads}\")\n \n async def find_chats(self, name: str) -> list[Chat]:\n data = await self._get(f\"/api/chats/find?name={name}\")\n chats = []\n for item in data.get(\"chats\", []):\n chats.append(Chat(\n id=item.get(\"id\", item.get(\"username\", \"\")),\n username=item.get(\"username\", item.get(\"id\", \"\")),\n name=item.get(\"name\", \"\"),\n unreadCount=item.get(\"unreadCount\", 0)\n ))\n return chats\n \n'}
Are you willing to submit a PR? / 你愿意提交 PR 吗?
Code of Conduct
What happened / 发生了什么
Reproduce / 如何复现?
编写代码场景
AstrBot version, deployment method (e.g., Windows Docker Desktop deployment), provider used, and messaging platform used. / AstrBot 版本、部署方式(如 Windows Docker Desktop 部署)、使用的提供商、使用的消息平台适配器
astrbot 4.22.0,linux
OS
Windows
Logs / 报错日志
[2026-03-24 02:16:19.212] [Core] [WARN] [v4.22.0] [sources.anthropic_source:446]: 工具调用参数 JSON 解析失败: {'id': 'call_function_9z1fl0grtups_1', 'name': 'astrbot_execute_shell', 'input': {}, 'input_json': '{"command": "cat > /root/astrbot_plugin_agent_wechat/init.py << 'PYTHON'\n\"\"\"\nAstrBot Plugin: agent-wechat\n通过agent-wechat接入个人微信\n\"\"\"\nfrom .agent_wechat_adapter import AgentWeChatAdapter\nfrom .agent_wechat_client import AgentWeChatClient\n\n__all__ = [\"AgentWeChatAdapter\", \"AgentWeChatClient\"]\nPYTHON\n\ncat > /root/astrbot_plugin_agent_wechat/agent_wechat_client.py << 'PYTHON'\n\"\"\"\nagent-wechat REST API 客户端\n参考: https://github.com/thisnick/agent-wechat/tree/main/packages/shared\\n\\"\\"\\"\\nimport asyncio\nimport base64\nimport json\nfrom typing import Optional, Callable, Any\nfrom dataclasses import dataclass\nimport httpx\nfrom astrbot import logger\n\n\n@dataclass\nclass Chat:\n id: str\n username: str\n name: str\n unreadCount: int\n lastMsgLocalId: Optional[int] = None\n\n\n@dataclass\nclass Message:\n localId: int\n type: int\n content: str\n sender: Optional[str] = None\n senderName: Optional[str] = None\n isSelf: bool = False\n isMentioned: bool = False\n timestamp: int = 0\n reply: Optional[dict] = None\n\n\n@dataclass\nclass AuthStatus:\n status: str # \"logged_in\", \"logged_out\", \"app_not_running\", \"unknown\"\n loggedInUser: Optional[str] = None\n\n\n@dataclass\nclass SendResult:\n success: bool\n error: Optional[str] = None\n\n\nclass AgentWeChatClient:\n \"\"\"agent-wechat API 客户端\"\"\"\n \n def init(self, base_url: str, token: str):\n self.base_url = base_url.rstrip('/')\n self.token = token\n self.headers = {\n \"Content-Type\": \"application/json\",\n \"Authorization\": f\"Bearer {token}\"\n }\n self._ws_task: Optional[asyncio.Task] = None\n self._ws_close_event: Optional[asyncio.Event] = None\n \n async def _get(self, path: str) -> dict:\n async with httpx.AsyncClient() as client:\n resp = await client.get(f\"{self.base_url}{path}\", headers=self.headers)\n resp.raise_for_status()\n return resp.json()\n \n async def _post(self, path: str, data: Optional[dict] = None) -> dict:\n async with httpx.AsyncClient() as client:\n resp = await client.post(f\"{self.base_url}{path}\", json=data, headers=self.headers)\n resp.raise_for_status()\n return resp.json()\n \n # === 状态接口 ===\n async def auth_status(self) -> AuthStatus:\n data = await self._get(\"/api/status/auth\")\n return AuthStatus(\n status=data.get(\"status\", \"unknown\"),\n loggedInUser=data.get(\"loggedInUser\")\n )\n \n async def login(self) -> dict:\n return await self._post(\"/api/status/login\")\n \n async def logout(self) -> dict:\n return await self._post(\"/api/status/logout\")\n \n # === 聊天列表 ===\n async def list_chats(self, limit: int = 50) -> list[Chat]:\n data = await self._get(f\"/api/chats?limit={limit}\")\n chats = []\n for item in data.get(\"chats\", []):\n chats.append(Chat(\n id=item.get(\"id\", item.get(\"username\", \"\")),\n username=item.get(\"username\", item.get(\"id\", \"\")),\n name=item.get(\"name\", \"\"),\n unreadCount=item.get(\"unreadCount\", 0),\n lastMsgLocalId=item.get(\"lastMsgLocalId\")\n ))\n return chats\n \n async def open_chat(self, chat_id: str, clear_unreads: bool = True) -> dict:\n return await self._post(f\"/api/chats/{chat_id}/open?clearUnreads={clear_unreads}\")\n \n async def find_chats(self, name: str) -> list[Chat]:\n data = await self._get(f\"/api/chats/find?name={name}\")\n chats = []\n for item in data.get(\"chats\", []):\n chats.append(Chat(\n id=item.get(\"id\", item.get(\"username\", \"\")),\n username=item.get(\"username\", item.get(\"id\", \"\")),\n name=item.get(\"name\", \"\"),\n unreadCount=item.get(\"unreadCount\", 0)\n ))\n return chats\n \n'}
Are you willing to submit a PR? / 你愿意提交 PR 吗?
Code of Conduct