Pull request overview
Adds an opt-in “work item filtering” capability to the Durable Task Python SDK so workers can declare which orchestrations/activities/entities they handle, and sends those filters to the backend via GetWorkItemsRequest.workItemFilters for server-side dispatch filtering.
Changes:
- Introduces filter model types (WorkItemFilters + per-kind filter classes) and a new worker API TaskHubGrpcWorker.use_work_item_filters() that can auto-generate filters from the registry or accept explicit filters.
- Implements filter-aware dispatch in the in-memory backend and adds extensive unit + E2E coverage (in-memory + DTS).
- Adds documentation and a runnable example demonstrating both auto-generated and explicit filters.
Reviewed changes
Copilot reviewed 10 out of 10 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| durabletask/worker.py | Adds filter classes, auto-generation logic, gRPC conversion, and worker API to enable/supply/clear filters. |
| durabletask/testing/in_memory_backend.py | Parses workItemFilters from GetWorkItemsRequest and applies filtering during dispatch. |
| durabletask/__init__.py | Exports WorkItemFilters from the top-level package. |
| docs/features.md | Documents opt-in behavior, auto-generation, explicit filters, and clearing filters. |
| docs/supported-patterns.md | Adds a “Work item filtering” supported pattern with usage examples. |
| examples/work_item_filtering.py | Adds an end-to-end example showing auto-generated and explicit filters with DTS. |
| tests/durabletask/test_work_item_filters.py | Unit tests for filter classes, registry auto-generation, grpc conversion, and worker API behavior. |
| tests/durabletask/test_work_item_filters_e2e.py | In-memory E2E tests verifying matching, non-matching, cleared, and entity filtering behavior. |
| tests/durabletask-azuremanaged/test_dts_work_item_filters_e2e.py | DTS E2E tests mirroring the in-memory E2E scenarios. |
| .vscode/mcp.json | Adds VS Code MCP server configuration (appears unrelated to filtering feature). |
berndverst left a comment
Good implementation that faithfully ports the .NET work item filtering design. The filter types, auto-generation logic, version handling, and opt-in pattern all align correctly with the dotnet implementation. Tests are comprehensive (27 unit + 7 in-memory E2E + 7 DTS E2E), and the version-aware filtering tests go beyond what the .NET test suite covers.
A few inline suggestions — nothing blocking.
Note: The PR has a merge conflict in durabletask/__init__.py — main now includes LargePayloadStorageOptions and PayloadStore exports that aren't in the PR's base. Please rebase.
Disclaimer: This review was generated by GitHub Copilot on behalf of Bernd.
"""Durable Task SDK for Python"""

-from durabletask.worker import ConcurrencyOptions, VersioningOptions
+from durabletask.worker import ConcurrencyOptions, VersioningOptions, WorkItemFilters
Consider also exporting OrchestrationWorkItemFilter, ActivityWorkItemFilter, and EntityWorkItemFilter from __init__.py. Users constructing explicit filters need these types (as shown in the PR's own example and docs), and in the .NET implementation these are accessible as nested types on DurableTaskWorkerWorkItemFilters. Exporting them improves public API discoverability.
from durabletask.worker import (
    ConcurrencyOptions, VersioningOptions, WorkItemFilters,
    OrchestrationWorkItemFilter, ActivityWorkItemFilter, EntityWorkItemFilter,
)
    continue

# Skip if orchestration doesn't match filters
if not self._matches_filter(
If EntityInstanceId.parse() fails, the entity silently bypasses the filter and gets dispatched. This seems like it should be the opposite — if we can't parse the entity ID we can't verify it matches, so it should probably be skipped (or at least a warning logged). Consider either logging a warning here or treating unparseable entity IDs as non-matching for safety.
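The fail-closed behavior suggested here could look roughly like the sketch below. The helper names parse_entity_name and entity_matches_filters are hypothetical stand-ins rather than the backend's actual functions, and the "@name@key" ID layout is assumed only for illustration.

```python
import logging
from typing import Optional

logger = logging.getLogger(__name__)


def parse_entity_name(instance_id: str) -> str:
    """Hypothetical parser: expects '@<name>@<key>', raises ValueError otherwise."""
    parts = instance_id.split("@", 2)
    if len(parts) != 3 or parts[0] != "" or not parts[1]:
        raise ValueError(f"invalid entity instance ID: {instance_id!r}")
    return parts[1]


def entity_matches_filters(instance_id: str, allowed_names: Optional[set[str]]) -> bool:
    """Return True only when the entity can be shown to match the filters."""
    if allowed_names is None:
        return True  # filtering is disabled, so everything is dispatched
    try:
        name = parse_entity_name(instance_id)
    except ValueError:
        # Fail closed: an ID that cannot be parsed cannot be verified, so skip it
        # and leave a trace in the logs instead of dispatching silently.
        logger.warning("Skipping entity with unparseable ID %r", instance_id)
        return False
    return name in allowed_names
```

With this shape, an unparseable ID is dispatched only when no filters are configured at all, which keeps the unfiltered default behavior unchanged.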
elif state:
    print(f" Failed: {state.failure_details}")

exit()
Nit: The exit() call here is unnecessary — the program will exit normally after the context managers close. Removing it would be cleaner.
def __init__(self, name: str, versions: Optional[list[str]] = None):
    """Initialize an orchestration filter.
The .NET implementation uses readonly struct with init properties for the filter types, which are essentially immutable data carriers. In Python, @dataclass(frozen=True) would be the natural equivalent and would provide __eq__, __repr__, and __hash__ for free. That said, the current plain-class approach is consistent with how ConcurrencyOptions and VersioningOptions are defined in this codebase, so either approach is defensible — just flagging for consideration.
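For comparison, a frozen-dataclass version of one filter type might look like the sketch below. It is illustrative only and is not the class defined in the PR; in particular, storing versions as a tuple rather than the list[str] used in the PR's signature is an assumption made here so that instances stay hashable.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class OrchestrationWorkItemFilter:
    """Illustrative immutable filter; not the class defined in the PR."""

    name: str
    # A tuple keeps instances hashable; with a list field, hash() would raise TypeError.
    versions: Optional[tuple[str, ...]] = None


a = OrchestrationWorkItemFilter("greet", ("1.0",))
b = OrchestrationWorkItemFilter("greet", ("1.0",))

# __eq__, __hash__, and __repr__ are generated from the fields.
same = a == b
unique = {a, b}
```

The trade-off is that callers would pass tuples, or the class would need to convert lists on construction, which is one reason the plain-class approach remains defensible.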
Summary
This PR adds work item filtering to the Durable Task Python SDK, allowing workers to declare which orchestrations, activities, and entities they handle. Filters are sent to the backend via the GetWorkItemsRequest gRPC message, enabling server-side filtering so that only matching work items are dispatched to a given worker.
This is a port of the dotnet implementation, using an explicit opt-in design: filtering is disabled by default and must be enabled by calling use_work_item_filters() before starting the worker.
Changes
Core SDK (durabletask/worker.py)
- Adds the filter classes OrchestrationWorkItemFilter, ActivityWorkItemFilter, EntityWorkItemFilter, and WorkItemFilters
- WorkItemFilters._from_registry(): Auto-generates filters from the worker's registered orchestrators, activities, and entities. Includes version constraints when VersionMatchStrategy.STRICT is configured.
- WorkItemFilters._to_grpc(): Converts filters to the protobuf WorkItemFilters message
- TaskHubGrpcWorker.use_work_item_filters(): Public API accepting three forms:
  - No explicit filters: auto-generate them from the registry
  - WorkItemFilters instance: use explicit custom filters
  - None: clear previously set filters
- start() / _async_run_loop(): Applies filters to the GetWorkItemsRequest sent to the backend
Public API (durabletask/__init__.py)
- Exports WorkItemFilters alongside existing ConcurrencyOptions and VersioningOptions
In-memory backend (durabletask/testing/in_memory_backend.py)
- Parses WorkItemFilters from GetWorkItemsRequest via _parse_work_item_filters()
Documentation
- docs/features.md: New "Work item filtering" section covering auto-generation, explicit filters, version constraints, and opt-in behavior
- docs/supported-patterns.md: New "Work item filtering" pattern with usage examples
Example
- examples/work_item_filtering.py: Full sample demonstrating both auto-generated and explicit filter usage with DurableTaskSchedulerWorker
Tests
- tests/durabletask/test_work_item_filters.py: 27 unit tests covering filter classes, _from_registry(), _to_grpc(), and use_work_item_filters() behavior
- tests/durabletask/test_work_item_filters_e2e.py: 7 in-memory E2E tests covering auto filters, explicit filters, no filters, cleared filters, entity filters, and non-matching filter scenarios
- tests/durabletask-azuremanaged/test_dts_work_item_filters_e2e.py: 7 DTS E2E tests mirroring the in-memory tests against the Durable Task Scheduler emulator
Usage
Auto-generated filters (recommended)
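As a rough, self-contained model of what auto-generation does, the sketch below builds one filter per registered name and pins versions only under a strict match strategy. Registry, filters_from_registry, and the dictionary layout are hypothetical stand-ins, the enum is a local stand-in for the SDK's VersionMatchStrategy, and which filter kinds carry versions is an assumption.

```python
from dataclasses import dataclass, field
from enum import Enum
from typing import Optional


class VersionMatchStrategy(Enum):
    """Local stand-in for the SDK enum of the same name."""

    NONE = 0
    STRICT = 1


@dataclass
class Registry:
    """Hypothetical stand-in for the worker's registry."""

    orchestrators: list[str] = field(default_factory=list)
    activities: list[str] = field(default_factory=list)
    entities: list[str] = field(default_factory=list)


def filters_from_registry(
    registry: Registry,
    version: Optional[str] = None,
    strategy: VersionMatchStrategy = VersionMatchStrategy.NONE,
) -> dict:
    """Build one filter per registered name; pin versions only under STRICT."""
    strict = version is not None and strategy is VersionMatchStrategy.STRICT
    versions = [version] if strict else []
    return {
        # Assumption: orchestration and activity filters carry versions, entity filters do not.
        "orchestrations": [{"name": n, "versions": versions} for n in registry.orchestrators],
        "activities": [{"name": n, "versions": versions} for n in registry.activities],
        "entities": [{"name": n} for n in registry.entities],
    }


registry = Registry(orchestrators=["greet"], activities=["say_hello"], entities=["counter"])
strict_filters = filters_from_registry(registry, "1.0", VersionMatchStrategy.STRICT)
loose_filters = filters_from_registry(registry, "1.0")
```

Deriving filters from the registry keeps them in step with what the worker can actually run, which is presumably why this form is the recommended one.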
Explicit filters
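Explicit filters narrow a worker to a hand-picked set of names and versions. The sketch below models how a backend might test a work item against such filters; the dictionary layout and the matches helper are hypothetical, and treating an empty version list as "any version" is an assumption rather than confirmed SDK behavior.

```python
from typing import Optional

# Hypothetical explicit filters: handle "greet" at v1.0 or v2.0, and "ping" at any version.
explicit_filters: dict[str, list[str]] = {
    "greet": ["1.0", "2.0"],
    "ping": [],  # assumption: an empty version list means "any version"
}


def matches(name: str, version: Optional[str], filters: Optional[dict[str, list[str]]]) -> bool:
    """Decide whether a work item should be dispatched to this worker."""
    if filters is None:
        return True  # no filters configured: the worker receives everything
    if name not in filters:
        return False  # the worker never declared this name
    allowed = filters[name]
    return not allowed or version in allowed
```

Under this model, a work item named "greet" at version "3.0" would stay queued for another worker instead of being dispatched here.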