diff --git a/src/openai/__init__.py b/src/openai/__init__.py index cbaef0615f..5fa5325479 100644 --- a/src/openai/__init__.py +++ b/src/openai/__init__.py @@ -35,6 +35,7 @@ InvalidWebhookSignatureError, ContentFilterFinishReasonError, WebSocketConnectionClosedError, + IncompleteResponseError, ) from ._base_client import DefaultHttpxClient, DefaultAioHttpClient, DefaultAsyncHttpxClient from ._utils._logs import setup_logging as _setup_logging @@ -71,6 +72,7 @@ "LengthFinishReasonError", "ContentFilterFinishReasonError", "InvalidWebhookSignatureError", + "IncompleteResponseError", "Timeout", "RequestOptions", "Client", diff --git a/src/openai/_base_client.py b/src/openai/_base_client.py index 216b36aabd..f19f04f68e 100644 --- a/src/openai/_base_client.py +++ b/src/openai/_base_client.py @@ -9,6 +9,7 @@ import inspect import logging import platform +import socket import warnings import email.utils from types import TracebackType @@ -831,11 +832,25 @@ def _idempotency_key(self) -> str: return f"stainless-python-retry-{uuid.uuid4()}" +def _get_default_socket_options() -> list[tuple[int, int, int]]: + """Return socket options to enable TCP keepalive for NAT/gateway environments.""" + # TCP keepalive: enables keep-alive packets to detect dead connections + # Needed for NAT gateways that drop idle connections after their timeout + if sys.platform == "darwin": + # macOS: TCP_KEEPALIVE = 16 + return [(socket.IPPROTO_TCP, socket.TCP_KEEPALIVE, 1)] + # Linux/others: TCP_KEEPIDLE (start keepalive after 60s idle) + return [(socket.IPPROTO_TCP, getattr(socket, "TCP_KEEPIDLE", 4), 60)] + + class _DefaultHttpxClient(httpx.Client): def __init__(self, **kwargs: Any) -> None: kwargs.setdefault("timeout", DEFAULT_TIMEOUT) kwargs.setdefault("limits", DEFAULT_CONNECTION_LIMITS) kwargs.setdefault("follow_redirects", True) + # Enable TCP keepalive to prevent non-streaming calls from hanging + # behind NAT gateways that drop idle connections + kwargs.setdefault("socket_options", _get_default_socket_options()) super().__init__(**kwargs) @@ -1423,6 +1438,7 @@ def __init__(self, **kwargs: Any) -> None: kwargs.setdefault("timeout", DEFAULT_TIMEOUT) kwargs.setdefault("limits", DEFAULT_CONNECTION_LIMITS) kwargs.setdefault("follow_redirects", True) + kwargs.setdefault("socket_options", _get_default_socket_options()) super().__init__(**kwargs) diff --git a/src/openai/_exceptions.py b/src/openai/_exceptions.py index 86f44b0e15..611a790b07 100644 --- a/src/openai/_exceptions.py +++ b/src/openai/_exceptions.py @@ -30,6 +30,7 @@ "SubjectTokenProviderError", "WebSocketConnectionClosedError", "WebSocketQueueFullError", + "IncompleteResponseError", ] @@ -205,3 +206,26 @@ class WebSocketQueueFullError(OpenAIError): """Raised when the outgoing WebSocket message queue exceeds its byte-size limit.""" pass + + +class IncompleteResponseError(OpenAIError): + """Raised when a streaming response ends with incomplete status. + + This typically occurs when the response is truncated due to max_output_tokens + or content_filter restrictions. + """ + + response_id: str + incomplete_details_reason: Optional[Literal["max_output_tokens", "content_filter"]] + + def __init__( + self, + *, + response_id: str, + incomplete_details_reason: Optional[Literal["max_output_tokens", "content_filter"]], + ) -> None: + reason_str = incomplete_details_reason or "unknown" + message = f"Response {response_id} is incomplete: {reason_str}" + super().__init__(message) + self.response_id = response_id + self.incomplete_details_reason = incomplete_details_reason diff --git a/src/openai/lib/streaming/responses/_responses.py b/src/openai/lib/streaming/responses/_responses.py index 6975a9260d..22e48ad3d9 100644 --- a/src/openai/lib/streaming/responses/_responses.py +++ b/src/openai/lib/streaming/responses/_responses.py @@ -25,6 +25,7 @@ ParsedResponseOutputMessage, ParsedResponseFunctionToolCall, ) +from ...._exceptions import IncompleteResponseError class ResponseStream(Generic[TextFormatT]): @@ -276,6 +277,8 @@ def handle_event(self, event: RawResponseStreamEvent) -> List[ResponseStreamEven content = output.content[event.content_index] assert content.type == "output_text" + # Don't parse here - defer parsing until response.completed or response.incomplete + # is received, so we can properly handle incomplete responses events.append( build( ResponseTextDoneEvent[TextFormatT], @@ -286,7 +289,7 @@ def handle_event(self, event: RawResponseStreamEvent) -> List[ResponseStreamEven logprobs=event.logprobs, type="response.output_text.done", text=event.text, - parsed=parse_text(event.text, text_format=self._text_format), + parsed=None, # type: ignore[arg-type] ) ) elif event.type == "response.function_call_arguments.delta": @@ -317,6 +320,13 @@ def handle_event(self, event: RawResponseStreamEvent) -> List[ResponseStreamEven response=response, ) ) + elif event.type == "response.incomplete": + # Raise an error for incomplete responses instead of letting + # Pydantic JSON validation errors bubble up later + raise IncompleteResponseError( + response_id=event.response.id, + incomplete_details_reason=event.response.incomplete_details.reason if event.response.incomplete_details else None, + ) else: events.append(event)