Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
302 changes: 139 additions & 163 deletions astrbot/core/agent/runners/tool_loop_agent_runner.py

Large diffs are not rendered by default.

315 changes: 283 additions & 32 deletions astrbot/core/astr_agent_tool_exec.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import asyncio
import datetime
import inspect
import json
import time
import traceback
import typing as T
import uuid
Expand Down Expand Up @@ -233,6 +235,21 @@ def _build_handoff_toolset(
toolset.add_tool(runtime_tool)
elif isinstance(tool_name_or_obj, FunctionTool):
toolset.add_tool(tool_name_or_obj)

# Always add send_shared_context tool for shared context feature
try:
from astrbot.core.dynamic_subagent_manager import (
SEND_SHARED_CONTEXT_TOOL,
DynamicSubAgentManager,
)

session_id = event.unified_msg_origin
session = DynamicSubAgentManager.get_session(session_id)
if session and session.shared_context_enabled:
toolset.add_tool(SEND_SHARED_CONTEXT_TOOL)
except Exception as e:
logger.debug(f"[EnhancedSubAgent] Failed to add shared context tool: {e}")

return None if toolset.empty() else toolset

@classmethod
Expand Down Expand Up @@ -291,21 +308,116 @@ async def _execute_handoff(
except Exception:
continue

# 获取子代理的历史上下文
agent_name = getattr(tool.agent, "name", None)
subagent_history = []
if agent_name:
try:
from astrbot.core.dynamic_subagent_manager import DynamicSubAgentManager

stored_history = DynamicSubAgentManager.get_subagent_history(
umo, agent_name
)
if stored_history:
# 将历史消息转换为 Message 对象
for hist_msg in stored_history:
try:
if isinstance(hist_msg, dict):
subagent_history.append(
Message.model_validate(hist_msg)
)
elif isinstance(hist_msg, Message):
subagent_history.append(hist_msg)
except Exception:
continue
if subagent_history:
logger.info(
f"[SubAgentHistory] Loaded {len(subagent_history)} history messages for {agent_name}"
)

except Exception as e:
logger.warning(
f"[SubAgentHistory] Failed to load history for {agent_name}: {e}"
)

prov_settings: dict = ctx.get_config(umo=umo).get("provider_settings", {})
agent_max_step = int(prov_settings.get("max_agent_step", 30))
stream = prov_settings.get("streaming_response", False)
# 如果有历史上下文,合并到 contexts 中
if subagent_history:
if contexts is None:
contexts = subagent_history
else:
contexts = subagent_history + contexts

# 构建子代理的 system_prompt,添加 skills 提示词和公共上下文
subagent_system_prompt = tool.agent.instructions or ""
if agent_name:
try:
from astrbot.core.dynamic_subagent_manager import DynamicSubAgentManager

# 注入 skills
runtime = prov_settings.get("computer_use_runtime", "local")
skills_prompt = DynamicSubAgentManager.build_subagent_skills_prompt(
umo, agent_name, runtime
)
if skills_prompt:
subagent_system_prompt += f"\n\n# Available Skills\n{skills_prompt}"
logger.info(f"[SubAgentSkills] Injected skills for {agent_name}")

# 注入公共上下文
shared_context_prompt = (
DynamicSubAgentManager.build_shared_context_prompt(umo, agent_name)
)
if shared_context_prompt:
subagent_system_prompt += f"\n{shared_context_prompt}"
logger.info(
f"[SubAgentSharedContext] Injected shared context for {agent_name}"
)

# 注入时间信息
current_time = (
datetime.datetime.now().astimezone().strftime("%Y-%m-%d %H:%M (%Z)")
)
subagent_system_prompt += f"Current datetime: {current_time}"

except Exception:
pass

llm_resp = await ctx.tool_loop_agent(
event=event,
chat_provider_id=prov_id,
prompt=input_,
image_urls=image_urls,
system_prompt=tool.agent.instructions,
system_prompt=subagent_system_prompt,
tools=toolset,
contexts=contexts,
max_steps=agent_max_step,
tool_call_timeout=run_context.tool_call_timeout,
stream=stream,
)

# 保存历史上下文
try:
from astrbot.core.dynamic_subagent_manager import DynamicSubAgentManager

agent_name = getattr(tool.agent, "name", None)
if agent_name:
# 构建当前对话的历史消息
current_messages = []
# 添加本轮用户输入
current_messages.append({"role": "user", "content": input_})
# 添加助手回复
current_messages.append(
{"role": "assistant", "content": llm_resp.completion_text}
)
if current_messages:
DynamicSubAgentManager.save_subagent_history(
umo, agent_name, current_messages
)
except Exception:
pass # 不影响主流程

yield mcp.types.CallToolResult(
content=[mcp.types.TextContent(type="text", text=llm_resp.completion_text)]
)
Expand All @@ -319,38 +431,80 @@ async def _execute_handoff_background(
):
"""Execute a handoff as a background task.

Immediately yields a success response with a task_id, then runs
the subagent asynchronously. When the subagent finishes, a
``CronMessageEvent`` is created so the main LLM can inform the
user of the result – the same pattern used by
``_execute_background`` for regular background tasks.
当启用增强SubAgent时,会在 DynamicSubAgentManager 中创建 pending 任务,
并返回 task_id 给主 Agent,以便后续通过 wait_for_subagent 获取结果。
"""
task_id = uuid.uuid4().hex
event = run_context.context.event
umo = event.unified_msg_origin
agent_name = getattr(tool.agent, "name", None)

# 生成 subagent_task_id(用于 DynamicSubAgentManager)
subagent_task_id = None

# 检查是否启用增强版 SubAgent
try:
from astrbot.core.dynamic_subagent_manager import DynamicSubAgentManager

if agent_name:
session = DynamicSubAgentManager.get_session(umo)
if session and agent_name in session.subagents:
subagent_task_id = (
DynamicSubAgentManager.create_pending_subagent_task(
session_id=umo, agent_name=agent_name
)
)
DynamicSubAgentManager.set_subagent_status(
session_id=umo,
agent_name=agent_name,
status="RUNNING",
)

logger.info(
f"[EnhancedSubAgent] Created background task {subagent_task_id} for {agent_name}"
)
except Exception as e:
logger.debug(f"[EnhancedSubAgent] Failed to create pending task: {e}")

# 生成原始的 task_id(用于唤醒机制等)
original_task_id = uuid.uuid4().hex

async def _run_handoff_in_background() -> None:
try:
await cls._do_handoff_background(
tool=tool,
run_context=run_context,
task_id=task_id,
task_id=original_task_id,
subagent_task_id=subagent_task_id,
**tool_args,
)

except Exception as e: # noqa: BLE001
logger.error(
f"Background handoff {task_id} ({tool.name}) failed: {e!s}",
f"Background handoff {original_task_id} ({tool.name}) failed: {e!s}",
exc_info=True,
)

asyncio.create_task(_run_handoff_in_background())

text_content = mcp.types.TextContent(
type="text",
text=(
f"Background task dedicated to subagent '{tool.agent.name}' submitted. task_id={task_id}. "
f"The subagent '{tool.agent.name}' is working on the task on hehalf you. "
f"You will be notified when it finishes."
),
)
# 构建返回消息
if subagent_task_id:
text_content = mcp.types.TextContent(
type="text",
text=(
f"Background task submitted. subagent_task_id={subagent_task_id}. "
f"SubAgent '{agent_name}' is working on the task. "
f"Use wait_for_subagent(subagent_name='{agent_name}', task_id='{subagent_task_id}') to get the result."
),
)
else:
text_content = mcp.types.TextContent(
type="text",
text=(
f"Background task submitted. task_id={original_task_id}. "
f"SubAgent '{agent_name}' is working on the task. "
f"You will be notified when it finishes."
),
)
yield mcp.types.CallToolResult(content=[text_content])

@classmethod
Expand All @@ -361,13 +515,24 @@ async def _do_handoff_background(
task_id: str,
**tool_args,
) -> None:
"""Run the subagent handoff and, on completion, wake the main agent."""
"""Run the subagent handoff.
当增强版 SubAgent 启用时,结果存储到 DynamicSubAgentManager,主 Agent 可通过 wait_for_subagent 获取。
否则使用原有的 _wake_main_agent_for_background_result 流程。
"""

start_time = time.time()
result_text = ""
error_text = None
tool_args = dict(tool_args)
tool_args["image_urls"] = await cls._collect_handoff_image_urls(
run_context,
tool_args.get("image_urls"),
)

event = run_context.context.event
umo = event.unified_msg_origin
agent_name = getattr(tool.agent, "name", None)

try:
async for r in cls._execute_handoff(
tool,
Expand All @@ -379,26 +544,112 @@ async def _do_handoff_background(
for content in r.content:
if isinstance(content, mcp.types.TextContent):
result_text += content.text + "\n"

except Exception as e:
error_text = str(e)
result_text = (
f"error: Background task execution failed, internal error: {e!s}"
)

event = run_context.context.event
execution_time = time.time() - start_time
success = error_text is None

await cls._wake_main_agent_for_background_result(
run_context=run_context,
task_id=task_id,
tool_name=tool.name,
result_text=result_text,
tool_args=tool_args,
note=(
event.get_extra("background_note")
or f"Background task for subagent '{tool.agent.name}' finished."
),
summary_name=f"Dedicated to subagent `{tool.agent.name}`",
extra_result_fields={"subagent_name": tool.agent.name},
)
enhanced_subagent_enabled = False
try:
from astrbot.core.dynamic_subagent_manager import DynamicSubAgentManager

session = DynamicSubAgentManager.get_session(umo)
if session and agent_name:
# 检查是否是动态创建的 SubAgent
if agent_name in session.agents:
enhanced_subagent_enabled = True
except Exception:
session = None

subagent_task_id = tool_args.get("subagent_task_id", None)

if enhanced_subagent_enabled and session and agent_name and subagent_task_id:
# 如果增强版subagent正在运行:存储结果到 DynamicSubAgentManager,使得主Agent可以访问
try:
from astrbot.core.dynamic_subagent_manager import DynamicSubAgentManager

DynamicSubAgentManager.store_subagent_result(
session_id=umo,
agent_name=agent_name,
task_id=subagent_task_id,
success=success,
result=result_text.strip() if result_text else "",
error=error_text,
execution_time=execution_time,
)
if error_text:
DynamicSubAgentManager.set_subagent_status(
session_id=umo,
agent_name=agent_name,
status="FAILED",
)
else:
DynamicSubAgentManager.set_subagent_status(
session_id=umo,
agent_name=agent_name,
status="COMPLETED",
)

# 如果启用了 shared_context,发布完成状态
if session.shared_context_enabled:
status_content = f" SubAgent '{agent_name}' 任务'{subagent_task_id}'完成,耗时 {execution_time:.1f}s"
if error_text:
status_content = f" SubAgent '{agent_name}' 任务'{subagent_task_id}' 失败: {error_text}"

DynamicSubAgentManager.add_shared_context(
session_id=umo,
sender=agent_name,
context_type="status",
content=status_content,
target="all",
)

logger.info(
f"[EnhancedSubAgent] Stored result for {agent_name} task {subagent_task_id}: "
f"success={success}, time={execution_time:.1f}s"
)

except Exception as e:
logger.error(
f"[EnhancedSubAgent] Failed to store result for {agent_name}: {e}"
)
# 存储失败时,回退到原有的唤醒机制
await cls._wake_main_agent_for_background_result(
run_context=run_context,
task_id=task_id,
tool_name=tool.name,
result_text=result_text,
tool_args=tool_args,
note=(
event.get_extra("background_note")
or f"Background task for subagent '{agent_name}' finished."
),
summary_name=f"Dedicated to subagent `{agent_name}`",
extra_result_fields={
"subagent_name": agent_name,
"subagent_task_id": subagent_task_id,
},
)
else:
# 未开启增强subagent:使用原有的唤醒机制
await cls._wake_main_agent_for_background_result(
run_context=run_context,
task_id=task_id,
tool_name=tool.name,
result_text=result_text,
tool_args=tool_args,
note=(
event.get_extra("background_note")
or f"Background task for subagent '{agent_name}' finished."
),
summary_name=f"Dedicated to subagent `{agent_name}`",
extra_result_fields={"subagent_name": agent_name},
)

@classmethod
async def _execute_background(
Expand Down
Loading