Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions stubs/pika/@tests/stubtest_allowlist.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,14 @@
# Behind a TYPE_CHECKING guard at runtime.
pika.adapters.select_connection.SELECT_ERROR_T
pika.adapters.select_connection.POLLER_PARAMS

# The implementation has defaults for the arguments that would make the
# created instances unusable, so we require the arguments in the stub.
pika.spec.Queue.DeclareOk.__init__

# Type hackary that is unnecessary in the stubs.
pika.connection.ConnectionParameters.DefaultT
pika.connection.ConnectionParameters.T

# Arguments have a sentinel default, which is not reflected in the stubs.
pika.connection.ConnectionParameters.__init__
2 changes: 1 addition & 1 deletion stubs/pika/METADATA.toml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
version = "1.3.*"
version = "1.4.*"
upstream-repository = "https://github.com/pika/pika"
stub-distribution = "types-pika-ts" # https://github.com/python/typeshed/issues/9246
extra-description = """\
Expand Down
14 changes: 14 additions & 0 deletions stubs/pika/pika/__init__.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,17 @@ from pika.delivery_mode import DeliveryMode as DeliveryMode
from pika.spec import BasicProperties as BasicProperties

__version__: Final[str]

__all__ = [
"adapters",
"AMQPConnectionWorkflow",
"BaseConnection",
"BasicProperties",
"BlockingConnection",
"ConnectionParameters",
"DeliveryMode",
"PlainCredentials",
"SelectConnection",
"SSLOptions",
"URLParameters",
]
3 changes: 3 additions & 0 deletions stubs/pika/pika/adapters/__init__.pyi
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
from pika.adapters.asyncio_connection import AsyncioConnection as AsyncioConnection
from pika.adapters.base_connection import BaseConnection as BaseConnection
from pika.adapters.blocking_connection import BlockingConnection as BlockingConnection
from pika.adapters.select_connection import IOLoop as IOLoop, SelectConnection as SelectConnection

__all__ = ["AsyncioConnection", "BaseConnection", "BlockingConnection", "SelectConnection", "IOLoop"]
58 changes: 35 additions & 23 deletions stubs/pika/pika/adapters/asyncio_connection.pyi
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
from asyncio import AbstractEventLoop
from collections.abc import Callable
from _typeshed import Incomplete
from asyncio import AbstractEventLoop, Future, Handle
from collections.abc import Callable, Sequence
from logging import Logger
from typing_extensions import Self

from ..connection import Parameters
from ..connection import Connection, Parameters
from .base_connection import BaseConnection
from .utils import connection_workflow, io_services_utils, nbio_interface
from .utils import io_services_utils
from .utils.connection_workflow import AbstractAMQPConnectionWorkflow, AMQPConnectorException
from .utils.nbio_interface import AbstractFileDescriptorServices, AbstractIOReference, AbstractIOServices, AbstractTimerReference

LOGGER: Logger

Expand All @@ -22,35 +25,44 @@ class AsyncioConnection(BaseConnection):
@classmethod
def create_connection(
cls,
connection_configs,
on_done,
connection_configs: Sequence[Parameters],
on_done: Callable[[Connection | AMQPConnectorException], object],
custom_ioloop: AbstractEventLoop | None = None,
workflow: connection_workflow.AbstractAMQPConnectionWorkflow | None = None,
): ...
workflow: AbstractAMQPConnectionWorkflow | None = None,
) -> AbstractAMQPConnectionWorkflow: ...

class _AsyncioIOServicesAdapter(
io_services_utils.SocketConnectionMixin,
io_services_utils.StreamingConnectionMixin,
nbio_interface.AbstractIOServices,
nbio_interface.AbstractFileDescriptorServices,
AbstractIOServices,
AbstractFileDescriptorServices,
):
def __init__(self, loop: AbstractEventLoop | None = None) -> None: ...
def get_native_ioloop(self): ...
def get_native_ioloop(self) -> AbstractEventLoop: ...
def close(self) -> None: ...
def run(self) -> None: ...
def stop(self) -> None: ...
def add_callback_threadsafe(self, callback) -> None: ...
def call_later(self, delay, callback): ...
def getaddrinfo(self, host, port, on_done, family: int = 0, socktype: int = 0, proto: int = 0, flags: int = 0): ...
def set_reader(self, fd, on_readable) -> None: ...
def remove_reader(self, fd): ...
def set_writer(self, fd, on_writable) -> None: ...
def remove_writer(self, fd): ...
def add_callback_threadsafe(self, callback: Callable[[], object]) -> None: ...
def call_later(self, delay: float, callback: Callable[[], object]) -> _TimerHandle: ...
def getaddrinfo(
self,
host: str,
port: int,
on_done: Callable[..., object],
family: int = 0,
socktype: int = 0,
proto: int = 0,
flags: int = 0,
) -> AbstractIOReference: ...
def set_reader(self, fd: int, on_readable: Callable[[], object]) -> None: ...
def remove_reader(self, fd: int) -> bool: ...
def set_writer(self, fd: int, on_writable: Callable[[], object]) -> None: ...
def remove_writer(self, fd: int) -> bool: ...

class _TimerHandle(nbio_interface.AbstractTimerReference):
def __init__(self, handle) -> None: ...
class _TimerHandle(AbstractTimerReference):
def __init__(self, handle: Handle) -> None: ...
def cancel(self) -> None: ...

class _AsyncioIOReference(nbio_interface.AbstractIOReference):
def __init__(self, future, on_done) -> None: ...
def cancel(self): ...
class _AsyncioIOReference(AbstractIOReference):
def __init__(self, future: Future[Incomplete], on_done: Callable[[BaseConnection | BaseException], object]) -> None: ...
def cancel(self) -> bool: ...
12 changes: 6 additions & 6 deletions stubs/pika/pika/adapters/base_connection.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -4,28 +4,28 @@ from collections.abc import Callable
from logging import Logger
from typing_extensions import Self

from ..adapters.utils import nbio_interface
from ..connection import Connection
from ..adapters.utils.nbio_interface import AbstractIOServices, AbstractStreamProtocol
from ..connection import Connection, Parameters

LOGGER: Logger

class BaseConnection(Connection, metaclass=abc.ABCMeta):
def __init__(
self,
parameters,
parameters: Parameters | None,
on_open_callback: Callable[[Self], object] | None,
on_open_error_callback: Callable[[Self, BaseException], object] | None,
on_close_callback: Callable[[Self, BaseException], object] | None,
nbio,
internal_connection_workflow: bool,
nbio: AbstractIOServices,
internal_connection_workflow: bool = True,
) -> None: ...
@classmethod
@abc.abstractmethod
def create_connection(cls, connection_configs, on_done, custom_ioloop=None, workflow=None): ...
@property
def ioloop(self): ...

class _StreamingProtocolShim(nbio_interface.AbstractStreamProtocol):
class _StreamingProtocolShim(AbstractStreamProtocol):
connection_made: Incomplete
connection_lost: Incomplete
eof_received: Incomplete
Expand Down
84 changes: 56 additions & 28 deletions stubs/pika/pika/adapters/blocking_connection.pyi
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
from _typeshed import Incomplete, Unused
from collections.abc import Generator, Sequence
from collections.abc import Callable, Generator, Sequence
from logging import Logger
from types import TracebackType
from typing import NamedTuple
from typing import Any, NamedTuple, TypeVar
from typing_extensions import Self

from ..connection import Parameters
from ..data import _ArgumentMapping
from ..exchange_type import ExchangeType
from ..spec import BasicProperties
from ..frame import Method
from ..spec import Basic, BasicProperties, Connection, Exchange, Queue, Tx

T = TypeVar("T", bound=Connection.Blocked | Connection.Unblocked) # noqa: Y001

LOGGER: Logger

Expand Down Expand Up @@ -189,19 +191,33 @@ class BlockingChannel:
def add_on_cancel_callback(self, callback) -> None: ...
def add_on_return_callback(self, callback): ...
def basic_consume(
self, queue, on_message_callback, auto_ack: bool = False, exclusive: bool = False, consumer_tag=None, arguments=None
): ...
def basic_cancel(self, consumer_tag): ...
self,
queue: str,
on_message_callback: Callable[[BlockingChannel, Basic.Deliver, BasicProperties, bytes], object],
auto_ack: bool = False,
exclusive: bool = False,
consumer_tag: str | None = None,
arguments: dict[str, Any] | None = None,
) -> str: ...
def basic_cancel(self, consumer_tag: str) -> Sequence[tuple[Basic.Deliver, BasicProperties, bytes]]: ...
def start_consuming(self) -> None: ...
def stop_consuming(self, consumer_tag=None) -> None: ...
def stop_consuming(self, consumer_tag: str | None = None) -> None: ...
def consume(
self, queue, auto_ack: bool = False, exclusive: bool = False, arguments=None, inactivity_timeout=None
) -> Generator[Incomplete]: ...
def get_waiting_message_count(self): ...
def cancel(self): ...
self,
queue: str,
auto_ack: bool = False,
exclusive: bool = False,
arguments: dict[str, Any] | None = None,
inactivity_timeout: float | None = None,
consumer_tag: str | None = None,
) -> Generator[tuple[Basic.Deliver | None, BasicProperties | None, bytes | None]]: ...
def get_waiting_message_count(self) -> int: ...
def cancel(self) -> int: ...
def basic_ack(self, delivery_tag: int = 0, multiple: bool = False) -> None: ...
def basic_nack(self, delivery_tag: int = 0, multiple: bool = False, requeue: bool = True) -> None: ...
def basic_get(self, queue, auto_ack: bool = False): ...
def basic_get(
self, queue: str, auto_ack: bool = False
) -> tuple[Basic.GetOk | None, BasicProperties | None, bytes | None]: ...
def basic_publish(
self,
exchange: str,
Expand All @@ -222,24 +238,36 @@ class BlockingChannel:
durable: bool = False,
auto_delete: bool = False,
internal: bool = False,
arguments: _ArgumentMapping | None = None,
): ...
def exchange_delete(self, exchange: str | None = None, if_unused: bool = False): ...
def exchange_bind(self, destination, source, routing_key: str = "", arguments=None): ...
def exchange_unbind(self, destination=None, source=None, routing_key: str = "", arguments=None): ...
arguments: dict[str, Any] | None = None,
) -> None: ...
def exchange_delete(self, exchange: str | None = None, if_unused: bool = False) -> Method[Exchange.DeleteOk]: ...
def exchange_bind(
self, destination: str, source: str, routing_key: str = "", arguments: dict[str, Any] | None = None
) -> Method[Exchange.BindOk]: ...
def exchange_unbind(
self,
destination: str | None = None,
source: str | None = None,
routing_key: str = "",
arguments: dict[str, Any] | None = None,
) -> Method[Exchange.UnbindOk]: ...
def queue_declare(
self,
queue,
queue: str,
passive: bool = False,
durable: bool = False,
exclusive: bool = False,
auto_delete: bool = False,
arguments=None,
): ...
def queue_delete(self, queue, if_unused: bool = False, if_empty: bool = False): ...
def queue_purge(self, queue): ...
def queue_bind(self, queue, exchange, routing_key=None, arguments=None): ...
def queue_unbind(self, queue, exchange=None, routing_key=None, arguments=None): ...
def tx_select(self): ...
def tx_commit(self): ...
def tx_rollback(self): ...
arguments: dict[str, Any] | None = None,
) -> Method[Queue.DeclareOk]: ...
def queue_delete(self, queue: str, if_unused: bool = False, if_empty: bool = False) -> Method[Queue.DeleteOk]: ...
def queue_purge(self, queue: str) -> Method[Queue.PurgeOk]: ...
def queue_bind(
self, queue: str, exchange: str, routing_key: str | None = None, arguments: dict[str, Any] | None = None
) -> Method[Queue.BindOk]: ...
def queue_unbind(
self, queue: str, exchange: str, routing_key: str | None = None, arguments: dict[str, Any] | None = None
) -> Method[Queue.UnbindOk]: ...
def tx_select(self) -> Method[Tx.SelectOk]: ...
def tx_commit(self) -> Method[Tx.CommitOk]: ...
def tx_rollback(self) -> Method[Tx.RollbackOk]: ...
18 changes: 14 additions & 4 deletions stubs/pika/pika/adapters/select_connection.pyi
Original file line number Diff line number Diff line change
@@ -1,13 +1,22 @@
import abc
import select
from _typeshed import Incomplete
from collections.abc import Callable
from logging import Logger
from typing import Final, Literal, TypeAlias, TypedDict

import pika.compat
from pika.adapters.base_connection import BaseConnection
from pika.adapters.utils.selector_ioloop_adapter import AbstractSelectorIOLoop

SELECT_ERROR_T: TypeAlias = OSError | InterruptedError | select.error

class POLLER_PARAMS(TypedDict):
get_wait_seconds: Callable[[], float | None]
process_timeouts: Callable[[], object]

LOGGER: Logger
SELECT_TYPE: Incomplete
SELECT_TYPE: Literal["epoll", "kqueue", "poll"] | None

class SelectConnection(BaseConnection):
def __init__(
Expand Down Expand Up @@ -43,9 +52,10 @@ class _Timer:
def process_timeouts(self) -> None: ...

class PollEvents:
READ: Incomplete
WRITE: Incomplete
ERROR: Incomplete
READ: Final[int]
WRITE: Final[int]
ERROR: Final[int]
HANGUP: Final[int]

class IOLoop(AbstractSelectorIOLoop):
READ: Incomplete
Expand Down
31 changes: 18 additions & 13 deletions stubs/pika/pika/adapters/twisted_connection.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@
# We don't want to force it as a dependency but that means we also can't test it with type-checkers given the current setup.

from _typeshed import Incomplete
from collections.abc import Callable
from logging import Logger
from typing import Generic, NamedTuple, TypeVar
from typing import Any, Generic, NamedTuple, TypeVar

import pika.connection
from pika.adapters.utils import nbio_interface
from pika.adapters.utils.nbio_interface import AbstractTimerReference
from twisted.internet.base import DelayedCall # type: ignore[import-not-found] # pyright: ignore[reportMissingImports]
from twisted.internet.defer import ( # type: ignore[import-not-found] # pyright: ignore[reportMissingImports]
Deferred,
Expand All @@ -16,6 +16,8 @@ from twisted.internet.interfaces import ITransport # type: ignore[import-not-fo
from twisted.internet.protocol import Protocol # type: ignore[import-not-found] # pyright: ignore[reportMissingImports]
from twisted.python.failure import Failure # type: ignore[import-not-found] # pyright: ignore[reportMissingImports]

from ..connection import Connection, Parameters

_T = TypeVar("_T")

LOGGER: Logger
Expand Down Expand Up @@ -93,13 +95,9 @@ class TwistedChannel:
self, exchange: Incomplete | None = ..., if_unused: bool = ...
) -> Deferred[Incomplete | Failure | BaseException | None]: ...
def exchange_unbind(
self,
destination: Incomplete | None = ...,
source: Incomplete | None = ...,
routing_key: str = ...,
arguments: Incomplete | None = ...,
self, destination: str, source: str, routing_key: str = "", arguments: dict[str, Any] | None = None
) -> Deferred[Incomplete | Failure | BaseException | None]: ...
def flow(self, active) -> Deferred[Incomplete | Failure | BaseException | None]: ...
def flow(self, active: bool = True) -> Deferred[Incomplete | Failure | BaseException | None]: ...
def open(self): ...
def queue_bind(
self, queue, exchange, routing_key: Incomplete | None = ..., arguments: Incomplete | None = ...
Expand All @@ -118,14 +116,21 @@ class TwistedChannel:
) -> Deferred[Incomplete | Failure | BaseException | None]: ...
def queue_purge(self, queue) -> Deferred[Incomplete | Failure | BaseException | None]: ...
def queue_unbind(
self, queue, exchange: Incomplete | None = ..., routing_key: Incomplete | None = ..., arguments: Incomplete | None = ...
self, queue: str, exchange: str | None, routing_key: str | None = None, arguments: dict[str, Any] | None = None
) -> Deferred[Incomplete | Failure | BaseException | None]: ...
def tx_commit(self) -> Deferred[Incomplete | Failure | BaseException | None]: ...
def tx_rollback(self) -> Deferred[Incomplete | Failure | BaseException | None]: ...
def tx_select(self) -> Deferred[Incomplete | Failure | BaseException | None]: ...

class _TwistedConnectionAdapter(pika.connection.Connection):
def __init__(self, parameters, on_open_callback, on_open_error_callback, on_close_callback, custom_reactor) -> None: ...
class _TwistedConnectionAdapter(Connection):
def __init__(
self,
parameters: Parameters | None,
on_open_callback: Callable[[Connection], object] | None,
on_open_error_callback: Callable[[Connection, Exception], object] | None,
on_close_callback: Callable[[Connection, Exception], object] | None,
custom_reactor=None,
) -> None: ...
def connection_made(self, transport: ITransport) -> None: ...
def connection_lost(self, error: Exception) -> None: ...
def data_received(self, data) -> None: ...
Expand All @@ -145,6 +150,6 @@ class TwistedProtocolConnection(Protocol): # pyright: ignore[reportUntypedBaseC
def makeConnection(self, transport: ITransport) -> None: ...
def connectionReady(self): ...

class _TimerHandle(nbio_interface.AbstractTimerReference):
class _TimerHandle(AbstractTimerReference):
def __init__(self, handle: DelayedCall) -> None: ...
def cancel(self) -> None: ...
Loading
Loading