Implement work item filtering #128

Open
andystaples wants to merge 2 commits into microsoft:main from andystaples:andystaples/add-workitem-filtering

@andystaples
Contributor

Summary

This PR adds work item filtering to the Durable Task Python SDK, allowing workers to declare which orchestrations, activities, and entities they handle. Filters are sent to the backend via the GetWorkItemsRequest gRPC message, enabling server-side filtering so that only matching work items are dispatched to a given worker.

This is a port of the .NET implementation and uses an explicit opt-in design: filtering is disabled by default and must be enabled by calling use_work_item_filters() before starting the worker.

Changes

Core SDK (durabletask/worker.py)

  • New filter data classes: OrchestrationWorkItemFilter, ActivityWorkItemFilter, EntityWorkItemFilter, and WorkItemFilters
  • WorkItemFilters._from_registry(): Auto-generates filters from the worker's registered orchestrators, activities, and entities. Includes version constraints when VersionMatchStrategy.STRICT is configured.
  • WorkItemFilters._to_grpc(): Converts filters to the protobuf WorkItemFilters message
  • TaskHubGrpcWorker.use_work_item_filters(): Public API accepting three forms:
    • No argument — auto-generate filters from registry at start time
    • WorkItemFilters instance — use explicit custom filters
    • None — clear previously set filters
  • start() / _async_run_loop(): Applies filters to the GetWorkItemsRequest sent to the backend
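
Distinguishing "no argument" from an explicit None usually needs a private sentinel default. The following is a minimal sketch of that idea, not the code in this PR: `_AUTO`, `SketchWorker`, `resolve_filters()`, and the simplified `WorkItemFilters` stand-in are all hypothetical names introduced for illustration.

```python
from typing import Optional, Union


class _AutoSentinel:
    """Marker type meaning 'derive filters from the registry at start time'."""

    def __repr__(self) -> str:
        return "AUTO"


_AUTO = _AutoSentinel()


class WorkItemFilters:
    """Simplified stand-in for the real container; tracks orchestration names only."""

    def __init__(self, orchestrations: Optional[list[str]] = None):
        self.orchestrations = orchestrations or []


class SketchWorker:
    """Hypothetical worker that shows only the filter-selection logic."""

    def __init__(self) -> None:
        self._registered: list[str] = []
        # None means filtering is disabled, which is the default.
        self._filters: Union[WorkItemFilters, _AutoSentinel, None] = None

    def add_orchestrator(self, name: str) -> None:
        self._registered.append(name)

    def use_work_item_filters(
        self, filters: Union[WorkItemFilters, _AutoSentinel, None] = _AUTO
    ) -> None:
        # No argument -> sentinel; WorkItemFilters -> explicit; None -> cleared.
        self._filters = filters

    def resolve_filters(self) -> Optional[WorkItemFilters]:
        """Return the filters to send, resolving the sentinel lazily."""
        if self._filters is None:
            return None
        if isinstance(self._filters, _AutoSentinel):
            # Deferred until start so late registrations are still included.
            return WorkItemFilters(list(self._registered))
        return self._filters
```

Resolving the sentinel lazily means tasks registered after the call to use_work_item_filters() but before start are still picked up.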

Public API (durabletask/__init__.py)

  • Exports WorkItemFilters alongside existing ConcurrencyOptions and VersioningOptions

In-memory backend (durabletask/testing/in_memory_backend.py)

  • Parses WorkItemFilters from GetWorkItemsRequest via _parse_work_item_filters()
  • Filters orchestration, activity, and entity work items during dispatch
  • Uses a collect-and-requeue pattern to avoid infinite loops when items don't match
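
The collect-and-requeue idea can be sketched roughly as follows. This is an illustration under assumptions, not the backend's actual code: `dequeue_first_match` is a hypothetical helper, and a plain `deque` stands in for whatever queue the in-memory backend uses.

```python
from collections import deque
from typing import Callable, Deque, List, Optional, TypeVar

T = TypeVar("T")


def dequeue_first_match(
    queue: Deque[T], matches: Callable[[T], bool]
) -> Optional[T]:
    """Pop the first item accepted by `matches`, leaving the rest in order."""
    skipped: List[T] = []
    found: Optional[T] = None
    while queue:
        item = queue.popleft()
        if matches(item):
            found = item
            break
        # Hold non-matching items aside instead of re-enqueueing immediately.
        skipped.append(item)
    # Requeue only after the scan, so each item is examined at most once.
    queue.extendleft(reversed(skipped))
    return found
```

Because skipped items go back only after the scan ends, a queue holding nothing but non-matching items is traversed once and the call returns None instead of spinning.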

Documentation

  • docs/features.md: New "Work item filtering" section covering auto-generation, explicit filters, version constraints, and opt-in behavior
  • docs/supported-patterns.md: New "Work item filtering" pattern with usage examples

Example

  • examples/work_item_filtering.py: Full sample demonstrating both auto-generated and explicit filter usage with DurableTaskSchedulerWorker

Tests

  • tests/durabletask/test_work_item_filters.py — 27 unit tests covering filter classes, _from_registry(), _to_grpc(), and use_work_item_filters() behavior
  • tests/durabletask/test_work_item_filters_e2e.py — 7 in-memory E2E tests: auto filters, explicit filters, no filters, cleared filters, entity filters, and non-matching filter scenarios
  • tests/durabletask-azuremanaged/test_dts_work_item_filters_e2e.py — 7 DTS E2E tests mirroring the in-memory tests against the Durable Task Scheduler emulator

Usage

Auto-generated filters (recommended)

worker = TaskHubGrpcWorker()
worker.add_orchestrator(my_orchestrator)
worker.add_activity(my_activity)
worker.use_work_item_filters()  # derives filters from registered tasks
worker.start()
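
For intuition, auto-generation could look something like the sketch below. It is not the PR's `_from_registry()`: `NameFilter`, `filters_from_registry`, and the stand-in `VersionMatchStrategy` enum are hypothetical, and the sketch assumes that under STRICT matching the worker's own version is the only constraint, applied to both orchestrations and activities.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Optional


class VersionMatchStrategy(Enum):
    """Stand-in enum with only the two members this sketch needs."""

    NONE = 1
    STRICT = 2


@dataclass
class NameFilter:
    """Hypothetical simplified filter: a task name plus optional versions."""

    name: str
    versions: Optional[list[str]] = None


def filters_from_registry(
    orchestrator_names: list[str],
    activity_names: list[str],
    worker_version: Optional[str] = None,
    strategy: VersionMatchStrategy = VersionMatchStrategy.NONE,
) -> dict[str, list[NameFilter]]:
    """Build one filter per registered name."""
    # Assumption: only STRICT matching adds a version constraint.
    strict = strategy is VersionMatchStrategy.STRICT and worker_version is not None

    def make(name: str) -> NameFilter:
        # Each filter gets its own list so later mutation is not shared.
        return NameFilter(name, [worker_version] if strict else None)

    return {
        "orchestrations": [make(n) for n in orchestrator_names],
        "activities": [make(n) for n in activity_names],
    }
```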

Explicit filters

from durabletask.worker import (
    WorkItemFilters,
    OrchestrationWorkItemFilter,
    ActivityWorkItemFilter,
)

worker = TaskHubGrpcWorker()
worker.add_orchestrator(my_orchestrator)
worker.add_activity(my_activity)
worker.use_work_item_filters(WorkItemFilters(
    orchestrations=[
        OrchestrationWorkItemFilter(name="my_orchestrator", versions=["v1"]),
    ],
    activities=[
        ActivityWorkItemFilter(name="my_activity"),
    ],
))
worker.start()

@andystaples requested review from berndverst and Copilot, and removed the request for Copilot (March 27, 2026 18:12)
Contributor

Copilot AI left a comment

Pull request overview

Adds an opt-in “work item filtering” capability to the Durable Task Python SDK so workers can declare which orchestrations/activities/entities they handle, and sends those filters to the backend via GetWorkItemsRequest.workItemFilters for server-side dispatch filtering.

Changes:

  • Introduces filter model types (WorkItemFilters + per-kind filter classes) and a new worker API TaskHubGrpcWorker.use_work_item_filters() that can auto-generate filters from the registry or accept explicit filters.
  • Implements filter-aware dispatch in the in-memory backend and adds extensive unit + E2E coverage (in-memory + DTS).
  • Adds documentation and a runnable example demonstrating both auto-generated and explicit filters.

Reviewed changes

Copilot reviewed 10 out of 10 changed files in this pull request and generated 4 comments.

| File | Description |
| --- | --- |
| durabletask/worker.py | Adds filter classes, auto-generation logic, gRPC conversion, and worker API to enable/supply/clear filters. |
| durabletask/testing/in_memory_backend.py | Parses workItemFilters from GetWorkItemsRequest and applies filtering during dispatch. |
| durabletask/__init__.py | Exports WorkItemFilters from the top-level package. |
| docs/features.md | Documents opt-in behavior, auto-generation, explicit filters, and clearing filters. |
| docs/supported-patterns.md | Adds a “Work item filtering” supported pattern with usage examples. |
| examples/work_item_filtering.py | Adds an end-to-end example showing auto-generated and explicit filters with DTS. |
| tests/durabletask/test_work_item_filters.py | Unit tests for filter classes, registry auto-generation, gRPC conversion, and worker API behavior. |
| tests/durabletask/test_work_item_filters_e2e.py | In-memory E2E tests verifying matching, non-matching, cleared, and entity filtering behavior. |
| tests/durabletask-azuremanaged/test_dts_work_item_filters_e2e.py | DTS E2E tests mirroring the in-memory E2E scenarios. |
| .vscode/mcp.json | Adds VS Code MCP server configuration (appears unrelated to filtering feature). |

Member

@berndverst berndverst left a comment

Good implementation that faithfully ports the .NET work item filtering design. The filter types, auto-generation logic, version handling, and opt-in pattern all align correctly with the dotnet implementation. Tests are comprehensive (27 unit + 7 in-memory E2E + 7 DTS E2E), and the version-aware filtering tests go beyond what the .NET test suite covers.

A few inline suggestions — nothing blocking.

Note: The PR has a merge conflict in durabletask/__init__.py: main now includes LargePayloadStorageOptions and PayloadStore exports that aren't in the PR's base. Please rebase.

Disclaimer: This review was generated by GitHub Copilot on behalf of Bernd.

"""Durable Task SDK for Python"""

- from durabletask.worker import ConcurrencyOptions, VersioningOptions
+ from durabletask.worker import ConcurrencyOptions, VersioningOptions, WorkItemFilters
Member

Consider also exporting OrchestrationWorkItemFilter, ActivityWorkItemFilter, and EntityWorkItemFilter from __init__.py. Users constructing explicit filters need these types (as shown in the PR's own example and docs), and in the .NET implementation these are accessible as nested types on DurableTaskWorkerWorkItemFilters. Exporting them improves public API discoverability.

from durabletask.worker import (
    ConcurrencyOptions, VersioningOptions, WorkItemFilters,
    OrchestrationWorkItemFilter, ActivityWorkItemFilter, EntityWorkItemFilter,
)

Disclaimer: This review was generated by GitHub Copilot on behalf of Bernd.

continue

# Skip if orchestration doesn't match filters
if not self._matches_filter(
Member

If EntityInstanceId.parse() fails, the entity silently bypasses the filter and gets dispatched. This seems like it should be the opposite — if we can't parse the entity ID we can't verify it matches, so it should probably be skipped (or at least a warning logged). Consider either logging a warning here or treating unparseable entity IDs as non-matching for safety.
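
A fail-closed version of that check might look like the sketch below. It does not use the SDK's EntityInstanceId.parse(): `parse_entity_name` is a hypothetical stand-in that assumes IDs of the form `@name@key`, and `entity_matches` is likewise an illustrative name.

```python
import logging
from typing import Optional

logger = logging.getLogger("work_item_filter_sketch")


def parse_entity_name(instance_id: str) -> str:
    """Stand-in parser; assumes '@name@key' and raises ValueError otherwise."""
    if not instance_id.startswith("@"):
        raise ValueError(f"not an entity instance ID: {instance_id!r}")
    name, sep, _key = instance_id[1:].partition("@")
    if not sep or not name:
        raise ValueError(f"malformed entity instance ID: {instance_id!r}")
    return name


def entity_matches(instance_id: str, allowed_names: Optional[set[str]]) -> bool:
    """Return True only when the entity is known to match the filters."""
    if allowed_names is None:
        # No filters configured: every work item is dispatched.
        return True
    try:
        name = parse_entity_name(instance_id)
    except ValueError:
        # Fail closed: an ID that cannot be parsed cannot be verified.
        logger.warning("Skipping unparseable entity ID %r", instance_id)
        return False
    return name in allowed_names
```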

Disclaimer: This review was generated by GitHub Copilot on behalf of Bernd.

elif state:
    print(f" Failed: {state.failure_details}")

exit()
Member

Nit: The exit() call here is unnecessary — the program will exit normally after the context managers close. Removing it would be cleaner.

Disclaimer: This review was generated by GitHub Copilot on behalf of Bernd.


def __init__(self, name: str, versions: Optional[list[str]] = None):
"""Initialize an orchestration filter.

Member

The .NET implementation uses readonly struct with init properties for the filter types, which are essentially immutable data carriers. In Python, @dataclass(frozen=True) would be the natural equivalent and would provide __eq__, __repr__, and __hash__ for free. That said, the current plain-class approach is consistent with how ConcurrencyOptions and VersioningOptions are defined in this codebase, so either approach is defensible — just flagging for consideration.
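
For comparison, a frozen-dataclass variant could look like this. It is a sketch of the suggestion, not a proposed patch; it assumes `versions` is stored as a tuple instead of the list used in the current signature, because a list field would make `hash()` raise TypeError.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class OrchestrationWorkItemFilter:
    """Immutable filter: an orchestration name plus optional versions."""

    name: str
    # Assumption: tuple instead of list so instances stay hashable.
    versions: Optional[tuple[str, ...]] = None


a = OrchestrationWorkItemFilter("my_orchestrator", ("v1",))
b = OrchestrationWorkItemFilter("my_orchestrator", ("v1",))

# __eq__ and __hash__ are generated from the fields.
same = a == b
unique = {a, b}
```

The trade-off is a small API difference: callers passing a list would need conversion (for example in a classmethod), which the plain-class approach avoids.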

Disclaimer: This review was generated by GitHub Copilot on behalf of Bernd.
