-
-
Notifications
You must be signed in to change notification settings - Fork 122
experiment separate Broker + Router #624
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,65 @@ | ||
| """Route one task through several brokers with a shared router.""" | ||
|
|
||
| import asyncio | ||
|
|
||
| from taskiq import Flow, InMemoryBroker, TaskiqRouter | ||
|
|
||
| router = TaskiqRouter() | ||
|
|
||
| default_email_flow = Flow.queue("emails.default") | ||
| priority_email_flow = Flow.queue("emails.priority") | ||
| bulk_email_flow = Flow.queue("emails.bulk") | ||
|
|
||
| default_broker = InMemoryBroker( | ||
| router=router, | ||
| broker_name="default", | ||
| default_flow=default_email_flow, | ||
| await_inplace=True, | ||
| ) | ||
| priority_broker = InMemoryBroker( | ||
| router=router, | ||
| broker_name="priority", | ||
| default_flow=priority_email_flow, | ||
| await_inplace=True, | ||
| ) | ||
|
|
||
|
|
||
| @default_broker.task(task_name="examples.send_email", domain="notifications") | ||
| async def send_email(user_id: int, template: str) -> str: | ||
| """Pretend to render and send an email.""" | ||
| return f"{template} email sent to user {user_id}" | ||
|
|
||
|
|
||
| router.route_task( | ||
| send_email.task_name, | ||
| broker="priority", | ||
| flow=priority_email_flow, | ||
| ) | ||
|
|
||
|
|
||
| async def _main() -> None: | ||
| await default_broker.startup() | ||
| await priority_broker.startup() | ||
| try: | ||
| direct_result = await send_email(7, "welcome") | ||
|
|
||
| routed_task = await send_email.kiq(7, "welcome") | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It says that the task is routed, but it's very implicit. |
||
| routed_result = await routed_task.wait_result(timeout=2) | ||
|
|
||
| bulk_task = await send_email.kicker().with_route( | ||
| "default", | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. String references. |
||
| bulk_email_flow, | ||
| ).kiq(8, "digest") | ||
| bulk_result = await bulk_task.wait_result(timeout=2) | ||
|
|
||
| print(f"Direct call: {direct_result}") | ||
| print(f"Default route: {router.resolve_route(send_email.task_name)}") | ||
| print(f"Routed call: {routed_result.return_value}") | ||
| print(f"Route override: {bulk_result.return_value}") | ||
| finally: | ||
| await priority_broker.shutdown() | ||
| await default_broker.shutdown() | ||
|
|
||
|
|
||
| if __name__ == "__main__": | ||
| asyncio.run(_main()) | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,48 @@ | ||
| """Declare shared task definitions and bind them in the final application.""" | ||
|
|
||
| import asyncio | ||
|
|
||
| from taskiq import Flow, InMemoryBroker, TaskiqRouter, task_builder | ||
|
|
||
|
|
||
| @task_builder("billing.calculate_total", domain="billing") | ||
| async def calculate_total(price: int, quantity: int) -> int: | ||
| """Package-level task definition that is not bound to any broker.""" | ||
| return price * quantity | ||
|
|
||
|
|
||
| router = TaskiqRouter() | ||
| billing_flow = Flow.queue("billing.tasks") | ||
| priority_billing_flow = Flow.queue("billing.priority") | ||
|
|
||
| billing_broker = InMemoryBroker( | ||
| router=router, | ||
| broker_name="billing", | ||
| default_flow=billing_flow, | ||
| await_inplace=True, | ||
| ) | ||
|
|
||
| registered_calculate_total = billing_broker.register_task(calculate_total) | ||
|
|
||
|
|
||
| async def _main() -> None: | ||
| await billing_broker.startup() | ||
| try: | ||
| direct_result = await calculate_total.call(19, 3) | ||
|
|
||
| prepared_task = registered_calculate_total.kicker().with_flow( | ||
| priority_billing_flow, | ||
| ).prepare(19, 3) | ||
|
|
||
| queued_task = await prepared_task.kiq() | ||
| queued_result = await queued_task.wait_result(timeout=2) | ||
|
|
||
| print(f"Shared task direct call: {direct_result}") | ||
| print(f"Prepared message: {prepared_task.message.task_name}") | ||
| print(f"Registered queued call: {queued_result.return_value}") | ||
| finally: | ||
| await billing_broker.shutdown() | ||
|
|
||
|
|
||
| if __name__ == "__main__": | ||
| asyncio.run(_main()) |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,58 @@ | ||
| import enum | ||
| from dataclasses import dataclass, field, replace | ||
| from typing import Any | ||
|
|
||
| __all__ = ("Flow", "FlowKind") | ||
|
|
||
|
|
||
| @enum.unique | ||
| class FlowKind(str, enum.Enum): | ||
| """Transport-neutral flow shape.""" | ||
|
|
||
| QUEUE = "queue" | ||
| TOPIC = "topic" | ||
| STREAM = "stream" | ||
|
|
||
|
|
||
| @dataclass(frozen=True, slots=True) | ||
| class Flow: | ||
| """Transport-neutral publish or subscribe address. | ||
|
|
||
| Plain flows are intentionally generic. Every broker may interpret a flow | ||
| using its own defaults: queue name, topic, stream, channel, list key, or any | ||
| other transport address. | ||
|
|
||
| Broker packages can subclass this value object to expose transport-specific | ||
| details while still accepting plain Flow instances. | ||
| """ | ||
|
|
||
| name: str | ||
| kind: FlowKind = FlowKind.QUEUE | ||
| options: dict[str, Any] = field( | ||
| default_factory=dict, | ||
| compare=False, | ||
| hash=False, | ||
| ) | ||
|
|
||
| @classmethod | ||
| def queue(cls, name: str, **options: Any) -> "Flow": | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't get what are odds in using different flow kinds? How are they going to differ? |
||
| """Create a queue-like flow.""" | ||
| return cls(name=name, kind=FlowKind.QUEUE, options=options) | ||
|
|
||
| @classmethod | ||
| def topic(cls, name: str, **options: Any) -> "Flow": | ||
| """Create a topic-like flow.""" | ||
| return cls(name=name, kind=FlowKind.TOPIC, options=options) | ||
|
|
||
| @classmethod | ||
| def stream(cls, name: str, **options: Any) -> "Flow": | ||
| """Create a stream-like flow.""" | ||
| return cls(name=name, kind=FlowKind.STREAM, options=options) | ||
|
|
||
| def with_options(self, **options: Any) -> "Flow": | ||
| """Return the same flow with additional generic options.""" | ||
| return replace(self, options={**self.options, **options}) | ||
|
|
||
| def broker_options(self, broker_name: str) -> dict[str, Any]: | ||
| """Return transport options for broker-specific implementations.""" | ||
| return dict(self.options) | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Referencing by a magic string is not a good idea. I guess placing a broker value in here would be much better.