Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions python/lib/sift_client/_internal/grpc/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
"""
This module is primarily concerned with configuring and initializing gRPC connections to the Sift API.

Example of establishing a connection to Sift's gRPC APi:

```python
from sift_client._internal.grpc.transport import SiftChannelConfig, use_sift_channel

# Be sure not to include the url scheme i.e. 'https://' in the uri.
sift_channel_config = SiftChannelConfig(uri=SIFT_BASE_URI, apikey=SIFT_API_KEY)

with use_sift_channel(sift_channel_config) as channel:
# Connect to Sift
```
"""
Empty file.
74 changes: 74 additions & 0 deletions python/lib/sift_client/_internal/grpc/_async_interceptors/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
from __future__ import annotations

from abc import abstractmethod
from typing import Any, AsyncIterable, Callable, Iterable, TypeVar

from grpc import aio as grpc_aio

CallType = TypeVar("CallType", bound=grpc_aio.Call)
Continuation = Callable[[grpc_aio.ClientCallDetails, Any], CallType]


class ClientAsyncInterceptor(
grpc_aio.UnaryUnaryClientInterceptor,
grpc_aio.UnaryStreamClientInterceptor,
grpc_aio.StreamUnaryClientInterceptor,
grpc_aio.StreamStreamClientInterceptor,
):
@abstractmethod
async def intercept(
self,
method: Callable,
request_or_iterator: Any,
client_call_details: grpc_aio.ClientCallDetails,
) -> Any:
pass

async def intercept_unary_unary(
self,
continuation: Continuation[grpc_aio.UnaryUnaryCall],
client_call_details: grpc_aio.ClientCallDetails,
request: Any,
):
return await self.intercept(_async_swap_args(continuation), request, client_call_details)

async def intercept_unary_stream(
self,
continuation: Continuation[grpc_aio.UnaryStreamCall],
client_call_details: grpc_aio.ClientCallDetails,
request: Any,
):
return await self.intercept(_async_swap_args(continuation), request, client_call_details)

async def intercept_stream_unary(
self,
continuation: Continuation[grpc_aio.StreamUnaryCall],
client_call_details: grpc_aio.ClientCallDetails,
request_iterator: Iterable[Any] | AsyncIterable[Any],
):
return await self.intercept(
_async_swap_args(continuation), request_iterator, client_call_details
)

async def intercept_stream_stream(
self,
continuation: Continuation[grpc_aio.StreamStreamCall],
client_call_details: grpc_aio.ClientCallDetails,
request_iterator: Iterable[Any] | AsyncIterable[Any],
):
return await self.intercept(
_async_swap_args(continuation), request_iterator, client_call_details
)


def _async_swap_args(fn: Callable[[Any, Any], Any]) -> Callable[[Any, Any], Any]:
"""
Continuations are typed in such a way that details are the first argument, and the request second.
Code generated from protobuf however takes in the request first, then the details. Weird grpc library
quirk. This utility just flips the arguments.
"""

async def new_fn(x, y):
return await fn(y, x)

return new_fn
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
from __future__ import annotations

from typing import Any, Callable, List, Tuple, cast

from grpc import aio as grpc_aio

from sift_client._internal.grpc._async_interceptors.base import ClientAsyncInterceptor

Metadata = List[Tuple[str, str]]


class MetadataAsyncInterceptor(ClientAsyncInterceptor):
metadata: Metadata

"""
Interceptor to add metadata to all async unary and streaming RPCs
"""

def __init__(self, metadata: Metadata):
self.metadata = metadata

async def intercept(
self,
method: Callable,
request_or_iterator: Any,
client_call_details: grpc_aio.ClientCallDetails,
):
call_details = cast("grpc_aio.ClientCallDetails", client_call_details)
new_details = grpc_aio.ClientCallDetails(
call_details.method,
call_details.timeout,
self.metadata,
call_details.credentials,
call_details.wait_for_ready,
)
return await method(request_or_iterator, new_details)
Empty file.
61 changes: 61 additions & 0 deletions python/lib/sift_client/_internal/grpc/_interceptors/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
from abc import abstractmethod
from typing import Any, Callable, Iterator

import grpc

Continuation = Callable[[grpc.ClientCallDetails, Any], Any]


class ClientInterceptor(
grpc.StreamStreamClientInterceptor,
grpc.StreamUnaryClientInterceptor,
grpc.UnaryStreamClientInterceptor,
grpc.UnaryUnaryClientInterceptor,
):
@abstractmethod
def intercept(
self,
method: Continuation,
request_or_iterator: Any,
client_call_details: grpc.ClientCallDetails,
):
pass

def intercept_unary_unary(
self,
continuation: Continuation,
client_call_details: grpc.ClientCallDetails,
request: Any,
):
return self.intercept(_swap_args(continuation), request, client_call_details)

def intercept_stream_unary(
self,
continuation: Continuation,
client_call_details: grpc.ClientCallDetails,
request_iterator: Iterator[Any],
):
return self.intercept(_swap_args(continuation), request_iterator, client_call_details)

def intercept_unary_stream(
self,
continuation: Continuation,
client_call_details: grpc.ClientCallDetails,
request: Any,
):
return self.intercept(_swap_args(continuation), request, client_call_details)

def intercept_stream_stream(
self,
continuation: Continuation,
client_call_details: grpc.ClientCallDetails,
request_iterator: Iterator[Any],
):
return self.intercept(_swap_args(continuation), request_iterator, client_call_details)


def _swap_args(fn: Callable[[Any, Any], Any]) -> Callable[[Any, Any], Any]:
def new_fn(x, y):
return fn(y, x)

