def enrich_langchain_span(span: ReadableSpan) -> ReadableSpan:
    """Add A365-specific attribute overrides to a LangChain span.

    The span's existing attributes are inspected and a set of overrides is
    collected:

    - ``GEN_AI_CONVERSATION_ID_KEY`` is copied to ``SESSION_ID_KEY`` when the
      conversation id is truthy and the span does not already carry a
      ``SESSION_ID_KEY`` attribute.
    - When the span name starts with ``INVOKE_AGENT_OPERATION_NAME``, truthy
      input/output message attributes are re-stored under the same keys as
      ``str(value)``.
    - Otherwise, when the span name starts with
      ``EXECUTE_TOOL_OPERATION_NAME``, tool argument / tool result attributes
      that are present (even if falsy) are re-stored under the same keys as
      ``str(value)``.

    Args:
        span: The finished span to inspect. ``span.attributes`` may be
            ``None``; it is then treated as empty.

    Returns:
        An ``EnrichedReadableSpan`` built from ``span`` and the collected
        overrides, or ``span`` itself when no override was collected.
    """
    source_attrs = span.attributes or {}
    overrides = {}

    # gen_ai.conversation.id -> session id, never clobbering an explicit one.
    conversation = source_attrs.get(GEN_AI_CONVERSATION_ID_KEY)
    if conversation and SESSION_ID_KEY not in source_attrs:
        overrides[SESSION_ID_KEY] = str(conversation)

    # NOTE(review): both branches below write back to the key they read from,
    # so for values that are already ``str`` the override equals the original
    # value -- confirm whether a real transformation was intended here.
    span_name = span.name
    if span_name.startswith(INVOKE_AGENT_OPERATION_NAME):
        # Message attributes are only forwarded when truthy.
        for message_key in (GEN_AI_INPUT_MESSAGES_KEY, GEN_AI_OUTPUT_MESSAGES_KEY):
            payload = source_attrs.get(message_key)
            if payload:
                overrides[message_key] = str(payload)
    elif span_name.startswith(EXECUTE_TOOL_OPERATION_NAME):
        # Tool attributes are forwarded whenever the key exists, so falsy
        # values such as an empty result string are kept.
        for tool_key in (GEN_AI_TOOL_ARGS_KEY, GEN_AI_TOOL_CALL_RESULT_KEY):
            if tool_key in source_attrs:
                overrides[tool_key] = str(source_attrs[tool_key])

    if not overrides:
        return span
    return EnrichedReadableSpan(span, overrides)
