diff --git a/src/microsoft/opentelemetry/_agent_framework/_utils.py b/src/microsoft/opentelemetry/_agent_framework/_utils.py index f40f74ea..8a770e85 100644 --- a/src/microsoft/opentelemetry/_agent_framework/_utils.py +++ b/src/microsoft/opentelemetry/_agent_framework/_utils.py @@ -5,54 +5,9 @@ from __future__ import annotations -import json - - -def extract_content_as_string_list( # pylint: disable=too-many-nested-blocks - messages_json: str, role_filter: str | None = None -) -> str: - """Extract content values from messages JSON and return as JSON string list. - - Handles Agent Framework message format with ``"parts"`` arrays. - Only extracts text content, ignoring tool_call and tool_call_response parts. - - Args: - messages_json: JSON string of messages. - role_filter: If provided, only extract content from messages with this role. - - Returns: - JSON string containing only the text content values as an array, - or the original string if parsing fails. - """ - try: - messages = json.loads(messages_json) - if isinstance(messages, list): - contents = [] - for msg in messages: - if isinstance(msg, dict): - role = msg.get("role", "") - - if role_filter and role != role_filter: - continue - - parts = msg.get("parts") - if parts and isinstance(parts, list): - for part in parts: - if isinstance(part, dict): - part_type = part.get("type", "") - if part_type == "text" and "content" in part: - contents.append(part["content"]) - return json.dumps(contents) - return messages_json - except (json.JSONDecodeError, TypeError): - return messages_json - - -def extract_input_content(messages_json: str) -> str: - """Extract text content from user messages only.""" - return extract_content_as_string_list(messages_json, role_filter="user") - - -def extract_output_content(messages_json: str) -> str: - """Extract only assistant text content from output messages.""" - return extract_content_as_string_list(messages_json, role_filter="assistant") +# Re-export from shared enricher utilities for backward compatibility. +from microsoft.opentelemetry.a365.core.enricher_utils import ( # noqa: F401 + extract_content_as_string_list, + extract_input_content, + extract_output_content, +) diff --git a/src/microsoft/opentelemetry/_genai/_langchain/_tracer.py b/src/microsoft/opentelemetry/_genai/_langchain/_tracer.py index bcfa0baa..c3650480 100644 --- a/src/microsoft/opentelemetry/_genai/_langchain/_tracer.py +++ b/src/microsoft/opentelemetry/_genai/_langchain/_tracer.py @@ -6,6 +6,7 @@ import logging import re from collections import OrderedDict +from dataclasses import asdict from collections.abc import Iterator, Mapping from itertools import chain from threading import RLock @@ -31,7 +32,7 @@ _apply_llm_finish_attributes, _maybe_emit_llm_event, ) -from opentelemetry.util.genai.types import Error, LLMInvocation +from opentelemetry.util.genai.types import Error, LLMInvocation, OutputMessage as OTelOutputMessage, Text as OTelText from opentelemetry.util.types import AttributeValue from microsoft.opentelemetry._genai._langchain._utils import ( @@ -64,12 +65,14 @@ function_calls, input_messages, invocation_parameters, - invoke_agent_input_message, - invoke_agent_output_message, metadata, model_name, output_messages, prompts, + _extract_structured_input_messages, + _extract_structured_output_messages, + _extract_agent_input_messages, + _extract_agent_output_messages, _should_capture_content_on_spans, token_counts, tools, @@ -449,22 +452,15 @@ def _aggregate_into_parent(self, run: Run) -> None: # pylint: disable=too-many- # Capture input messages from LLM runs (first LLM child wins) if run_type in ("llm", "chat_model") and not content["input_messages"]: if run.inputs: - for _, val in input_messages(run.inputs): - content["input_messages"].append(val) - break - if not content["input_messages"]: - for _, val in prompts(run.inputs): - if isinstance(val, list) and val: - content["input_messages"].append(str(val[0])) - elif isinstance(val, str): - content["input_messages"].append(val) - break + structured = _extract_structured_input_messages(run.inputs) + if structured: + content["input_messages"] = structured # Capture output messages from LLM runs (last LLM child wins) if run_type in ("llm", "chat_model") and run.outputs: - for _, val in output_messages(run.outputs): - content["output_messages"] = [val] # overwrite with latest - break + out_structured = _extract_structured_output_messages(run.outputs) + if out_structured: + content["output_messages"] = out_structured # type: ignore[assignment] # Capture tool results if run_type == "tool" and run.outputs and hasattr(run.outputs, "get"): @@ -472,7 +468,9 @@ def _aggregate_into_parent(self, run: Run) -> None: # pylint: disable=too-many- output = run.outputs.get("output", _sentinel) if output is not _sentinel: result_str = output if isinstance(output, str) else safe_json_dumps(output) - content["output_messages"].append(result_str) + content["output_messages"].append( + OTelOutputMessage(role="tool", parts=[OTelText(content=result_str)], finish_reason="stop") + ) def _find_agent_ancestor(self, run: Run) -> UUID | None: """Walk up the run tree to find the nearest agent ancestor run_id.""" @@ -513,18 +511,30 @@ def _finalize_agent_span(self, span: Span, run: Run) -> None: # Set aggregated input/output messages only when content capture is enabled if _should_capture_content_on_spans(): if msgs := content.get("input_messages"): - span.set_attribute(GEN_AI_INPUT_MESSAGES_KEY, safe_json_dumps(msgs)) + span.set_attribute( + GEN_AI_INPUT_MESSAGES_KEY, + safe_json_dumps([asdict(m) for m in msgs]), + ) else: - for _, val in invoke_agent_input_message(run.inputs): - span.set_attribute(GEN_AI_INPUT_MESSAGES_KEY, val) - break + agent_msgs = _extract_agent_input_messages(run.inputs) + if agent_msgs: + span.set_attribute( + GEN_AI_INPUT_MESSAGES_KEY, + safe_json_dumps([asdict(m) for m in agent_msgs]), + ) - if msgs := content.get("output_messages"): - span.set_attribute(GEN_AI_OUTPUT_MESSAGES_KEY, safe_json_dumps(msgs)) + if out_msgs := content.get("output_messages"): + span.set_attribute( + GEN_AI_OUTPUT_MESSAGES_KEY, + safe_json_dumps([asdict(m) for m in out_msgs]), + ) else: - for _, val in invoke_agent_output_message(run.outputs): - span.set_attribute(GEN_AI_OUTPUT_MESSAGES_KEY, val) - break + agent_out_msgs = _extract_agent_output_messages(run.outputs) + if agent_out_msgs: + span.set_attribute( + GEN_AI_OUTPUT_MESSAGES_KEY, + safe_json_dumps([asdict(m) for m in agent_out_msgs]), + ) # Set metadata (session_id, etc.) span.set_attributes(dict(flatten(metadata(run)))) diff --git a/src/microsoft/opentelemetry/_genai/_langchain/_tracer_instrumentor.py b/src/microsoft/opentelemetry/_genai/_langchain/_tracer_instrumentor.py index 60d9d82b..7775ad0d 100644 --- a/src/microsoft/opentelemetry/_genai/_langchain/_tracer_instrumentor.py +++ b/src/microsoft/opentelemetry/_genai/_langchain/_tracer_instrumentor.py @@ -51,6 +51,7 @@ def __init__(self) -> None: super().__init__() self._tracer: LangChainTracer | None = None self._original_cb_init: Callable[..., None] | None = None + self._owns_enricher: bool = False # ---- BaseInstrumentor API ------------------------------------------------- @@ -98,9 +99,32 @@ def _instrument(self, **kwargs: Any) -> None: wrapper=_BaseCallbackManagerInit(self._tracer), ) + # Register the A365 span enricher when the A365 pipeline is available. + try: + from microsoft.opentelemetry.a365.core.exporters.enriching_span_processor import register_span_enricher + + from microsoft.opentelemetry.a365.langchain._span_enricher import enrich_langchain_span + + register_span_enricher(enrich_langchain_span) + self._owns_enricher = True + except ImportError: + logger.debug("A365 enricher modules not available. Skipping enricher registration.") + except RuntimeError: + logger.debug("A span enricher is already registered. Skipping LangChain enricher registration.") + def _uninstrument(self, **kwargs: Any) -> None: if not langchain_available: return + if self._owns_enricher: + try: + from microsoft.opentelemetry.a365.core.exporters.enriching_span_processor import ( + unregister_span_enricher, + ) + + unregister_span_enricher() + except Exception: # pylint: disable=broad-exception-caught + logger.debug("Failed to unregister LangChain span enricher", exc_info=True) + self._owns_enricher = False if self._original_cb_init is not None: langchain_core.callbacks.BaseCallbackManager.__init__ = self._original_cb_init # type: ignore[assignment] self._original_cb_init = None diff --git a/src/microsoft/opentelemetry/_genai/_langchain/_utils.py b/src/microsoft/opentelemetry/_genai/_langchain/_utils.py index 3c788eed..f24896f4 100644 --- a/src/microsoft/opentelemetry/_genai/_langchain/_utils.py +++ b/src/microsoft/opentelemetry/_genai/_langchain/_utils.py @@ -1015,6 +1015,66 @@ def _extract_structured_output_messages( return results +def _extract_agent_input_messages( + inputs: Mapping[str, Any] | None, +) -> list[InputMessage]: + """Convert agent-level input messages to OTel ``InputMessage`` list. + + Agent runs store messages as a flat list under the ``messages`` key, + unlike LLM runs which nest them as list-of-lists. + """ + if not inputs or not isinstance(inputs, Mapping): + return [] + messages = inputs.get("messages") + if not messages or not isinstance(messages, list): + return [] + # Handle potential nested lists + if len(messages) > 0 and isinstance(messages[0], list): + messages = messages[0] + results: list[InputMessage] = [] + for msg in messages: + role = _langchain_role(msg) + parts: list[Any] = [] + content = _langchain_content(msg) + if content: + parts.append(Text(content=content)) + parts.extend(_langchain_tool_calls(msg)) + if parts: + results.append(InputMessage(role=role, parts=parts)) + return results + + +def _extract_agent_output_messages( + outputs: Mapping[str, Any] | None, +) -> list[OutputMessage]: + """Convert agent-level output messages to OTel ``OutputMessage`` list. + + Agent runs store output as a flat messages list. Extracts the last + assistant/AI message as the agent output. + """ + if not outputs or not isinstance(outputs, Mapping): + return [] + messages = outputs.get("messages") + if not messages or not isinstance(messages, list): + return [] + # Handle potential nested lists + if len(messages) > 0 and isinstance(messages[0], list): + messages = messages[0] + results: list[OutputMessage] = [] + for msg in reversed(messages): + role = _langchain_role(msg) + if role and role.lower() in ("ai", "assistant"): + parts: list[Any] = [] + content = _langchain_content(msg) + if content and isinstance(content, str) and content.strip(): + parts.append(Text(content=content)) + parts.extend(_langchain_tool_calls(msg)) + if parts: + results.append(OutputMessage(role=role, parts=parts, finish_reason="stop")) + break + return results + + @stop_on_exception def invoke_agent_input_message( inputs: Mapping[str, Any] | None, diff --git a/src/microsoft/opentelemetry/a365/core/enricher_utils.py b/src/microsoft/opentelemetry/a365/core/enricher_utils.py new file mode 100644 index 00000000..9bae449d --- /dev/null +++ b/src/microsoft/opentelemetry/a365/core/enricher_utils.py @@ -0,0 +1,63 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +"""Shared utilities for A365 span enrichers. + +Provides content extraction helpers used by framework-specific enrichers +(Agent Framework, Semantic Kernel, LangChain) to convert structured OTel +messages to plain content arrays before A365 export. +""" + +from __future__ import annotations + +import json + + +def extract_content_as_string_list( + messages_json: str, role_filter: str | None = None +) -> str: + """Extract content values from messages JSON and return as JSON string list. + + Handles the OTel structured message format with ``"parts"`` arrays. + Only extracts text content, ignoring tool_call and tool_call_response parts. + + Args: + messages_json: JSON string of messages. + role_filter: If provided, only extract content from messages with this role. + + Returns: + JSON string containing only the text content values as an array, + or the original string if parsing fails. + """ + try: + messages = json.loads(messages_json) + if isinstance(messages, list): + contents = [] + for msg in messages: + if isinstance(msg, dict): + role = msg.get("role", "") + + if role_filter and role != role_filter: + continue + + parts = msg.get("parts") + if parts and isinstance(parts, list): + for part in parts: + if isinstance(part, dict): + part_type = part.get("type", "") + if part_type == "text" and "content" in part: + contents.append(part["content"]) + return json.dumps(contents) + return messages_json + except (json.JSONDecodeError, TypeError): + return messages_json + + +def extract_input_content(messages_json: str) -> str: + """Extract text content from user messages only.""" + return extract_content_as_string_list(messages_json, role_filter="user") + + +def extract_output_content(messages_json: str) -> str: + """Extract only assistant text content from output messages.""" + return extract_content_as_string_list(messages_json, role_filter="assistant") diff --git a/src/microsoft/opentelemetry/a365/langchain/__init__.py b/src/microsoft/opentelemetry/a365/langchain/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/microsoft/opentelemetry/a365/langchain/_span_enricher.py b/src/microsoft/opentelemetry/a365/langchain/_span_enricher.py new file mode 100644 index 00000000..5908459e --- /dev/null +++ b/src/microsoft/opentelemetry/a365/langchain/_span_enricher.py @@ -0,0 +1,47 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +"""Span enricher for LangChain. + +Converts OTel-spec structured messages on invoke_agent spans back to +plain-string content arrays before export through the A365 pipeline, +matching the format the A365 backend expects. +""" + +from __future__ import annotations + +from microsoft.opentelemetry.a365.core.enricher_utils import extract_input_content, extract_output_content +from microsoft.opentelemetry.a365.core.constants import ( + GEN_AI_INPUT_MESSAGES_KEY, + GEN_AI_OUTPUT_MESSAGES_KEY, + INVOKE_AGENT_OPERATION_NAME, +) +from microsoft.opentelemetry.a365.core.exporters.enriched_span import EnrichedReadableSpan +from opentelemetry.sdk.trace import ReadableSpan + + +def enrich_langchain_span(span: ReadableSpan) -> ReadableSpan: + """Enricher for LangChain spans exported through the A365 pipeline. + + For invoke_agent spans, converts OTel-spec structured messages + (``[{"role":"user","parts":[...]}]``) to plain content arrays + (``["Hello"]``) that the A365 backend expects. + """ + if not span.name.startswith(INVOKE_AGENT_OPERATION_NAME): + return span + + attributes = span.attributes or {} + extra_attributes: dict[str, str] = {} + + input_messages = attributes.get(GEN_AI_INPUT_MESSAGES_KEY) + if input_messages: + extra_attributes[GEN_AI_INPUT_MESSAGES_KEY] = extract_input_content(str(input_messages)) + + output_messages = attributes.get(GEN_AI_OUTPUT_MESSAGES_KEY) + if output_messages: + extra_attributes[GEN_AI_OUTPUT_MESSAGES_KEY] = extract_output_content(str(output_messages)) + + if extra_attributes: + return EnrichedReadableSpan(span, extra_attributes) + + return span diff --git a/tests/langchain/test_span_enricher.py b/tests/langchain/test_span_enricher.py new file mode 100644 index 00000000..9d5a83e8 --- /dev/null +++ b/tests/langchain/test_span_enricher.py @@ -0,0 +1,130 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +"""Tests for LangChain span enricher.""" + +import json +import unittest +from unittest.mock import Mock + +from microsoft.opentelemetry.a365.core.constants import ( + GEN_AI_INPUT_MESSAGES_KEY, + GEN_AI_OUTPUT_MESSAGES_KEY, +) +from microsoft.opentelemetry.a365.core.enricher_utils import ( + extract_content_as_string_list, +) +from microsoft.opentelemetry.a365.langchain._span_enricher import ( + enrich_langchain_span, +) + + +class TestExtractContentAsStringList(unittest.TestCase): + """Tests for the shared extract_content_as_string_list helper.""" + + def test_structured_messages_to_plain_strings(self): + messages = json.dumps([ + {"role": "user", "parts": [{"type": "text", "content": "Hello"}]}, + ]) + result = json.loads(extract_content_as_string_list(messages)) + self.assertEqual(result, ["Hello"]) + + def test_filters_by_role(self): + messages = json.dumps([ + {"role": "system", "parts": [{"type": "text", "content": "You are helpful"}]}, + {"role": "user", "parts": [{"type": "text", "content": "Hello"}]}, + ]) + result = json.loads(extract_content_as_string_list(messages, role_filter="user")) + self.assertEqual(result, ["Hello"]) + + def test_no_role_filter_extracts_all(self): + messages = json.dumps([ + {"role": "system", "parts": [{"type": "text", "content": "You are helpful"}]}, + {"role": "user", "parts": [{"type": "text", "content": "Hello"}]}, + ]) + result = json.loads(extract_content_as_string_list(messages)) + self.assertEqual(result, ["You are helpful", "Hello"]) + + def test_skips_non_text_parts(self): + messages = json.dumps([ + {"role": "assistant", "parts": [ + {"type": "tool_call", "id": "c1"}, + {"type": "text", "content": "Result is 3"}, + ]}, + ]) + result = json.loads(extract_content_as_string_list(messages, role_filter="assistant")) + self.assertEqual(result, ["Result is 3"]) + + def test_invalid_json_returns_original(self): + self.assertEqual(extract_content_as_string_list("not json"), "not json") + + def test_non_list_returns_original(self): + self.assertEqual(extract_content_as_string_list('"just a string"'), '"just a string"') + + +class TestEnrichLangchainSpan(unittest.TestCase): + """Tests for enrich_langchain_span.""" + + def test_invoke_agent_converts_structured_to_plain(self): + span = Mock() + span.name = "invoke_agent my_agent" + span.attributes = { + GEN_AI_INPUT_MESSAGES_KEY: json.dumps([ + {"role": "user", "parts": [{"type": "text", "content": "What is 2+2?"}]}, + ]), + GEN_AI_OUTPUT_MESSAGES_KEY: json.dumps([ + {"role": "assistant", "parts": [{"type": "text", "content": "4"}], "finish_reason": "stop"}, + ]), + } + + result = enrich_langchain_span(span) + + self.assertEqual(result.attributes[GEN_AI_INPUT_MESSAGES_KEY], '["What is 2+2?"]') + self.assertEqual(result.attributes[GEN_AI_OUTPUT_MESSAGES_KEY], '["4"]') + + def test_invoke_agent_filters_roles(self): + """Input filters for user, output filters for assistant.""" + span = Mock() + span.name = "invoke_agent test" + span.attributes = { + GEN_AI_INPUT_MESSAGES_KEY: json.dumps([ + {"role": "system", "parts": [{"type": "text", "content": "System prompt"}]}, + {"role": "user", "parts": [{"type": "text", "content": "Hello"}]}, + ]), + GEN_AI_OUTPUT_MESSAGES_KEY: json.dumps([ + {"role": "tool", "parts": [{"type": "text", "content": "tool result"}]}, + {"role": "assistant", "parts": [{"type": "text", "content": "Answer"}]}, + ]), + } + + result = enrich_langchain_span(span) + + self.assertEqual(result.attributes[GEN_AI_INPUT_MESSAGES_KEY], '["Hello"]') + self.assertEqual(result.attributes[GEN_AI_OUTPUT_MESSAGES_KEY], '["Answer"]') + + def test_non_invoke_agent_span_unchanged(self): + span = Mock() + span.name = "chat gpt-4" + span.attributes = { + GEN_AI_INPUT_MESSAGES_KEY: "some value", + } + + self.assertIs(enrich_langchain_span(span), span) + + def test_no_attributes_returns_original(self): + span = Mock() + span.name = "invoke_agent test" + span.attributes = None + + self.assertIs(enrich_langchain_span(span), span) + + def test_empty_attributes_returns_original(self): + span = Mock() + span.name = "invoke_agent test" + span.attributes = {} + + self.assertIs(enrich_langchain_span(span), span) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/langchain/test_tracer.py b/tests/langchain/test_tracer.py index a85badbd..dfa169b8 100644 --- a/tests/langchain/test_tracer.py +++ b/tests/langchain/test_tracer.py @@ -447,7 +447,10 @@ def test_aggregates_tool_output(self, mock_ctx): tracer._aggregate_into_parent(tool_run) content = tracer._agent_content[agent_run.id] - self.assertIn("42", content["output_messages"]) + self.assertEqual(len(content["output_messages"]), 1) + tool_msg = content["output_messages"][0] + self.assertEqual(tool_msg.role, "tool") + self.assertEqual(tool_msg.parts[0].content, "42") @patch("microsoft.opentelemetry._genai._langchain._tracer.context_api") def test_aggregates_tokens_from_generation_info_usage(self, mock_ctx): diff --git a/tests/langchain/test_utils.py b/tests/langchain/test_utils.py index 88f0b1bf..13dfb0f5 100644 --- a/tests/langchain/test_utils.py +++ b/tests/langchain/test_utils.py @@ -58,6 +58,8 @@ safe_json_dumps, stop_on_exception, token_counts, + _extract_agent_input_messages, + _extract_agent_output_messages, tools, ) @@ -642,6 +644,79 @@ def test_extracts_last_ai_message(self): self.assertEqual(result, [(GEN_AI_OUTPUT_MESSAGES_KEY, "Second")]) +# ---- Agent structured message extractors ------------------------------------ + + +class TestExtractAgentInputMessages(TestCase): + def test_extracts_structured_human_message(self): + inputs = {"messages": [{"role": "human", "content": "What is 2+2?"}]} + result = _extract_agent_input_messages(inputs) + self.assertEqual(len(result), 1) + self.assertEqual(result[0].role, "human") + self.assertEqual(len(result[0].parts), 1) + self.assertEqual(result[0].parts[0].content, "What is 2+2?") + + def test_extracts_multiple_messages(self): + inputs = { + "messages": [ + {"role": "system", "content": "You are helpful"}, + {"role": "human", "content": "Hello"}, + ] + } + result = _extract_agent_input_messages(inputs) + self.assertEqual(len(result), 2) + self.assertEqual(result[0].role, "system") + self.assertEqual(result[1].role, "human") + + def test_extracts_from_nested_list(self): + inputs = {"messages": [[{"role": "human", "content": "Hello"}]]} + result = _extract_agent_input_messages(inputs) + self.assertEqual(len(result), 1) + self.assertEqual(result[0].parts[0].content, "Hello") + + def test_returns_empty_on_none(self): + self.assertEqual(_extract_agent_input_messages(None), []) + + def test_returns_empty_on_no_messages(self): + self.assertEqual(_extract_agent_input_messages({"other": "data"}), []) + + +class TestExtractAgentOutputMessages(TestCase): + def test_extracts_structured_ai_message(self): + outputs = {"messages": [{"role": "ai", "content": "The answer is 4"}]} + result = _extract_agent_output_messages(outputs) + self.assertEqual(len(result), 1) + self.assertEqual(result[0].role, "ai") + self.assertEqual(result[0].parts[0].content, "The answer is 4") + self.assertEqual(result[0].finish_reason, "stop") + + def test_returns_empty_on_none(self): + self.assertEqual(_extract_agent_output_messages(None), []) + + def test_extracts_last_ai_message(self): + outputs = { + "messages": [ + {"role": "ai", "content": "First"}, + {"role": "human", "content": "Again"}, + {"role": "ai", "content": "Second"}, + ] + } + result = _extract_agent_output_messages(outputs) + self.assertEqual(len(result), 1) + self.assertEqual(result[0].parts[0].content, "Second") + + def test_skips_empty_content(self): + outputs = {"messages": [{"role": "ai", "content": ""}]} + result = _extract_agent_output_messages(outputs) + self.assertEqual(result, []) + + def test_extracts_from_nested_list(self): + outputs = {"messages": [[{"role": "ai", "content": "Nested answer"}]]} + result = _extract_agent_output_messages(outputs) + self.assertEqual(len(result), 1) + self.assertEqual(result[0].parts[0].content, "Nested answer") + + # ---- Agent metadata extractors -----------------------------------------------