return new_fn
27 changes: 27 additions & 0 deletions python/lib/sift_client/_internal/grpc/_interceptors/context.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
from __future__ import annotations

from typing import Sequence

import grpc


class ClientCallDetails(grpc.ClientCallDetails):
method: str
timeout: float | None
metadata: Sequence[tuple[str, str | bytes]] | None
credentials: grpc.CallCredentials | None
wait_for_ready: bool | None

def __init__(
self,
method: str,
timeout: float | None,
metadata: Sequence[tuple[str, str | bytes]] | None,
credentials: grpc.CallCredentials | None,
wait_for_ready: bool | None,
):
self.method = method
self.timeout = timeout
self.metadata = metadata
self.credentials = credentials
self.wait_for_ready = wait_for_ready
33 changes: 33 additions & 0 deletions python/lib/sift_client/_internal/grpc/_interceptors/metadata.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
from typing import Any, List, Tuple, cast

import grpc

from sift_client._internal.grpc._interceptors.base import ClientInterceptor, Continuation
from sift_client._internal.grpc._interceptors.context import ClientCallDetails

Metadata = List[Tuple[str, str]]


class MetadataInterceptor(ClientInterceptor):
metadata: Metadata

def __init__(self, metadata: Metadata):
self.metadata = metadata

def intercept(
self,
method: Continuation,
request_or_iterator: Any,
client_call_details: grpc.ClientCallDetails,
):
details = cast("ClientCallDetails", client_call_details)

new_details = ClientCallDetails(
method=details.method,
timeout=details.timeout,
credentials=details.credentials,
wait_for_ready=details.wait_for_ready,
metadata=self.metadata,
)

return method(request_or_iterator, new_details)
71 changes: 71 additions & 0 deletions python/lib/sift_client/_internal/grpc/_retry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
from __future__ import annotations

import json
from typing import ClassVar, TypedDict

from grpc import StatusCode
from typing_extensions import Self


class RetryPolicy:
"""
Retry policy meant to be used for `sift_py.grpc.transport.SiftChannel`. Users may have the ability to configure their own
custom retry policy in the future, but for now this is primarily intended for internal use.
- [Retry policy schema](https://github.com/grpc/grpc-proto/blob/ec30f589e2519d595688b9a42f88a91bdd6b733f/grpc/service_config/service_config.proto#L136)
- [Enable gRPC retry option](https://github.com/grpc/grpc/blob/9a5fdfc3d3a7fc575a394360be4532ee09a85620/include/grpc/impl/channel_arg_names.h#L311)
- [Service config option](https://github.com/grpc/grpc/blob/9a5fdfc3d3a7fc575a394360be4532ee09a85620/include/grpc/impl/channel_arg_names.h#L207)
"""

config: RetryConfig

DEFAULT_POLICY: ClassVar[RetryConfig] = {
"methodConfig": [
{
# We can configure this on a per-service and RPC basis but for now we'll
# apply this across all services and RPCs.
"name": [{}],
"retryPolicy": {
# gRPC does not allow more than 5 attempts
"maxAttempts": 5,
"initialBackoff": "0.05s",
"maxBackoff": "5s",
"backoffMultiplier": 4,
"retryableStatusCodes": [
StatusCode.INTERNAL.name,
StatusCode.UNKNOWN.name,
StatusCode.UNAVAILABLE.name,
StatusCode.ABORTED.name,
StatusCode.DEADLINE_EXCEEDED.name,
],
},
}
]
}

def __init__(self, config: RetryConfig):
self.config = config

def as_json(self) -> str:
return json.dumps(self.config)

@classmethod
def default(cls) -> Self:
return cls(config=cls.DEFAULT_POLICY)


class RetryConfig(TypedDict):
methodConfig: list[MethodConfigDict]


class MethodConfigDict(TypedDict):
name: list[dict[str, str]]
retryPolicy: RetryConfigDict


class RetryConfigDict(TypedDict):
maxAttempts: int
initialBackoff: str
maxBackoff: str
backoffMultiplier: int
retryableStatusCodes: list[str]
34 changes: 34 additions & 0 deletions python/lib/sift_client/_internal/grpc/keepalive.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
from typing import TypedDict

DEFAULT_KEEPALIVE_TIME_MS = 20_000
"""Interval with which to send keepalive pings"""

DEFAULT_KEEPALIVE_TIMEOUT_MS = 20_000
"""Timeout while waiting for server to acknowledge keepalive ping"""

DEFAULT_KEEPALIVE_PERMIT_WITHOUT_CALLS = 1
"""Allows connection without any active RPCs"""

DEFAULT_MAX_PINGS_WITHOUT_DATA = 0
"""Disabled"""


# https://github.com/grpc/grpc/blob/master/doc/keepalive.md
class KeepaliveConfig(TypedDict):
"""
Make make this public in the future to allow folks to configure their own keepalive settings
if there is demand for it.
"""

keepalive_time_ms: int
keepalive_timeout_ms: int
keepalive_permit_without_calls: int
max_pings_without_data: int


DEFAULT_KEEPALIVE_CONFIG: KeepaliveConfig = {
"keepalive_time_ms": DEFAULT_KEEPALIVE_TIME_MS,
"keepalive_timeout_ms": DEFAULT_KEEPALIVE_TIMEOUT_MS,
"keepalive_permit_without_calls": DEFAULT_KEEPALIVE_PERMIT_WITHOUT_CALLS,
"max_pings_without_data": DEFAULT_MAX_PINGS_WITHOUT_DATA,
}
Empty file.
Loading
Loading