unregister_span_enricher, +) +from microsoft.opentelemetry._genai._langchain._span_enricher import enrich_langchain_span + logger = logging.getLogger(__name__) _INSTRUMENTS: str = "langchain-core >= 0.2.0" @@ -51,6 +57,7 @@ def __init__(self) -> None: super().__init__() self._tracer: LangChainTracer | None = None self._original_cb_init: Callable[..., None] | None = None + self._owns_enricher: bool = False # ---- BaseInstrumentor API ------------------------------------------------- @@ -59,9 +66,7 @@ def instrumentation_dependencies(self) -> Collection[str]: def _instrument(self, **kwargs: Any) -> None: if not langchain_available: - logger.debug( - "Skipping LangChain instrumentation: langchain-core is not available." - ) + logger.debug("Skipping LangChain instrumentation: langchain-core is not available.") return tracer_provider = kwargs.get("tracer_provider") tracer = trace_api.get_tracer( @@ -98,9 +103,19 @@ def _instrument(self, **kwargs: Any) -> None: wrapper=_BaseCallbackManagerInit(self._tracer), ) + # Register the A365 span enricher for LangChain. + try: + register_span_enricher(enrich_langchain_span) + self._owns_enricher = True + except RuntimeError: + logger.debug("A span enricher is already registered. 
Skipping LangChain enricher registration.") + def _uninstrument(self, **kwargs: Any) -> None: if not langchain_available: return + if self._owns_enricher: + unregister_span_enricher() + self._owns_enricher = False if self._original_cb_init is not None: langchain_core.callbacks.BaseCallbackManager.__init__ = self._original_cb_init # type: ignore[assignment] self._original_cb_init = None diff --git a/src/microsoft/opentelemetry/_genai/_langchain/_utils.py b/src/microsoft/opentelemetry/_genai/_langchain/_utils.py index 7a48c793..e96180cb 100644 --- a/src/microsoft/opentelemetry/_genai/_langchain/_utils.py +++ b/src/microsoft/opentelemetry/_genai/_langchain/_utils.py @@ -218,8 +218,6 @@ def __delitem__(self, key: K) -> None: GEN_AI_TOOL_DEFINITIONS_KEY = GEN_AI_TOOL_DEFINITIONS GEN_AI_AGENT_VERSION_KEY = GEN_AI_AGENT_VERSION -SESSION_ID_KEY = "microsoft.session.id" - # ---- Internal helpers -------------------------------------------------------- @@ -291,7 +289,7 @@ def metadata(run: Run) -> Iterator[tuple[str, str]]: if session_id := ( meta.get(LANGCHAIN_SESSION_ID) or meta.get(LANGCHAIN_CONVERSATION_ID) or meta.get(LANGCHAIN_THREAD_ID) ): - yield SESSION_ID_KEY, session_id + yield GEN_AI_CONVERSATION_ID_KEY, session_id @stop_on_exception @@ -921,7 +919,7 @@ def extract_agent_metadata(run: Run) -> Iterator[tuple[str, str]]: @stop_on_exception def extract_session_info(run: Run) -> Iterator[tuple[str, str]]: - """Extract session_id and conversation_id from run metadata.""" + """Extract conversation ID from run metadata as gen_ai.conversation.id.""" if not run.extra or not isinstance(run.extra, dict): return meta = run.extra.get("metadata") @@ -929,7 +927,5 @@ def extract_session_info(run: Run) -> Iterator[tuple[str, str]]: return for key in (LANGCHAIN_SESSION_ID, LANGCHAIN_CONVERSATION_ID, LANGCHAIN_THREAD_ID): if sid := meta.get(key): - yield SESSION_ID_KEY, sid + yield GEN_AI_CONVERSATION_ID_KEY, sid break - if conv_id := meta.get(LANGCHAIN_CONVERSATION_ID): - yield 
class TestLangChainSpanEnricher(unittest.TestCase):
    """Unit tests covering ``enrich_langchain_span``."""

    @staticmethod
    def _fake_span(name, attributes):
        """Build a ``Mock`` span exposing only ``name`` and ``attributes``."""
        fake = Mock()
        # ``name`` has to be assigned after construction: ``Mock(name=...)``
        # names the mock itself instead of setting a ``name`` attribute.
        fake.name = name
        fake.attributes = attributes
        return fake

    def test_maps_conversation_id_to_session_id(self):
        """The conversation id is surfaced under the session id key."""
        span = self._fake_span("chat gpt-4o", {GEN_AI_CONVERSATION_ID_KEY: "conv-123"})

        enriched = enrich_langchain_span(span)

        assert enriched.attributes is not None
        self.assertEqual(enriched.attributes[SESSION_ID_KEY], "conv-123")

    def test_does_not_overwrite_existing_session_id(self):
        """An existing session id wins; the span comes back untouched."""
        span = self._fake_span(
            "chat gpt-4o",
            {
                GEN_AI_CONVERSATION_ID_KEY: "conv-123",
                SESSION_ID_KEY: "existing-session",
            },
        )

        enriched = enrich_langchain_span(span)

        # Nothing to add, so the very same object must be returned.
        self.assertIs(enriched, span)

    def test_invoke_agent_span_enrichment(self):
        """invoke_agent spans keep their input and output messages."""
        span = self._fake_span(
            "invoke_agent Travel_Assistant",
            {
                GEN_AI_INPUT_MESSAGES_KEY: '["Where should I go?"]',
                GEN_AI_OUTPUT_MESSAGES_KEY: '["Try Barcelona!"]',
            },
        )

        enriched = enrich_langchain_span(span)

        assert enriched.attributes is not None
        self.assertEqual(enriched.attributes[GEN_AI_INPUT_MESSAGES_KEY], '["Where should I go?"]')
        self.assertEqual(enriched.attributes[GEN_AI_OUTPUT_MESSAGES_KEY], '["Try Barcelona!"]')

    def test_invoke_agent_with_conversation_id(self):
        """invoke_agent spans receive the session id and the messages."""
        span = self._fake_span(
            "invoke_agent MyAgent",
            {
                GEN_AI_CONVERSATION_ID_KEY: "conv-456",
                GEN_AI_INPUT_MESSAGES_KEY: '["Hello"]',
            },
        )

        enriched = enrich_langchain_span(span)

        assert enriched.attributes is not None
        self.assertEqual(enriched.attributes[SESSION_ID_KEY], "conv-456")
        self.assertEqual(enriched.attributes[GEN_AI_INPUT_MESSAGES_KEY], '["Hello"]')

    def test_execute_tool_span_enrichment(self):
        """execute_tool spans keep their tool arguments and tool result."""
        span = self._fake_span(
            "execute_tool get_weather",
            {
                GEN_AI_TOOL_ARGS_KEY: '{"city": "Barcelona"}',
                GEN_AI_TOOL_CALL_RESULT_KEY: "Sunny, 25C",
            },
        )

        enriched = enrich_langchain_span(span)

        assert enriched.attributes is not None
        self.assertEqual(enriched.attributes[GEN_AI_TOOL_ARGS_KEY], '{"city": "Barcelona"}')
        self.assertEqual(enriched.attributes[GEN_AI_TOOL_CALL_RESULT_KEY], "Sunny, 25C")

    def test_non_matching_span_returns_original(self):
        """An unrelated span without a conversation id is returned as-is."""
        span = self._fake_span("other_operation", {"key": "value"})

        self.assertIs(enrich_langchain_span(span), span)

    def test_none_attributes_returns_original(self):
        """A span whose attributes are ``None`` is returned as-is."""
        span = self._fake_span("invoke_agent Test", None)

        self.assertIs(enrich_langchain_span(span), span)

    def test_empty_attributes_returns_original(self):
        """A span with an empty attribute mapping is returned as-is."""
        span = self._fake_span("invoke_agent Test", {})

        self.assertIs(enrich_langchain_span(span), span)