Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
217 changes: 189 additions & 28 deletions sentry_sdk/integrations/anthropic.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
Omit = None

from anthropic.resources import AsyncMessages, Messages
from anthropic.lib.streaming._messages import MessageStreamManager

from anthropic.types import (
MessageStartEvent,
Expand All @@ -59,7 +60,14 @@
from sentry_sdk._types import TextPart

from anthropic import AsyncStream
from anthropic.types import RawMessageStreamEvent
from anthropic.types import (
RawMessageStreamEvent,
MessageParam,
ModelParam,
TextBlockParam,
ToolUnionParam,
MessageStream,
)


class _RecordedUsage:
Expand All @@ -84,6 +92,11 @@ def setup_once() -> None:
Messages.create = _wrap_message_create(Messages.create)
AsyncMessages.create = _wrap_message_create_async(AsyncMessages.create)

Messages.stream = _wrap_message_stream(Messages.stream)
MessageStreamManager.__enter__ = _wrap_message_stream_manager_enter(
MessageStreamManager.__enter__
)


def _capture_exception(exc: "Any") -> None:
set_span_errored()
Expand Down Expand Up @@ -253,27 +266,32 @@ def _transform_system_instructions(
]


def _set_input_data(
span: "Span", kwargs: "dict[str, Any]", integration: "AnthropicIntegration"
def _common_set_input_data(
span: "Span",
integration: "AnthropicIntegration",
max_tokens: "int",
messages: "Iterable[MessageParam]",
model: "ModelParam",
system: "Optional[Union[str, Iterable[TextBlockParam]]]",
temperature: "Optional[float]",
top_k: "Optional[int]",
top_p: "Optional[float]",
tools: "Optional[Iterable[ToolUnionParam]]",
) -> None:
"""
Set input data for the span based on the provided keyword arguments for the anthropic message creation.
"""
span.set_data(SPANDATA.GEN_AI_OPERATION_NAME, "chat")
system_instructions: "Union[str, Iterable[TextBlockParam]]" = kwargs.get("system") # type: ignore
messages = kwargs.get("messages")
if (
messages is not None
and len(messages) > 0
and len(messages) > 0 # type: ignore
and should_send_default_pii()
and integration.include_prompts
):
if isinstance(system_instructions, str) or isinstance(
system_instructions, Iterable
):
if isinstance(system, str) or isinstance(system, Iterable):
span.set_data(
SPANDATA.GEN_AI_SYSTEM_INSTRUCTIONS,
json.dumps(_transform_system_instructions(system_instructions)),
json.dumps(_transform_system_instructions(system)),
)

normalized_messages = []
Expand Down Expand Up @@ -329,25 +347,67 @@ def _set_input_data(
span, SPANDATA.GEN_AI_REQUEST_MESSAGES, messages_data, unpack=False
)

if max_tokens is not None and _is_given(max_tokens):
span.set_data(SPANDATA.GEN_AI_REQUEST_MAX_TOKENS, max_tokens)
if model is not None and _is_given(model):
span.set_data(SPANDATA.GEN_AI_REQUEST_MODEL, model)
if temperature is not None and _is_given(temperature):
span.set_data(SPANDATA.GEN_AI_REQUEST_TEMPERATURE, temperature)
if top_k is not None and _is_given(top_k):
span.set_data(SPANDATA.GEN_AI_REQUEST_TOP_K, top_k)
if top_p is not None and _is_given(top_p):
span.set_data(SPANDATA.GEN_AI_REQUEST_TOP_P, top_p)

if tools is not None and _is_given(tools) and len(tools) > 0: # type: ignore
span.set_data(SPANDATA.GEN_AI_REQUEST_AVAILABLE_TOOLS, safe_serialize(tools))


def _set_create_input_data(
span: "Span", kwargs: "dict[str, Any]", integration: "AnthropicIntegration"
) -> None:
"""
Set input data for the span based on the provided keyword arguments for the anthropic message creation.
"""
span.set_data(SPANDATA.GEN_AI_RESPONSE_STREAMING, kwargs.get("stream", False))

kwargs_keys_to_attributes = {
"max_tokens": SPANDATA.GEN_AI_REQUEST_MAX_TOKENS,
"model": SPANDATA.GEN_AI_REQUEST_MODEL,
"temperature": SPANDATA.GEN_AI_REQUEST_TEMPERATURE,
"top_k": SPANDATA.GEN_AI_REQUEST_TOP_K,
"top_p": SPANDATA.GEN_AI_REQUEST_TOP_P,
}
for key, attribute in kwargs_keys_to_attributes.items():
value = kwargs.get(key)

if value is not None and _is_given(value):
span.set_data(attribute, value)

# Input attributes: Tools
tools = kwargs.get("tools")
if tools is not None and _is_given(tools) and len(tools) > 0:
span.set_data(SPANDATA.GEN_AI_REQUEST_AVAILABLE_TOOLS, safe_serialize(tools))
_common_set_input_data(
span=span,
integration=integration,
max_tokens=kwargs.get("max_tokens"), # type: ignore
messages=kwargs.get("messages"), # type: ignore
model=kwargs.get("model"),
system=kwargs.get("system"),
temperature=kwargs.get("temperature"),
top_k=kwargs.get("top_k"),
top_p=kwargs.get("top_p"),
tools=kwargs.get("tools"),
)


def _set_stream_input_data(
span: "Span",
integration: "AnthropicIntegration",
max_tokens: "int",
messages: "Iterable[MessageParam]",
model: "ModelParam",
system: "Optional[Union[str, Iterable[TextBlockParam]]]",
temperature: "Optional[float]",
top_k: "Optional[int]",
top_p: "Optional[float]",
tools: "Optional[Iterable[ToolUnionParam]]",
) -> None:
_common_set_input_data(
span=span,
integration=integration,
max_tokens=max_tokens,
messages=messages,
model=model,
system=system,
temperature=temperature,
top_k=top_k,
top_p=top_p,
tools=tools,
)


def _set_output_data(
Expand Down Expand Up @@ -543,7 +603,7 @@ def _sentry_patched_create_common(f: "Any", *args: "Any", **kwargs: "Any") -> "A
)
span.__enter__()

_set_input_data(span, kwargs, integration)
_set_create_input_data(span, kwargs, integration)

result = yield f, args, kwargs

Expand Down Expand Up @@ -664,6 +724,107 @@ async def _sentry_patched_create_async(*args: "Any", **kwargs: "Any") -> "Any":
return _sentry_patched_create_async


def _sentry_patched_stream_common(
stream: "MessageStream",
max_tokens: "int",
messages: "Iterable[MessageParam]",
model: "ModelParam",
system: "Union[str, Iterable[TextBlockParam]]",
temperature: "float",
top_k: "int",
top_p: "float",
tools: "Iterable[ToolUnionParam]",
) -> None:
integration = sentry_sdk.get_client().get_integration(AnthropicIntegration)

if integration is None:
return stream

if messages is None:
return stream

try:
iter(messages)
except TypeError:
return stream

if model is None:
model = ""

span = get_start_span_function()(
op=OP.GEN_AI_CHAT,
name=f"chat {model}".strip(),
origin=AnthropicIntegration.origin,
)
span.__enter__()

span.set_data(SPANDATA.GEN_AI_RESPONSE_STREAMING, True)
_set_stream_input_data(
span,
integration,
max_tokens=max_tokens,
messages=messages,
model=model,
system=system,
temperature=temperature,
top_k=top_k,
top_p=top_p,
tools=tools,
)
_patch_streaming_response_iterator(stream, span, integration)


def _wrap_message_stream(f: "Any") -> "Any":
"""
Attaches user-provided arguments to the returned context manager.
The attributes are set on `gen_ai.chat` spans in the patch for the context manager.
"""

@wraps(f)
def _sentry_patched_stream(*args: "Any", **kwargs: "Any") -> "MessageStreamManager":
stream_manager = f(*args, **kwargs)

stream_manager._max_tokens = kwargs.get("max_tokens")
stream_manager._messages = kwargs.get("messages")
stream_manager._model = kwargs.get("model")
stream_manager._system = kwargs.get("system")
stream_manager._temperature = kwargs.get("temperature")
stream_manager._top_k = kwargs.get("top_k")
stream_manager._top_p = kwargs.get("top_p")
stream_manager._tools = kwargs.get("tools")

return stream_manager

return _sentry_patched_stream


def _wrap_message_stream_manager_enter(f: "Any") -> "Any":
"""
Creates and manages `gen_ai.chat` spans.
"""

@wraps(f)
def _sentry_patched_enter(self: "MessageStreamManager") -> "MessageStream":
stream = f(self)
if not hasattr(self, "_max_tokens"):
return stream

_sentry_patched_stream_common(
stream=stream,
max_tokens=self._max_tokens,
messages=self._messages,
model=self._model,
system=self._system,
temperature=self._temperature,
top_k=self._top_k,
top_p=self._top_p,
tools=self._tools,
)
return stream

return _sentry_patched_enter


def _is_given(obj: "Any") -> bool:
"""
Check for givenness safely across different anthropic versions.
Expand Down
Loading
Loading