Add AG2 (formerly AutoGen) as a third AI engine (AI_ENGINE=ag2) #46
Open
VasiliyRad wants to merge 6 commits into EvolutionAPI:main from
Conversation
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Reviewer's Guide

Adds AG2 (AutoGen/AG2) as a third AI engine, implementing an AG2-specific builder and runner, session persistence, MCP integration, and custom HTTP tools, and wiring them into the existing AI engine dispatch paths without requiring schema changes.

Sequence diagram for AG2 run_agent request flow

```mermaid
sequenceDiagram
    actor User
    participant ChatRoutes as chat_routes
    participant ServiceProviders as service_providers
    participant AG2Runner as ag2_run_agent
    participant Builder as AG2AgentBuilder
    participant SessionSvc as AG2SessionService
    participant AG2 as AG2_ConversableAgent_or_GroupChat
    participant DB as Database
    User->>ChatRoutes: HTTP POST /chat (AI_ENGINE=ag2)
    ChatRoutes->>ServiceProviders: resolve_engine(ag2)
    ServiceProviders-->>ChatRoutes: run_agent reference
    ChatRoutes->>AG2Runner: run_agent(agent_id, external_id, message, session_service, db)
    AG2Runner->>DB: get_agent(agent_id)
    DB-->>AG2Runner: Agent record
    AG2Runner->>Builder: build_agent(root_agent)
    Builder->>DB: get_agent(sub_agent_id)*
    DB-->>Builder: sub_agents
    Builder-->>AG2Runner: ConversableAgent or group_chat_setup
    AG2Runner->>SessionSvc: get_or_create(agent_id, external_id)
    SessionSvc->>DB: load_or_create AG2StorageSession
    DB-->>SessionSvc: AG2StorageSession
    SessionSvc-->>AG2Runner: AG2Session
    AG2Runner->>SessionSvc: build_messages(session)
    SessionSvc-->>AG2Runner: history
    alt ag2_mode == group_chat
        AG2Runner->>AG2: initiate_group_chat(pattern, history + message)
        AG2-->>AG2Runner: chat_result, final_context, last_agent
    else ag2_mode == single
        AG2Runner->>AG2: initiate_chat(single_agent, message, history)
        AG2-->>AG2Runner: chat_result
    end
    AG2Runner->>SessionSvc: append(session, "user", message)
    AG2Runner->>SessionSvc: append(session, "assistant", final_response)
    SessionSvc->>DB: update AG2StorageSession.messages
    DB-->>SessionSvc: ok
    AG2Runner-->>ChatRoutes: {final_response, message_history}
    ChatRoutes-->>User: HTTP response with assistant message
```
Entity relationship diagram for new AG2StorageSession table

```mermaid
erDiagram
    AG2StorageSession {
        string app_name PK
        string user_id PK
        string id PK
        jsonb messages
        timestamp create_time
        timestamp update_time
    }
```
Hey - I've found 15 issues, and left some high level feedback:
- The AG2SessionService creates its own SQLAlchemy engine and calls Base.metadata.create_all at construction time (and is instantiated inside run_agent_stream), which bypasses the app’s existing DB session/metadata lifecycle and will repeatedly run DDL; consider wiring it into the existing Session/engine infrastructure and letting migrations manage the ag2_sessions table.
- AG2StorageSession.messages is typed as MutableDict but defaults to [] and is treated as a list in get_or_create/build_messages, which is inconsistent with the column type; aligning this to a list-oriented mutable type or to a dict consistently will avoid subtle ORM/serialization issues.
- DynamicJSON lacks cache_ok = True and always JSON-serializes non-Postgres values, which can be a performance and warning source in SQLAlchemy 2.x; consider adding cache_ok = True and confirming this decorator matches the project’s existing JSON handling pattern.
## Individual Comments
### Comment 1
<location path="src/services/ag2/session_service.py" line_range="95-96" />
<code_context>
+ id: Mapped[str] = mapped_column(
+ String, primary_key=True, default=lambda: str(uuid.uuid4())
+ )
+ messages: Mapped[MutableDict[str, Any]] = mapped_column(
+ MutableDict.as_mutable(DynamicJSON), default=[]
+ )
+ create_time: Mapped[DateTime] = mapped_column(DateTime(), default=func.now())
</code_context>
<issue_to_address>
**issue (bug_risk):** The `messages` column uses `MutableDict` but defaults to a list, which is inconsistent and may break SQLAlchemy change tracking.
The column is typed as `Mapped[MutableDict[str, Any]]` and uses `MutableDict.as_mutable(DynamicJSON)`, but the default is `[]`, which is a list. `MutableDict` expects a mapping and may not track changes correctly with a list value. If `messages` is meant to be a list of message dicts, switch to `MutableList` (or a custom mutable list type) and update the type hint; otherwise, use a dict default (e.g. `{}`) and treat it consistently as a mapping.
</issue_to_address>
### Comment 2
<location path="src/services/ag2/session_service.py" line_range="130-139" />
<code_context>
+ def get_or_create(self, agent_id: str, external_id: str) -> AG2Session:
</code_context>
<issue_to_address>
**suggestion (bug_risk):** `get_or_create` performs two separate DB roundtrips for the same session where one would suffice.
Inside `get_or_create`, you open one session to create/commit the `AG2StorageSession`, then a second session to fetch the same row just to load `messages`. Instead, reuse the first `record` to build `AG2Session` and skip the second `db.get`/session, which both cuts DB roundtrips and removes the race window between the two transactions.
Suggested implementation:
```python
    def get_or_create(self, agent_id: str, external_id: str) -> AG2Session:
        """Retrieve an existing session or create a new one.

        This implementation performs all work in a single DB session to avoid
        extra roundtrips and race windows between separate transactions.
        """
        session_id = f"{external_id}_{agent_id}"
        with self.SessionLocal() as db:
            # Single roundtrip: either fetch existing or create, then return.
            record = db.get(AG2StorageSession, (agent_id, external_id, session_id))
            if record is None:
                record = AG2StorageSession(
                    app_name=agent_id,
                    user_id=external_id,
                    id=session_id,
                    messages=[],
                )
                db.add(record)
                db.commit()
                # Ensure we have a fully-populated, persistent instance
                db.refresh(record)

            # Make sure messages are loaded while the session is still open.
            # If `messages` is a lazy relationship, this will trigger the load here.
            _ = record.messages

            # Build the AG2Session directly from the record we already have,
            # instead of re-opening a new session and re-querying the same row.
            return AG2Session(
                agent_id=record.app_name,
                external_id=record.user_id,
                session_id=record.id,
                messages=list(record.messages or []),
            )
```
Because I only saw the beginning of `get_or_create`, you should:
1. Remove any remaining code after this method that:
- Opens a second `with self.SessionLocal() as db:` block inside `get_or_create`, or
- Re-fetches `AG2StorageSession` (e.g., `db.get(AG2StorageSession, ...)`) for the same `agent_id`/`external_id`/`session_id`.
That logic is now redundant and should be deleted.
2. If your `AG2Session` constructor has a different signature than shown here, adapt the `AG2Session(...)` call accordingly, but keep the pattern of constructing it directly from `record` in this single session.
3. If `messages` is not a relationship but a plain JSON/array column, you can drop the `_ = record.messages` line; the rest of the logic remains the same.
</issue_to_address>
### Comment 3
<location path="src/services/ag2/mcp_service.py" line_range="159-161" />
<code_context>
+ command = server_config.get("command", "npx")
+ args = server_config.get("args", [])
+ env = server_config.get("env", {})
+ if env:
+ for key, value in env.items():
+ os.environ[key] = value
+ server = McpServer({"command": command, "args": args, "env": env})
+
</code_context>
<issue_to_address>
**issue (bug_risk):** Mutating `os.environ` for MCP server env vars introduces global side effects and is likely unnecessary.
In `_connect_server`, you both build an `env` dict for `McpServer` and also write those values into `os.environ`. This leaks per-server config into the global process and can cause unexpected interactions between servers or other code. It should be enough to pass `env` into `McpServer` without modifying `os.environ`.
</issue_to_address>
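A minimal sketch of the side-effect-free alternative: build a merged env dict for the one server being spawned instead of writing into `os.environ`. `McpServer` here is a trivial stand-in class, and `AG2_EXAMPLE_TOKEN` is a made-up variable name; only the merge pattern is the point.

```python
import os
from dataclasses import dataclass
from typing import Dict, List


@dataclass
class McpServer:
    """Stand-in for the real MCP server wrapper; it just records its spawn config."""
    config: Dict[str, object]


def connect_server(server_config: dict) -> McpServer:
    command = server_config.get("command", "npx")
    args: List[str] = server_config.get("args", [])
    extra_env: Dict[str, str] = server_config.get("env", {})

    # Merge onto a copy of the current environment for this server only,
    # instead of writing the values into os.environ.
    env = {**os.environ, **extra_env}

    return McpServer({"command": command, "args": args, "env": env})


server = connect_server(
    {"command": "npx", "args": ["-y", "example-server"], "env": {"AG2_EXAMPLE_TOKEN": "abc"}}
)
assert server.config["env"]["AG2_EXAMPLE_TOKEN"] == "abc"
assert "AG2_EXAMPLE_TOKEN" not in os.environ  # the process environment is untouched
```

Each server gets its own isolated environment snapshot, so two servers with conflicting variables no longer interfere with each other or with the host process.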
### Comment 4
<location path="src/services/ag2/agent_builder.py" line_range="134-141" />
<code_context>
+ # Build all sub-agents first so handoff resolution can reference them
+ all_agents = {}
+ agents = []
+ for aid in sub_agent_ids:
+ db_agent = get_agent(self.db, str(aid))
+ if db_agent is None:
+ raise ValueError(f"Sub-agent {aid} not found")
+ ca = await self.build_conversable_agent(db_agent)
+ all_agents[str(aid)] = ca
+ agents.append(ca)
+
+ root_ca = await self.build_conversable_agent(root_agent)
+ all_agents[str(root_agent.id)] = root_ca
+
+ # Apply handoffs to each agent if configured
+ for aid in sub_agent_ids:
+ db_agent = get_agent(self.db, str(aid))
+ if db_agent and db_agent.config:
+ self._apply_handoffs(all_agents[str(aid)], db_agent.config, all_agents)
+
</code_context>
<issue_to_address>
**suggestion (performance):** Sub-agents are fetched from the database twice: once to build them and again to apply handoffs.
In `build_group_chat_setup`, each `aid` triggers two `get_agent` calls—first to build `ConversableAgent`s, then again when applying handoffs. Consider storing the initial `db_agent` objects (e.g., an id → db_agent dict) and reusing them when applying handoffs to avoid duplicate database queries and keep the lookup logic centralized.
```suggestion
config = root_agent.config or {}
sub_agent_ids = config.get("sub_agents", [])
if not sub_agent_ids:
    raise ValueError("group_chat agent requires at least one sub_agent")

# Build all sub-agents first so handoff resolution can reference them
all_agents = {}
agents = []
db_sub_agents = {}
for aid in sub_agent_ids:
    db_agent = get_agent(self.db, str(aid))
    if db_agent is None:
        raise ValueError(f"Sub-agent {aid} not found")
    db_sub_agents[str(aid)] = db_agent
    ca = await self.build_conversable_agent(db_agent)
    all_agents[str(aid)] = ca
    agents.append(ca)

root_ca = await self.build_conversable_agent(root_agent)
all_agents[str(root_agent.id)] = root_ca

# Apply handoffs to each sub-agent if configured, reusing cached db agents
for aid in sub_agent_ids:
    db_agent = db_sub_agents.get(str(aid))
    if db_agent and db_agent.config:
        self._apply_handoffs(all_agents[str(aid)], db_agent.config, all_agents)
```
</issue_to_address>
### Comment 5
<location path="src/services/ag2/agent_builder.py" line_range="94-116" />
<code_context>
+ logger.warning(f"Handoff target {target_id} not found, skipping")
+ continue
+
+ if h["type"] == "llm":
+ llm_conditions.append(
+ OnCondition(
+ target=AgentTarget(target_agent),
+ condition=StringLLMCondition(prompt=h["condition"]),
+ )
+ )
+ elif h["type"] == "context":
+ context_conditions.append(
+ OnContextCondition(
</code_context>
<issue_to_address>
**suggestion:** Using `h["type"]` without validation can raise if a handoff entry is malformed.
In `_apply_handoffs`, this direct `h["type"]` access will raise a `KeyError` if a handoff entry is missing `type`, which will break setup for a misconfigured user config. Consider using `h.get("type")` and skipping/logging invalid entries, consistent with how missing `target_agent` is handled.
```suggestion
for h in handoffs_config:
    target_id = h.get("target_agent_id")
    target_agent = all_agents.get(str(target_id))
    if not target_agent:
        logger.warning(f"Handoff target {target_id} not found, skipping")
        continue

    h_type = h.get("type")
    if h_type not in ("llm", "context"):
        logger.warning(
            f"Invalid or missing handoff type {h_type!r} for target {target_id}, skipping"
        )
        continue

    if h_type == "llm":
        llm_conditions.append(
            OnCondition(
                target=AgentTarget(target_agent),
                condition=StringLLMCondition(prompt=h["condition"]),
            )
        )
    elif h_type == "context":
        context_conditions.append(
            OnContextCondition(
                target=AgentTarget(target_agent),
                condition=ExpressionContextCondition(
                    expression=ContextExpression(h["expression"])
                ),
            )
        )
```
</issue_to_address>
### Comment 6
<location path="src/services/ag2/custom_tool.py" line_range="109-116" />
<code_context>
+ ):
+ body_data[param] = value
+
+ response = requests.request(
+ method=method,
+ url=url,
+ headers=processed_headers,
+ params=query_params_dict,
+ json=body_data or None,
+ timeout=error_handling.get("timeout", 30),
+ )
+
+ if response.status_code >= 400:
+ raise requests.exceptions.HTTPError(
+ f"Error in the request: {response.status_code} - {response.text}"
+ )
+
+ return json.dumps(response.json())
+
+ except Exception as e:
</code_context>
<issue_to_address>
**suggestion (bug_risk):** Assuming all HTTP responses are JSON can cause tool failures for non‑JSON endpoints.
`response.json()` is called unconditionally, so plain text/HTML responses (including some error bodies) will raise and drop you into the generic `Exception` handler, obscuring the real error. Wrap `response.json()` in a `try/except ValueError` and fall back to `response.text` or a consistent JSON wrapper so non‑JSON responses are handled gracefully.
```suggestion
            if response.status_code >= 400:
                raise requests.exceptions.HTTPError(
                    f"Error in the request: {response.status_code} - {response.text}"
                )

            try:
                response_data = response.json()
            except ValueError:
                # Non-JSON response; fall back to a consistent JSON wrapper
                response_data = {
                    "status_code": response.status_code,
                    "headers": dict(response.headers),
                    "raw_response": response.text,
                }

            return json.dumps(response_data)

        except Exception as e:
```
</issue_to_address>
### Comment 7
<location path="src/services/ag2/agent_runner.py" line_range="46-50" />
<code_context>
+ try:
+ ag2_mode = (db_agent.config or {}).get("ag2_mode", "single")
+ if ag2_mode == "group_chat":
+ chat_result, final_context, last_agent = initiate_group_chat(
+ pattern=result["pattern"],
+ messages=history + [message],
+ max_rounds=result["max_rounds"],
+ context_variables=result["context_variables"],
+ )
+ final_response = chat_result.summary or (
</code_context>
<issue_to_address>
**issue (bug_risk):** Group chat `messages` mixes dict history with a raw string for the new message, which may not match AG2’s expected format.
Here `messages` is built as `history + [message]`, mixing `{role, content}` dicts with a plain string. If `initiate_group_chat` expects a uniform list of message dicts, this could break routing/formatting. Consider wrapping `message` as `{"role": "user", "content": message}` to keep the structure consistent.
</issue_to_address>
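The normalization the comment asks for can be sketched as a small helper, assuming (as the review does) that AG2's group chat expects a uniform list of `{"role", "content"}` dicts; `build_group_chat_messages` is a hypothetical name for illustration.

```python
def build_group_chat_messages(history, new_message):
    """Return a uniform list of {"role", "content"} dicts for the group chat.

    `history` is assumed to already hold message dicts; a plain-string
    new_message is wrapped as a user turn instead of being appended raw.
    """
    if isinstance(new_message, str):
        new_message = {"role": "user", "content": new_message}
    return history + [new_message]


history = [{"role": "assistant", "content": "How can I help?"}]
messages = build_group_chat_messages(history, "Reset my password")
assert messages[-1] == {"role": "user", "content": "Reset my password"}
assert all(isinstance(m, dict) for m in messages)
```

The runner would then pass `messages=build_group_chat_messages(history, message)` instead of `history + [message]`.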
### Comment 8
<location path="src/services/ag2/agent_runner.py" line_range="24" />
<code_context>
+ session_service: AG2SessionService,
+ db: Session,
+ session_id: Optional[str] = None,
+ timeout: float = 60.0,
+ files: Optional[list] = None,
+) -> dict:
</code_context>
<issue_to_address>
**nitpick:** The `timeout` parameter on `run_agent` is currently unused.
Since `timeout` isn’t used anywhere in the implementation, the signature is misleading. Please either apply it to the underlying execution (e.g., wrap the blocking call in `asyncio.wait_for`) or remove the parameter.
</issue_to_address>
### Comment 9
<location path="src/services/ag2/agent_runner.py" line_range="21" />
<code_context>
+ agent_id: str,
+ external_id: str,
+ message: str,
+ session_service: AG2SessionService,
+ db: Session,
+ session_id: Optional[str] = None,
</code_context>
<issue_to_address>
**question (bug_risk):** Session IDs are accepted but not used, so distinct sessions for the same agent/user cannot be differentiated.
`run_agent` and `run_agent_stream` accept `session_id`, but `AG2SessionService.get_or_create` always uses `f"{external_id}_{agent_id}"`, so concurrent sessions for the same agent/user are merged. If multiple sessions are intended, include `session_id` in the session key; if not, consider removing `session_id` from the API to avoid confusion.
</issue_to_address>
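If multiple sessions are intended, the key derivation could fold `session_id` in while staying backward compatible; a sketch (the helper name is hypothetical):

```python
from typing import Optional


def make_session_key(agent_id: str, external_id: str, session_id: Optional[str] = None) -> str:
    """Derive the storage-session id, differentiating explicit sessions.

    Falls back to the current "<external_id>_<agent_id>" scheme when no
    session_id is passed, so existing rows keep resolving unchanged.
    """
    base = f"{external_id}_{agent_id}"
    return f"{base}_{session_id}" if session_id else base


# Without a session_id the legacy key is produced; with one, sessions diverge.
assert make_session_key("agent1", "user42") == "user42_agent1"
assert make_session_key("agent1", "user42", "s-a") != make_session_key("agent1", "user42", "s-b")
```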
### Comment 10
<location path="src/services/ag2/agent_runner.py" line_range="8-17" />
<code_context>
+ Token-level streaming can be added in a future iteration by wiring
+ ConversableAgent's `process_last_received_message` hook to a queue.
+ """
+ from src.services.ag2.session_service import AG2SessionService
+ from src.config.settings import get_settings
+ settings = get_settings()
+ session_service = AG2SessionService(db_url=settings.POSTGRES_CONNECTION_STRING)
+
+ result = await run_agent(
</code_context>
<issue_to_address>
**suggestion (performance):** Creating a new `AG2SessionService` (and engine) on every `run_agent_stream` call is potentially expensive.
In `run_agent_stream`, each request creates a new `AG2SessionService`, which builds a new SQLAlchemy engine and metadata. Consider reusing a shared `AG2SessionService` or at least a shared engine/session factory to avoid repeated DB setup and reduce overhead under load.
</issue_to_address>
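One lightweight way to share the service is a memoized factory; this is a sketch with a stand-in class, not the project's actual wiring, and `get_session_service` is a hypothetical helper name.

```python
from functools import lru_cache


class AG2SessionService:
    """Stand-in for the real service; imagine engine/metadata setup in __init__."""

    def __init__(self, db_url: str):
        self.db_url = db_url  # the real class builds a SQLAlchemy engine here


@lru_cache(maxsize=None)
def get_session_service(db_url: str) -> AG2SessionService:
    """Return one shared service (and therefore one engine) per database URL."""
    return AG2SessionService(db_url)


a = get_session_service("postgresql://localhost/app")
b = get_session_service("postgresql://localhost/app")
assert a is b  # constructed once, reused across requests
```

`run_agent_stream` would then call `get_session_service(settings.POSTGRES_CONNECTION_STRING)` instead of constructing `AG2SessionService` per request; a FastAPI dependency or app-state singleton achieves the same effect.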
### Comment 11
<location path="tests/services/ag2/test_agent_builder.py" line_range="40-49" />
<code_context>
+ }
+ return a
+
+@pytest.mark.asyncio
+async def test_builder_creates_group_chat(mock_db, group_chat_agent_record):
+ builder = AG2AgentBuilder(db=mock_db)
+ sub_agent = _make_agent("uuid-triage", "triage")
+ with patch("src.services.ag2.agent_builder.get_agent", return_value=sub_agent):
+ result, _ = await builder.build_agent(group_chat_agent_record)
+ # result should be a dict with pattern and agents for initiate_group_chat
+ assert "pattern" in result
+ assert "agents" in result
+
+@pytest.mark.asyncio
+async def test_builder_validates_group_chat_requires_agents(mock_db):
+ record = MagicMock()
+ record.type = "llm"
+ record.config = {"ag2_mode": "group_chat", "sub_agents": []} # empty — should raise
+ record.api_key = "test"
+ builder = AG2AgentBuilder(db=mock_db)
+ with pytest.raises(ValueError, match="at least one"):
+ await builder.build_agent(record)
</code_context>
<issue_to_address>
**suggestion (testing):** Add tests for single-agent mode and API key resolution branches in AG2AgentBuilder
Current tests only cover group chat orchestration and the empty-sub-agent validation. Please also add coverage for:
- `single` mode: `build_agent` returning a `ConversableAgent` when `ag2_mode` is unset or set to `"single"`.
- Agent metadata: name sanitization (spaces → underscores) and correct system message composition from `role`, `goal`, and `instruction`.
- `_get_api_key` behavior:
- `api_key_id` provided and resolved via `get_decrypted_api_key`.
- `config.api_key` as a UUID string resolved via `get_decrypted_api_key`.
- `config.api_key` as a non-UUID raw key returned as-is.
- No valid key → `ValueError` with a clear message.
These cases help ensure behavior stays aligned with ADK/CrewAI builders and that auth errors are surfaced properly.
Suggested implementation:
```python
@pytest.mark.asyncio
async def test_builder_creates_group_chat(mock_db, group_chat_agent_record):
    builder = AG2AgentBuilder(db=mock_db)
    sub_agent = _make_agent("uuid-triage", "triage")
    with patch("src.services.ag2.agent_builder.get_agent", return_value=sub_agent):
        result, _ = await builder.build_agent(group_chat_agent_record)
    # result should be a dict with pattern and agents for initiate_group_chat
    assert "pattern" in result
    assert "agents" in result


@pytest.mark.asyncio
async def test_builder_validates_group_chat_requires_agents(mock_db):
    record = MagicMock()
    record.type = "llm"
    record.config = {"ag2_mode": "group_chat", "sub_agents": []}  # empty — should raise
    record.api_key = "test"
    builder = AG2AgentBuilder(db=mock_db)
    with pytest.raises(ValueError, match="at least one"):
        await builder.build_agent(record)


@pytest.mark.asyncio
async def test_builder_creates_single_agent_when_mode_unset(mock_db):
    record = MagicMock()
    record.type = "llm"
    record.name = "Support Team Agent"
    record.api_key_id = None
    record.api_key = None
    record.config = {
        # no ag2_mode: should default to single-agent mode
        "role": "assistant",
        "goal": "Help users resolve issues",
        "instruction": "Be concise and friendly.",
        "api_key": "raw-openai-key",
    }
    builder = AG2AgentBuilder(db=mock_db)
    agent, metadata = await builder.build_agent(record)

    # single-agent mode should produce a ConversableAgent-like object
    from autogen import ConversableAgent  # type: ignore[import]
    assert isinstance(agent, ConversableAgent)
    # name should be sanitized (spaces -> underscores)
    assert agent.name == "Support_Team_Agent"
    # system message should compose role, goal and instruction
    system_message = getattr(agent, "system_message", "")
    assert "assistant" in system_message
    assert "Help users resolve issues" in system_message
    assert "Be concise and friendly." in system_message
    # metadata should at least echo the mode
    assert metadata.get("mode") == "single"


@pytest.mark.asyncio
async def test_builder_creates_single_agent_when_mode_single(mock_db):
    record = MagicMock()
    record.type = "llm"
    record.name = "Billing Bot"
    record.api_key_id = None
    record.api_key = None
    record.config = {
        "ag2_mode": "single",
        "role": "assistant",
        "goal": "Assist with billing questions",
        "instruction": "Only answer billing-related queries.",
        "api_key": "raw-openai-key",
    }
    builder = AG2AgentBuilder(db=mock_db)
    agent, metadata = await builder.build_agent(record)

    from autogen import ConversableAgent  # type: ignore[import]
    assert isinstance(agent, ConversableAgent)
    assert agent.name == "Billing_Bot"
    system_message = getattr(agent, "system_message", "")
    assert "billing" in system_message.lower()
    assert "only answer billing-related queries.".lower() in system_message.lower()
    assert metadata.get("mode") == "single"


def test_get_api_key_with_api_key_id_uses_secret_store(mock_db):
    builder = AG2AgentBuilder(db=mock_db)
    record = MagicMock()
    record.api_key_id = "11111111-2222-3333-4444-555555555555"
    record.api_key = None
    record.config = {}
    with patch(
        "src.services.ag2.agent_builder.get_decrypted_api_key",
        return_value="resolved-secret-key",
    ) as mock_get:
        api_key = builder._get_api_key(record)
    mock_get.assert_called_once_with(mock_db, record.api_key_id)
    assert api_key == "resolved-secret-key"


def test_get_api_key_with_uuid_string_in_config(mock_db):
    builder = AG2AgentBuilder(db=mock_db)
    config_uuid = "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee"
    record = MagicMock()
    record.api_key_id = None
    record.api_key = None
    record.config = {"api_key": config_uuid}
    with patch(
        "src.services.ag2.agent_builder.get_decrypted_api_key",
        return_value="resolved-from-config",
    ) as mock_get:
        api_key = builder._get_api_key(record)
    mock_get.assert_called_once_with(mock_db, config_uuid)
    assert api_key == "resolved-from-config"


def test_get_api_key_with_raw_non_uuid_config_value(mock_db):
    builder = AG2AgentBuilder(db=mock_db)
    record = MagicMock()
    record.api_key_id = None
    record.api_key = None
    record.config = {"api_key": "sk-live-plain-openai-key"}
    api_key = builder._get_api_key(record)
    # Non-UUID api_key should be returned as-is, without hitting secret store
    assert api_key == "sk-live-plain-openai-key"


def test_get_api_key_raises_when_no_valid_key(mock_db):
    builder = AG2AgentBuilder(db=mock_db)
    record = MagicMock()
    record.api_key_id = None
    record.api_key = None
    record.config = {}
    with pytest.raises(ValueError, match="API key"):
        builder._get_api_key(record)
```
1. Ensure `ConversableAgent` is available at the top of the test module; if you prefer not to import it inside tests, move the `from autogen import ConversableAgent` import to the global imports section and remove the inline imports in the tests.
2. Verify the call signature of `get_decrypted_api_key` in `src.services.ag2.agent_builder`; the tests assume `get_decrypted_api_key(db, api_key_id)`. If the actual order differs, adjust the `assert_called_once_with` expectations accordingly.
3. If `AG2AgentBuilder._get_api_key` currently uses a different error message when no key is found, update the implementation to raise `ValueError("API key could not be resolved")` (or similar) so that the `"API key"` substring in the `match` expression is present.
4. If the `ConversableAgent` implementation uses a different attribute than `.system_message` for the composed instructions, either:
- Update `AG2AgentBuilder` to set `.system_message` to the composed text, or
- Adjust the tests to assert against the actual attribute used (e.g. `agent.description`), keeping the checks that `role`, `goal`, and `instruction` are all present.
</issue_to_address>
### Comment 12
<location path="tests/services/ag2/test_agent_builder.py" line_range="6-15" />
<code_context>
+from unittest.mock import MagicMock, AsyncMock, patch
+from src.services.ag2.agent_builder import AG2AgentBuilder
+
+@pytest.fixture
+def mock_db():
+ return AsyncMock()
+
+def _make_agent(agent_id, agent_name, ag2_mode="single", model="gpt-4o-mini"):
+ """Create a minimal agent record mock with explicit attribute assignment."""
+ a = MagicMock()
+ a.id = agent_id
+ a.name = agent_name
+ a.type = "llm"
+ a.llm_provider = "openai"
+ a.model = model
+ a.config = {"ag2_mode": ag2_mode, "api_key": "test-key"}
+ a.api_key = "test-key"
+ a.api_key_id = None
+ a.api_url = "https://api.openai.com/v1"
+ a.role = None
+ a.goal = None
+ a.instruction = f"You are {agent_name}."
+ return a
+
+@pytest.fixture
</code_context>
<issue_to_address>
**suggestion (testing):** Extend tests to cover handoff configuration and sub-agent resolution in group chat setups
`AG2AgentBuilder` has substantial logic in `_apply_handoffs` and `build_group_chat_setup` (LLM/context handoffs, `after_work` mapping, and missing sub-agent handling), but current tests only check the output shape.
Please add tests that:
- Build a root agent with sub-agents containing `handoffs` of type `"llm"` and `"context"`, and assert the expected conditions are registered on `ca.handoffs`.
- Use a sub-agent ID for which `get_agent` returns `None` and assert a `ValueError("Sub-agent ... not found")`.
- Verify `after_work` resolves to `TerminateTarget` when set to `"terminate"` and to `RevertToUserTarget` otherwise.
You can rely on lightweight `MagicMock` agents or patched AG2 classes to avoid heavy dependencies while still validating the behavior and wiring.
Suggested implementation:
```python
from unittest.mock import MagicMock, AsyncMock, patch

from src.services.ag2.agent_builder import AG2AgentBuilder, TerminateTarget, RevertToUserTarget


@pytest.fixture
def mock_db():
    return AsyncMock()


def _make_agent(agent_id, agent_name, ag2_mode="single", model="gpt-4o-mini"):
    """Create a minimal agent record mock with explicit attribute assignment."""
    a = MagicMock()
    a.id = agent_id
    a.name = agent_name
    a.type = "llm"
    a.llm_provider = "openai"
    a.model = model
    a.config = {"ag2_mode": ag2_mode, "api_key": "test-key"}
    a.api_key = "test-key"
    a.api_key_id = None
    a.api_url = "https://api.openai.com/v1"
    a.role = None
    a.goal = None
    a.instruction = f"You are {agent_name}."
    return a


@pytest.fixture
def root_agent_with_handoffs():
    """Root agent configured with LLM and context handoffs to sub-agents."""
    root = _make_agent("root", "Root Agent", ag2_mode="group")
    # Two handoffs: one LLM and one context, with different after_work semantics
    root.config["handoffs"] = [
        {
            "type": "llm",
            "to": "sub-llm",
            "condition": {"kind": "tool", "name": "delegate_to_sub_llm"},
            "after_work": "terminate",
        },
        {
            "type": "context",
            "to": "sub-context",
            "condition": {"kind": "message", "contains": "use context"},
            "after_work": "return",
        },
    ]
    return root


@pytest.fixture
def sub_agents():
    """Sub-agents referenced by the root agent's handoffs."""
    sub_llm = _make_agent("sub-llm", "Sub LLM Agent")
    sub_ctx = _make_agent("sub-context", "Sub Context Agent")
    return {"sub-llm": sub_llm, "sub-context": sub_ctx}


@pytest.mark.asyncio
async def test_apply_handoffs_registers_llm_and_context_handoffs(
    mock_db, root_agent_with_handoffs, sub_agents
):
    """
    Ensure LLM and context handoffs are transformed and registered on ca.handoffs.

    This exercises AG2AgentBuilder.build_group_chat_setup and its internal
    _apply_handoffs logic without pulling in heavy AG2 dependencies.
    """
    builder = AG2AgentBuilder(mock_db)

    # Patch get_agent so handoff resolution finds our lightweight MagicMock sub-agents.
    async def fake_get_agent(agent_id: str):
        return sub_agents.get(agent_id)

    with patch.object(builder, "get_agent", side_effect=fake_get_agent):
        # Depending on implementation, the entry point may differ.
        # We assume build_group_chat_setup(root_agent) returns an object with .handoffs.
        ca = await builder.build_group_chat_setup(root_agent_with_handoffs)

    # We expect one handoff per entry in root_agent.config["handoffs"].
    assert hasattr(ca, "handoffs")
    assert len(ca.handoffs) == 2

    # Order is preserved: first is LLM handoff, second is context handoff.
    llm_handoff, ctx_handoff = ca.handoffs

    # The tests are intentionally lightweight: we validate the wiring rather than AG2 internals.
    # LLM handoff should target the sub-llm agent and have a condition based on tool usage.
    assert getattr(llm_handoff, "sub_agent_id", None) == "sub-llm"
    assert getattr(llm_handoff, "type", None) == "llm"
    assert "delegate_to_sub_llm" in repr(getattr(llm_handoff, "condition", ""))

    # Context handoff should target the sub-context agent and have a message-based condition.
    assert getattr(ctx_handoff, "sub_agent_id", None) == "sub-context"
    assert getattr(ctx_handoff, "type", None) == "context"
    assert "use context" in repr(getattr(ctx_handoff, "condition", ""))


@pytest.mark.asyncio
async def test_apply_handoffs_missing_sub_agent_raises_value_error(
    mock_db, root_agent_with_handoffs
):
    """
    When get_agent returns None for a referenced sub-agent, a ValueError is raised.
    """
    builder = AG2AgentBuilder(mock_db)
    # Force one of the handoffs to reference a non-existent sub-agent.
    root_agent_with_handoffs.config["handoffs"][0]["to"] = "missing-subagent-id"

    async def fake_get_agent(agent_id: str):
        # Simulate not-found for the missing ID, normal behavior otherwise.
        if agent_id == "missing-subagent-id":
            return None
        return _make_agent(agent_id, f"Agent {agent_id}")

    with patch.object(builder, "get_agent", side_effect=fake_get_agent):
        with pytest.raises(ValueError, match=r"Sub-agent .* not found"):
            await builder.build_group_chat_setup(root_agent_with_handoffs)


@pytest.mark.asyncio
async def test_after_work_resolves_to_correct_targets(
    mock_db, root_agent_with_handoffs, sub_agents
):
    """
    Verify that after_work is mapped to TerminateTarget and RevertToUserTarget.

    - "terminate" -> TerminateTarget
    - any other value (e.g. "return") -> RevertToUserTarget
    """
    builder = AG2AgentBuilder(mock_db)

    async def fake_get_agent(agent_id: str):
        return sub_agents.get(agent_id)

    with patch.object(builder, "get_agent", side_effect=fake_get_agent):
        ca = await builder.build_group_chat_setup(root_agent_with_handoffs)

    assert hasattr(ca, "handoffs")
    assert len(ca.handoffs) == 2
    llm_handoff, ctx_handoff = ca.handoffs

    # after_work was set to "terminate" for the LLM handoff.
    after_work_llm = getattr(llm_handoff, "after_work", None)
    assert isinstance(after_work_llm, TerminateTarget)

    # after_work was set to "return" for the context handoff.
    after_work_ctx = getattr(ctx_handoff, "after_work", None)
    assert isinstance(after_work_ctx, RevertToUserTarget)
```
These tests assume the following public or semi-public API on `AG2AgentBuilder` and related classes:
1. `AG2AgentBuilder` has an async method `build_group_chat_setup(root_agent)` returning an object (here called `ca`) that exposes:
- `ca.handoffs`: an ordered iterable of handoff objects.
2. Each handoff object has (or can be extended to have) the attributes:
- `type`: `"llm"` or `"context"`.
- `sub_agent_id`: the string ID of the sub-agent resolved from `config["handoffs"][i]["to"]`.
- `condition`: an object whose `repr` contains at least the identifying text from the `condition` dict in the config.
- `after_work`: an instance of either `TerminateTarget` or `RevertToUserTarget`.
3. `AG2AgentBuilder` exposes an async method `get_agent(agent_id: str)` used by `_apply_handoffs` to resolve sub-agents.
4. `src.services.ag2.agent_builder` exports `TerminateTarget` and `RevertToUserTarget`.
If any of these assumptions differ from your actual implementation, you should:
- Update the test calls (`build_group_chat_setup`, attribute names on the returned object, etc.) to match your real API.
- Adjust the assertions on the handoff objects to match the actual attributes you expose (for example, you may store the sub-agent as `handoff.agent` instead of `sub_agent_id`; in that case assert on `handoff.agent.id`).
- If `TerminateTarget` / `RevertToUserTarget` are not directly exported, either import them from the correct module or assert based on whatever target representation your implementation uses (e.g., string enums like `"terminate"` / `"user"`).
</issue_to_address>
### Comment 13
<location path="tests/services/ag2/test_agent_runner.py" line_range="7-16" />
<code_context>
+ }
+ return a
+
+@pytest.mark.asyncio
+async def test_builder_creates_group_chat(mock_db, group_chat_agent_record):
+ builder = AG2AgentBuilder(db=mock_db)
</code_context>
<issue_to_address>
**suggestion (testing):** Add tests for error conditions and non-happy-path behavior in `run_agent_stream` and `run_agent`
Please also cover:
- A case where `run_agent` raises (e.g., `InternalServerError`) and `run_agent_stream` propagates the exception instead of yielding.
- Direct `run_agent` tests for:
- `get_agent` returning `None` → `AgentNotFoundError`.
- `ag2_mode == "group_chat"` with `initiate_group_chat` patched, asserting correct use of `chat_result.summary` and `chat_history`.
- `ag2_mode == "single"` with `ConversableAgent` patched, asserting the proxy pattern and that the final response comes from the last message.
This will exercise the control flow and error mapping beyond the happy-path streaming case.
Suggested implementation:
```python
import json
import pytest
from unittest.mock import MagicMock, AsyncMock, patch
from src.exceptions import InternalServerError
from src.services.ag2.agent_runner import run_agent_stream, run_agent, AgentNotFoundError
```
```python
    assert data["content"]["parts"][0]["text"] == "Resolved: your issue is fixed."
    assert data["is_final"] is True


@pytest.mark.asyncio
async def test_run_agent_stream_propagates_internal_server_error():
    mock_settings = MagicMock()
    mock_settings.POSTGRES_CONNECTION_STRING = "sqlite://"

    # Patch run_agent to raise, and still patch session service + settings as in happy-path test
    with patch(
        "src.services.ag2.agent_runner.run_agent",
        AsyncMock(side_effect=InternalServerError("boom")),
    ):
        with patch("src.services.ag2.session_service.AG2SessionService"):
            with patch("src.config.settings.get_settings", return_value=mock_settings):
                with pytest.raises(InternalServerError):
                    async for _ in run_agent_stream(
                        db=AsyncMock(),
                        agent_id="agent-123",
                        external_id="ext-123",
                        session_id="session-abc",
                        message="This will fail",
                    ):
                        # If we ever yield, the behavior is wrong
                        pytest.fail("run_agent_stream should not yield on InternalServerError")


@pytest.mark.asyncio
async def test_run_agent_get_agent_none_raises_agent_not_found():
    mock_db = AsyncMock()

    with patch("src.services.ag2.agent_runner.AG2AgentBuilder") as builder_cls:
        builder_instance = builder_cls.return_value
        builder_instance.get_agent.return_value = None

        with pytest.raises(AgentNotFoundError):
            await run_agent(
                db=mock_db,
                agent_id="missing-agent",
                external_id="ext-123",
                session_id="session-abc",
                message="Hello?",
            )


@pytest.mark.asyncio
async def test_run_agent_group_chat_uses_summary_and_chat_history():
    mock_db = AsyncMock()

    # Fake agent record in group_chat mode
    agent_record = MagicMock()
    agent_record.ag2_mode = "group_chat"

    chat_result = MagicMock()
    chat_result.summary = "Group chat resolution summary"
    chat_result.chat_history = [
        {"role": "user", "content": "Question"},
        {"role": "assistant", "content": "Intermediate"},
        {"role": "assistant", "content": "Final"},
    ]

    with patch("src.services.ag2.agent_runner.AG2AgentBuilder") as builder_cls:
        builder_instance = builder_cls.return_value
        builder_instance.get_agent.return_value = agent_record

        with patch(
            "src.services.ag2.agent_runner.initiate_group_chat",
            AsyncMock(return_value=chat_result),
        ):
            result = await run_agent(
                db=mock_db,
                agent_id="group-agent",
                external_id="ext-123",
                session_id="session-abc",
                message="Start group chat",
            )

    assert result["final_response"] == chat_result.summary
    assert result["message_history"] == chat_result.chat_history


@pytest.mark.asyncio
async def test_run_agent_single_mode_uses_proxy_and_last_message_response():
    mock_db = AsyncMock()

    # Fake agent record in single mode
    agent_record = MagicMock()
    agent_record.ag2_mode = "single"

    # Simulated conversation messages; run_agent should use the last one as final response
    conversation_messages = [
        {"role": "user", "content": "Hi"},
        {"role": "assistant", "content": "First reply"},
        {"role": "assistant", "content": "Final reply from proxy"},
    ]

    with patch("src.services.ag2.agent_runner.AG2AgentBuilder") as builder_cls:
        builder_instance = builder_cls.return_value
        builder_instance.get_agent.return_value = agent_record

        # Mock ConversableAgent to simulate proxy pattern and returned messages
        with patch("src.services.ag2.agent_runner.ConversableAgent") as ConversableAgentMock:
            # Each instantiation returns a separate agent mock
            primary_agent = MagicMock()
            proxy_agent = MagicMock()
            # The proxy agent produces the conversation messages
            proxy_agent.initiate_chat.return_value = conversation_messages
            ConversableAgentMock.side_effect = [primary_agent, proxy_agent]

            result = await run_agent(
                db=mock_db,
                agent_id="single-agent",
                external_id="ext-123",
                session_id="session-abc",
                message="Talk to me",
            )

    # Assert that we used the proxy pattern (two agents created)
    assert ConversableAgentMock.call_count == 2
    # Final response should come from the last message produced by the proxy conversation
    assert result["final_response"] == conversation_messages[-1]["content"]
```
These tests assume:
1. `InternalServerError` is available as `src.exceptions.InternalServerError`. If it lives elsewhere, update the import path accordingly.
2. `AgentNotFoundError` is exported from `src.services.ag2.agent_runner`. If it is defined in a different module, adjust the import and the `pytest.raises(...)` target.
3. `run_agent`:
- Accepts the arguments `(db, agent_id, external_id, session_id, message, ...)` as used here.
- Raises `AgentNotFoundError` when `AG2AgentBuilder(...).get_agent(...)` returns `None`.
- In `"group_chat"` mode calls `initiate_group_chat(...)` and returns a dict with `"final_response"` (from `chat_result.summary`) and `"message_history"` (from `chat_result.chat_history`).
- In `"single"` mode constructs two `ConversableAgent` instances (primary + proxy), uses the proxy's `initiate_chat` result, and sets `"final_response"` from the last message's `"content"`.
If your actual control flow differs (e.g., different attribute names, return shapes, or proxy setup), adjust the assertions and mocks (especially the `side_effect` on `ConversableAgentMock` and the expected keys in `result`) to match the real implementation.
</issue_to_address>
### Comment 14
<location path="tests/services/ag2/test_agent_runner.py" line_range="18-27" />
<code_context>
+
+ # Patch run_agent (the heavy work), session service, and settings —
+ # get_settings and AG2SessionService are local imports inside run_agent_stream
+ with patch("src.services.ag2.agent_runner.run_agent", AsyncMock(return_value=mock_result)):
+ with patch("src.services.ag2.session_service.AG2SessionService"):
+ with patch("src.config.settings.get_settings", return_value=mock_settings):
+ chunks = []
+ async for chunk in run_agent_stream(
+ db=AsyncMock(),
+ agent_id="agent-123",
+ external_id="ext-123",
+ session_id="session-abc",
+ message="My printer is broken",
+ ):
+ chunks.append(chunk)
+
+ assert len(chunks) >= 1
</code_context>
<issue_to_address>
**suggestion (testing):** Consider asserting the full streaming envelope contract, not just the text content
Since `run_agent_stream` should match the WebSocket/ADK streaming envelope, this test should also validate the JSON schema, e.g.:
- `data["content"]["role"] == "agent"`
- `data["author"]` matches the `agent_id` argument
- `parts` elements have the expected `{ "type": "text" }` shape
This will better enforce the streaming protocol contract and detect unintended schema changes.
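A minimal sketch of those extra assertions, assuming the envelope fields named above (`content.role`, `author`, `parts[*].type`) and reusing the `chunks` collected in the happy-path test; the actual field names should be checked against the real streaming schema:

```python
import json

# Hypothetical helper; field names follow the ADK-style streaming envelope
# described above and may need adjusting to the real schema.
def assert_stream_envelope(chunk: str, agent_id: str) -> None:
    data = json.loads(chunk)
    # The envelope attributes every chunk to the agent that produced it.
    assert data["content"]["role"] == "agent"
    assert data["author"] == agent_id
    # Every part must carry the expected {"type": "text"} shape.
    for part in data["content"]["parts"]:
        assert part["type"] == "text"
        assert isinstance(part["text"], str)
```

In the test, call `assert_stream_envelope(chunk, "agent-123")` for each collected chunk instead of only inspecting the first part's text.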
</issue_to_address>
### Comment 15
<location path="tests/services/ag2/test_agent_runner.py" line_range="13-20" />
<code_context>
+ "final_response": "Resolved: your issue is fixed.",
+ "message_history": [],
+ }
+ mock_settings = MagicMock()
+ mock_settings.POSTGRES_CONNECTION_STRING = "sqlite://"
+
+ # Patch run_agent (the heavy work), session service, and settings —
+ # get_settings and AG2SessionService are local imports inside run_agent_stream
+ with patch("src.services.ag2.agent_runner.run_agent", AsyncMock(return_value=mock_result)):
+ with patch("src.services.ag2.session_service.AG2SessionService"):
+ with patch("src.config.settings.get_settings", return_value=mock_settings):
+ chunks = []
+ async for chunk in run_agent_stream(
</code_context>
<issue_to_address>
**suggestion (testing):** Add a dedicated test for session history integration in `run_agent`
Since `AG2SessionService` is fully stubbed, we don’t currently verify that messages are appended and persisted. Please add a focused unit test for `run_agent` that injects a fake `AG2SessionService` with `get_or_create`, `append`, and `save` mocked, and asserts that:
- `append` is called in order with `("user", message)` then `("assistant", final_response)`
- `save` is called once with the same session instance
This will ensure conversational state is correctly persisted for the WhatsApp pause/resume scenario described in the PR.
Suggested implementation:
```python
import json
import pytest
from unittest.mock import MagicMock, AsyncMock, patch, call
from src.services.ag2.agent_runner import run_agent_stream, run_agent
```
```python
    assert len(chunks) >= 1
    data = json.loads(chunks[0])


@pytest.mark.asyncio
async def test_run_agent_persists_session_history_in_order():
    mock_settings = MagicMock()
    mock_settings.POSTGRES_CONNECTION_STRING = "sqlite://"

    with patch("src.services.ag2.session_service.AG2SessionService") as MockSessionService, \
         patch("src.config.settings.get_settings", return_value=mock_settings):
        mock_session_service = MockSessionService.return_value
        mock_session = MagicMock()

        # Ensure session service methods are async and controllable
        mock_session_service.get_or_create = AsyncMock(return_value=mock_session)
        mock_session_service.append = AsyncMock()
        mock_session_service.save = AsyncMock()

        db = AsyncMock()
        user_message = "My printer is broken"

        result = await run_agent(
            db=db,
            agent_id="agent-123",
            external_id="ext-123",
            session_id="session-abc",
            message=user_message,
        )

        # Append is called in order: user then assistant
        mock_session_service.append.assert_has_awaits(
            [
                call(mock_session, "user", user_message),
                call(mock_session, "assistant", result["final_response"]),
            ]
        )
        # Save is called once with the same session instance
        mock_session_service.save.assert_awaited_once_with(mock_session)
```
If the actual signature of `run_agent` or the `AG2SessionService` methods differ (for example, if `append`/`save` or `get_or_create` are synchronous instead of async, or if `append` takes different parameters), you should:
1. Adjust the `AsyncMock` usage to `MagicMock` and switch from `assert_awaited*` to `assert_called*` accordingly.
2. Update the `call(...)` argument lists to match the real method signatures (e.g., if `append` receives keyword arguments or a different ordering).
3. If `run_agent` requires additional parameters (such as config, tools, or agent options), pass suitable dummy values in the test invocation.
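A sketch of the synchronous switch described in point 1, using hypothetical method names that mirror the async version; it shows how the assertions change when `append`/`save` are plain methods rather than coroutines:

```python
from unittest.mock import MagicMock, call

# Synchronous variant of the session-service mock (hypothetical API).
session_service = MagicMock()
session = MagicMock()

# Simulate what run_agent would do with a synchronous service.
session_service.append(session, "user", "hello")
session_service.append(session, "assistant", "world")
session_service.save(session)

# assert_called* replaces assert_awaited* for non-async methods.
session_service.append.assert_has_calls(
    [call(session, "user", "hello"), call(session, "assistant", "world")]
)
session_service.save.assert_called_once_with(session)
```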
</issue_to_address>Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
- session_service: use MutableList (not MutableDict), add cache_ok=True,
fix mutable default (lambda: []), merge double DB roundtrip into one
session, propagate session_id through get_or_create
- agent_builder: safe handoff type access via h.get("type") with skip/log
on unknown values; cache db_sub_agents to eliminate N+1 DB queries
- agent_runner: fix group_chat message format (dict, not bare string),
propagate session_id; run_agent_stream now accepts session_service as
parameter instead of creating it per-request
- chat_routes: pass session_service= to run_agent_stream_ag2 call
- mcp_service: remove redundant os.environ mutation and unused import os
- custom_tool: wrap response.json() in try/except ValueError for non-JSON
HTTP responses
- tests: expand agent_builder tests to 10 cases (single mode, default mode,
name sanitization, system message, _get_api_key branches, _apply_handoffs
validation); add envelope contract assertions and session history ordering
test to agent_runner tests
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
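The non-JSON guard mentioned in the custom_tool bullet above can be sketched as follows; `parse_tool_response` and its fallback shape are hypothetical names for illustration, not the actual custom_tool API:

```python
# Hypothetical sketch of the custom_tool fix: wrap response.json() so that
# non-JSON HTTP bodies (plain text, HTML error pages) do not crash the tool.
def parse_tool_response(response) -> dict:
    try:
        return response.json()
    except ValueError:
        # response.json() raises ValueError on non-JSON bodies; fall back
        # to the raw text so the tool still returns something useful.
        return {"status_code": response.status_code, "text": response.text}
```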
- _apply_handoffs: test LLM condition registration, context condition registration, after_work=terminate (TerminateTarget), and default after_work (RevertToUserTarget)
- build_group_chat_setup: test ValueError raised on missing sub-agent
- run_agent: test AgentNotFoundError when agent DB record is missing
- run_agent: test RuntimeError from initiate_group_chat wrapped as InternalServerError
- run_agent_stream: test that InternalServerError from run_agent propagates

Addresses reviewer comments 12 and 13 on PR EvolutionAPI#46.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Author

@davidson-gomes, could you please review this change? It adds support for one more agentic framework, AG2. All comments from sourcery-ai are addressed.
Implements `src/services/ag2/` following the same two-file builder+runner pattern used by the existing CrewAI and Google ADK engines. Adds `AG2AgentBuilder` (translates evo-ai Agent DB records to AG2 agents) and `AG2AgentRunner` (run_agent + run_agent_stream via chat_routes). Updates `chat_routes.py` and `service_providers.py` dispatch branches. No schema migration required: orchestration mode is stored in the existing `config` JSON column (`config.ag2_mode`), following the same pattern used by the existing engine integrations.

Why: evo-ai currently supports Google ADK (single-agent) and CrewAI (sequential crew), but has no conversational multi-agent runtime. AG2's `GroupChat` with LLM-driven speaker selection is architecturally distinct from both: agents collaborate dynamically without a fixed execution order. The most distinctive capability is `WhatsAppSuspendableProxy`: AG2's human-in-the-loop mechanism maps directly to evo-ai's WhatsApp integration, enabling stateful pause-and-resume across HTTP requests, a pattern that no other engine in the platform currently supports. The integration follows the existing engine pattern exactly; `chat_routes.py` already has the `AI_ENGINE` dispatch structure, making the diff minimal and the review scope clear.
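The `AI_ENGINE` dispatch that this PR extends can be sketched as follows; the registry shape and function name here are hypothetical, and the real `service_providers.py` uses its own module paths and signatures:

```python
import os
from typing import Optional

# Hypothetical engine registry mirroring the three-way dispatch described
# above; module paths are illustrative, not the real service_providers code.
_ENGINES = {
    "adk": "src.services.adk.agent_runner",
    "crewai": "src.services.crewai.agent_runner",
    "ag2": "src.services.ag2.agent_runner",  # the new branch added by this PR
}

def resolve_engine_module(engine: Optional[str] = None) -> str:
    # Fall back to the AI_ENGINE environment variable, defaulting to ADK.
    engine = engine or os.getenv("AI_ENGINE", "adk")
    if engine not in _ENGINES:
        raise ValueError(f"Unknown AI_ENGINE: {engine}")
    return _ENGINES[engine]
```

With a table like this, adding a fourth engine is a one-line change plus a new runner module, which is why no schema migration is needed.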