Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 6 additions & 51 deletions src/microsoft/opentelemetry/_agent_framework/_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,54 +5,9 @@

from __future__ import annotations

import json


def extract_content_as_string_list( # pylint: disable=too-many-nested-blocks
messages_json: str, role_filter: str | None = None
) -> str:
"""Extract content values from messages JSON and return as JSON string list.

Handles Agent Framework message format with ``"parts"`` arrays.
Only extracts text content, ignoring tool_call and tool_call_response parts.

Args:
messages_json: JSON string of messages.
role_filter: If provided, only extract content from messages with this role.

Returns:
JSON string containing only the text content values as an array,
or the original string if parsing fails.
"""
try:
messages = json.loads(messages_json)
if isinstance(messages, list):
contents = []
for msg in messages:
if isinstance(msg, dict):
role = msg.get("role", "")

if role_filter and role != role_filter:
continue

parts = msg.get("parts")
if parts and isinstance(parts, list):
for part in parts:
if isinstance(part, dict):
part_type = part.get("type", "")
if part_type == "text" and "content" in part:
contents.append(part["content"])
return json.dumps(contents)
return messages_json
except (json.JSONDecodeError, TypeError):
return messages_json


def extract_input_content(messages_json: str) -> str:
"""Extract text content from user messages only."""
return extract_content_as_string_list(messages_json, role_filter="user")


def extract_output_content(messages_json: str) -> str:
"""Extract only assistant text content from output messages."""
return extract_content_as_string_list(messages_json, role_filter="assistant")
# Re-export from shared enricher utilities for backward compatibility.
from microsoft.opentelemetry.a365.core.enricher_utils import ( # noqa: F401
extract_content_as_string_list,
extract_input_content,
extract_output_content,
)
62 changes: 36 additions & 26 deletions src/microsoft/opentelemetry/_genai/_langchain/_tracer.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import logging
import re
from collections import OrderedDict
from dataclasses import asdict
from collections.abc import Iterator, Mapping
from itertools import chain
from threading import RLock
Expand All @@ -31,7 +32,7 @@
_apply_llm_finish_attributes,
_maybe_emit_llm_event,
)
from opentelemetry.util.genai.types import Error, LLMInvocation
from opentelemetry.util.genai.types import Error, LLMInvocation, OutputMessage as OTelOutputMessage, Text as OTelText
from opentelemetry.util.types import AttributeValue

from microsoft.opentelemetry._genai._langchain._utils import (
Expand Down Expand Up @@ -64,12 +65,14 @@
function_calls,
input_messages,
invocation_parameters,
invoke_agent_input_message,
invoke_agent_output_message,
metadata,
model_name,
output_messages,
prompts,
_extract_structured_input_messages,
_extract_structured_output_messages,
_extract_agent_input_messages,
_extract_agent_output_messages,
_should_capture_content_on_spans,
token_counts,
tools,
Expand Down Expand Up @@ -449,30 +452,25 @@ def _aggregate_into_parent(self, run: Run) -> None: # pylint: disable=too-many-
# Capture input messages from LLM runs (first LLM child wins)
if run_type in ("llm", "chat_model") and not content["input_messages"]:
if run.inputs:
for _, val in input_messages(run.inputs):
content["input_messages"].append(val)
break
if not content["input_messages"]:
for _, val in prompts(run.inputs):
if isinstance(val, list) and val:
content["input_messages"].append(str(val[0]))
elif isinstance(val, str):
content["input_messages"].append(val)
break
structured = _extract_structured_input_messages(run.inputs)
if structured:
content["input_messages"] = structured

# Capture output messages from LLM runs (last LLM child wins)
if run_type in ("llm", "chat_model") and run.outputs:
for _, val in output_messages(run.outputs):
content["output_messages"] = [val] # overwrite with latest
break
out_structured = _extract_structured_output_messages(run.outputs)
if out_structured:
content["output_messages"] = out_structured # type: ignore[assignment]

# Capture tool results
if run_type == "tool" and run.outputs and hasattr(run.outputs, "get"):
_sentinel = object()
output = run.outputs.get("output", _sentinel)
if output is not _sentinel:
result_str = output if isinstance(output, str) else safe_json_dumps(output)
content["output_messages"].append(result_str)
content["output_messages"].append(
OTelOutputMessage(role="tool", parts=[OTelText(content=result_str)], finish_reason="stop")
)

def _find_agent_ancestor(self, run: Run) -> UUID | None:
"""Walk up the run tree to find the nearest agent ancestor run_id."""
Expand Down Expand Up @@ -513,18 +511,30 @@ def _finalize_agent_span(self, span: Span, run: Run) -> None:
# Set aggregated input/output messages only when content capture is enabled
if _should_capture_content_on_spans():
if msgs := content.get("input_messages"):
span.set_attribute(GEN_AI_INPUT_MESSAGES_KEY, safe_json_dumps(msgs))
span.set_attribute(
GEN_AI_INPUT_MESSAGES_KEY,
safe_json_dumps([asdict(m) for m in msgs]),
)
else:
for _, val in invoke_agent_input_message(run.inputs):
span.set_attribute(GEN_AI_INPUT_MESSAGES_KEY, val)
break
agent_msgs = _extract_agent_input_messages(run.inputs)
if agent_msgs:
span.set_attribute(
GEN_AI_INPUT_MESSAGES_KEY,
safe_json_dumps([asdict(m) for m in agent_msgs]),
)

if msgs := content.get("output_messages"):
span.set_attribute(GEN_AI_OUTPUT_MESSAGES_KEY, safe_json_dumps(msgs))
if out_msgs := content.get("output_messages"):
span.set_attribute(
GEN_AI_OUTPUT_MESSAGES_KEY,
safe_json_dumps([asdict(m) for m in out_msgs]),
)
else:
for _, val in invoke_agent_output_message(run.outputs):
span.set_attribute(GEN_AI_OUTPUT_MESSAGES_KEY, val)
break
agent_out_msgs = _extract_agent_output_messages(run.outputs)
if agent_out_msgs:
span.set_attribute(
GEN_AI_OUTPUT_MESSAGES_KEY,
safe_json_dumps([asdict(m) for m in agent_out_msgs]),
)
Comment on lines 511 to +537

# Set metadata (session_id, etc.)
span.set_attributes(dict(flatten(metadata(run))))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ def __init__(self) -> None:
super().__init__()
self._tracer: LangChainTracer | None = None
self._original_cb_init: Callable[..., None] | None = None
self._owns_enricher: bool = False

# ---- BaseInstrumentor API -------------------------------------------------

Expand Down Expand Up @@ -98,9 +99,32 @@ def _instrument(self, **kwargs: Any) -> None:
wrapper=_BaseCallbackManagerInit(self._tracer),
)

# Register the A365 span enricher when the A365 pipeline is available.
try:
from microsoft.opentelemetry.a365.core.exporters.enriching_span_processor import register_span_enricher

from microsoft.opentelemetry.a365.langchain._span_enricher import enrich_langchain_span

register_span_enricher(enrich_langchain_span)
self._owns_enricher = True
except ImportError:
logger.debug("A365 enricher modules not available. Skipping enricher registration.")
except RuntimeError:
logger.debug("A span enricher is already registered. Skipping LangChain enricher registration.")

def _uninstrument(self, **kwargs: Any) -> None:
if not langchain_available:
return
if self._owns_enricher:
try:
from microsoft.opentelemetry.a365.core.exporters.enriching_span_processor import (
unregister_span_enricher,
)

unregister_span_enricher()
except Exception: # pylint: disable=broad-exception-caught
logger.debug("Failed to unregister LangChain span enricher", exc_info=True)
self._owns_enricher = False
if self._original_cb_init is not None:
langchain_core.callbacks.BaseCallbackManager.__init__ = self._original_cb_init # type: ignore[assignment]
self._original_cb_init = None
Expand Down
60 changes: 60 additions & 0 deletions src/microsoft/opentelemetry/_genai/_langchain/_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -1015,6 +1015,66 @@ def _extract_structured_output_messages(
return results


def _extract_agent_input_messages(
inputs: Mapping[str, Any] | None,
) -> list[InputMessage]:
"""Convert agent-level input messages to OTel ``InputMessage`` list.

Agent runs store messages as a flat list under the ``messages`` key,
unlike LLM runs which nest them as list-of-lists.
"""
if not inputs or not isinstance(inputs, Mapping):
return []
messages = inputs.get("messages")
if not messages or not isinstance(messages, list):
return []
# Handle potential nested lists
if len(messages) > 0 and isinstance(messages[0], list):
messages = messages[0]
results: list[InputMessage] = []
for msg in messages:
role = _langchain_role(msg)
parts: list[Any] = []
content = _langchain_content(msg)
if content:
parts.append(Text(content=content))
parts.extend(_langchain_tool_calls(msg))
if parts:
results.append(InputMessage(role=role, parts=parts))
return results
Comment on lines +1018 to +1044


def _extract_agent_output_messages(
outputs: Mapping[str, Any] | None,
) -> list[OutputMessage]:
"""Convert agent-level output messages to OTel ``OutputMessage`` list.

Agent runs store output as a flat messages list. Extracts the last
assistant/AI message as the agent output.
"""
if not outputs or not isinstance(outputs, Mapping):
return []
messages = outputs.get("messages")
if not messages or not isinstance(messages, list):
return []
# Handle potential nested lists
if len(messages) > 0 and isinstance(messages[0], list):
messages = messages[0]
results: list[OutputMessage] = []
for msg in reversed(messages):
role = _langchain_role(msg)
if role and role.lower() in ("ai", "assistant"):
parts: list[Any] = []
content = _langchain_content(msg)
if content and isinstance(content, str) and content.strip():
parts.append(Text(content=content))
parts.extend(_langchain_tool_calls(msg))
if parts:
results.append(OutputMessage(role=role, parts=parts, finish_reason="stop"))
break
return results
Comment on lines +1047 to +1075


@stop_on_exception
def invoke_agent_input_message(
inputs: Mapping[str, Any] | None,
Expand Down
63 changes: 63 additions & 0 deletions src/microsoft/opentelemetry/a365/core/enricher_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.

"""Shared utilities for A365 span enrichers.

Provides content extraction helpers used by framework-specific enrichers
(Agent Framework, Semantic Kernel, LangChain) to convert structured OTel
messages to plain content arrays before A365 export.
"""

from __future__ import annotations

import json


def extract_content_as_string_list(
messages_json: str, role_filter: str | None = None
) -> str:
"""Extract content values from messages JSON and return as JSON string list.

Handles the OTel structured message format with ``"parts"`` arrays.
Only extracts text content, ignoring tool_call and tool_call_response parts.

Args:
messages_json: JSON string of messages.
role_filter: If provided, only extract content from messages with this role.

Returns:
JSON string containing only the text content values as an array,
or the original string if parsing fails.
"""
try:
messages = json.loads(messages_json)
if isinstance(messages, list):
contents = []
for msg in messages:
if isinstance(msg, dict):
role = msg.get("role", "")

if role_filter and role != role_filter:
continue

parts = msg.get("parts")
if parts and isinstance(parts, list):
for part in parts:
if isinstance(part, dict):
part_type = part.get("type", "")
if part_type == "text" and "content" in part:
contents.append(part["content"])
return json.dumps(contents)
return messages_json
except (json.JSONDecodeError, TypeError):
return messages_json
Comment on lines +32 to +53


def extract_input_content(messages_json: str) -> str:
"""Extract text content from user messages only."""
return extract_content_as_string_list(messages_json, role_filter="user")


def extract_output_content(messages_json: str) -> str:
"""Extract only assistant text content from output messages."""
return extract_content_as_string_list(messages_json, role_filter="assistant")
Empty file.
47 changes: 47 additions & 0 deletions src/microsoft/opentelemetry/a365/langchain/_span_enricher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.

"""Span enricher for LangChain.

Converts OTel-spec structured messages on invoke_agent spans back to
plain-string content arrays before export through the A365 pipeline,
matching the format the A365 backend expects.
"""

from __future__ import annotations

from microsoft.opentelemetry.a365.core.enricher_utils import extract_input_content, extract_output_content
from microsoft.opentelemetry.a365.core.constants import (
GEN_AI_INPUT_MESSAGES_KEY,
GEN_AI_OUTPUT_MESSAGES_KEY,
INVOKE_AGENT_OPERATION_NAME,
)
from microsoft.opentelemetry.a365.core.exporters.enriched_span import EnrichedReadableSpan
from opentelemetry.sdk.trace import ReadableSpan


def enrich_langchain_span(span: ReadableSpan) -> ReadableSpan:
"""Enricher for LangChain spans exported through the A365 pipeline.

For invoke_agent spans, converts OTel-spec structured messages
(``[{"role":"user","parts":[...]}]``) to plain content arrays
(``["Hello"]``) that the A365 backend expects.
"""
if not span.name.startswith(INVOKE_AGENT_OPERATION_NAME):
return span

attributes = span.attributes or {}
extra_attributes: dict[str, str] = {}

input_messages = attributes.get(GEN_AI_INPUT_MESSAGES_KEY)
if input_messages:
extra_attributes[GEN_AI_INPUT_MESSAGES_KEY] = extract_input_content(str(input_messages))

output_messages = attributes.get(GEN_AI_OUTPUT_MESSAGES_KEY)
if output_messages:
extra_attributes[GEN_AI_OUTPUT_MESSAGES_KEY] = extract_output_content(str(output_messages))

if extra_attributes:
return EnrichedReadableSpan(span, extra_attributes)

return span
Loading
Loading