Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 66 additions & 0 deletions docs/features.md
Original file line number Diff line number Diff line change
Expand Up @@ -166,3 +166,69 @@ with DurableTaskSchedulerWorker(host_address=endpoint, secure_channel=secure_cha

**NOTE**
The worker and client output many logs at the `DEBUG` level that will be useful when understanding orchestration flow and diagnosing issues with Durable applications. Before submitting issues, please attempt a repro of the issue with debug logging enabled.

### Work item filtering

By default a worker receives **all** work items from the backend,
regardless of which orchestrations, activities, or entities are
registered. Work item filtering lets you explicitly tell the backend
which work items a worker can handle so that only matching items are
dispatched. This is useful when running multiple specialized workers
against the same task hub.

Work item filtering is **opt-in**. Call `use_work_item_filters()` on
the worker before starting it.

#### Auto-generated filters

Calling `use_work_item_filters()` with no arguments builds filters
automatically from the worker's registry at start time:

```python
with DurableTaskSchedulerWorker(...) as w:
    w.add_orchestrator(my_orchestrator)
    w.add_activity(my_activity)
    w.use_work_item_filters()  # auto-generate from registry
    w.start()
```

When versioning is configured with `VersionMatchStrategy.STRICT`,
the worker's version is included in every filter so the backend
only dispatches work items that match that exact version.

#### Explicit filters

Pass a `WorkItemFilters` instance for fine-grained control:

```python
from durabletask.worker import (
    WorkItemFilters,
    OrchestrationWorkItemFilter,
    ActivityWorkItemFilter,
    EntityWorkItemFilter,
)

w.use_work_item_filters(WorkItemFilters(
    orchestrations=[
        OrchestrationWorkItemFilter(name="my_orch", versions=["2.0.0"]),
    ],
    activities=[
        ActivityWorkItemFilter(name="my_activity"),
    ],
    entities=[
        EntityWorkItemFilter(name="my_entity"),
    ],
))
```

#### Clearing filters

Pass `None` to clear any previously configured filters and return
to the default behaviour of processing all work items:

```python
w.use_work_item_filters(None)
```

See the full
[work item filtering sample](../examples/work_item_filtering.py).
47 changes: 46 additions & 1 deletion docs/supported-patterns.md
Original file line number Diff line number Diff line change
Expand Up @@ -118,4 +118,49 @@ def my_orchestrator(ctx: task.OrchestrationContext, order: Order):
return "Success"
```

See the full [version-aware orchestrator sample](../examples/version_aware_orchestrator.py)
See the full [version-aware orchestrator sample](../examples/version_aware_orchestrator.py)

### Work item filtering

When running multiple workers against the same task hub, each
worker can declare which work items it handles. The backend then
dispatches only the matching orchestrations, activities, and
entities, avoiding unnecessary round-trips. Filtering is opt-in
and supports both auto-generated and explicit filter sets.

The simplest approach auto-generates filters from the worker's
registry:

```python
with DurableTaskSchedulerWorker(...) as w:
    w.add_orchestrator(greeting_orchestrator)
    w.add_activity(greet)
    w.use_work_item_filters()  # auto-generate from registry
    w.start()
```

For more control you can provide explicit filters, including
version constraints:

```python
from durabletask.worker import (
    WorkItemFilters,
    OrchestrationWorkItemFilter,
    ActivityWorkItemFilter,
)

w.use_work_item_filters(WorkItemFilters(
    orchestrations=[
        OrchestrationWorkItemFilter(
            name="greeting_orchestrator",
            versions=["2.0.0"],
        ),
    ],
    activities=[
        ActivityWorkItemFilter(name="greet"),
    ],
))
```

See the full
[work item filtering sample](../examples/work_item_filtering.py).
4 changes: 2 additions & 2 deletions durabletask/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@

"""Durable Task SDK for Python"""

from durabletask.worker import ConcurrencyOptions, VersioningOptions
from durabletask.worker import ConcurrencyOptions, VersioningOptions, WorkItemFilters
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider also exporting OrchestrationWorkItemFilter, ActivityWorkItemFilter, and EntityWorkItemFilter from __init__.py. Users constructing explicit filters need these types (as shown in the PR's own example and docs), and in the .NET implementation these are accessible as nested types on DurableTaskWorkerWorkItemFilters. Exporting them improves public API discoverability.

from durabletask.worker import (
    ConcurrencyOptions, VersioningOptions, WorkItemFilters,
    OrchestrationWorkItemFilter, ActivityWorkItemFilter, EntityWorkItemFilter,
)

Disclaimer: This review was generated by GitHub Copilot on behalf of Bernd.


__all__ = ["ConcurrencyOptions", "VersioningOptions"]
__all__ = ["ConcurrencyOptions", "VersioningOptions", "WorkItemFilters"]

PACKAGE_NAME = "durabletask"
125 changes: 112 additions & 13 deletions durabletask/testing/in_memory_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import durabletask.internal.orchestrator_service_pb2 as pb
import durabletask.internal.orchestrator_service_pb2_grpc as stubs
import durabletask.internal.helpers as helpers
from durabletask.entities.entity_instance_id import EntityInstanceId


@dataclass
Expand Down Expand Up @@ -56,6 +57,7 @@ class ActivityWorkItem:
task_id: int
input: Optional[str]
completion_token: int
version: Optional[str] = None


@dataclass
Expand Down Expand Up @@ -436,16 +438,65 @@ def RestartInstance(self, request: pb.RestartInstanceRequest, context):
f"Restarted instance '{request.instanceId}' as '{new_instance_id}'")
return pb.RestartInstanceResponse(instanceId=new_instance_id)

@staticmethod
def _parse_work_item_filters(request: "pb.GetWorkItemsRequest"):
    """Extract per-category work-item filters from a GetWorkItems request.

    Returns a 3-tuple ``(orchestrations, activities, entities)``. Each
    element is either ``None`` (the request carries no ``workItemFilters``
    field, so nothing is filtered) or a ``dict`` mapping a task name to a
    ``frozenset`` of accepted versions. An empty frozenset means *any*
    version of that name is accepted. An empty ``dict`` means the worker
    opted into filtering for that category but listed no names, so
    *nothing* should match.

    When the same name appears in several filter entries, their accepted
    versions are unioned. An entry without versions accepts every version,
    so it makes the merged result a wildcard regardless of entry order.
    """
    if not request.HasField("workItemFilters"):
        return None, None, None
    wf = request.workItemFilters

    def _build_filter(filters):
        # Merge entries by name; the empty frozenset is the wildcard.
        merged: dict[str, frozenset[str]] = {}
        for f in filters:
            versions = frozenset(f.versions)
            if not versions:
                # "Any version" absorbs whatever was collected before.
                merged[f.name] = frozenset()
            elif f.name not in merged:
                merged[f.name] = versions
            elif merged[f.name]:
                merged[f.name] = merged[f.name] | versions
            # else: name is already a wildcard; a versioned entry must
            # not narrow it back down.
        return merged

    orch_filter = _build_filter(wf.orchestrations)
    activity_filter = _build_filter(wf.activities)
    # Entity filters are built from names only, so every listed entity
    # name is a wildcard.
    entity_filter = {f.name: frozenset() for f in wf.entities}
    return orch_filter, activity_filter, entity_filter

@staticmethod
def _matches_filter(name: str, version: Optional[str],
                    filt: Optional[dict[str, frozenset[str]]]) -> bool:
    """Decide whether a work item passes a parsed category filter.

    *filt* is ``None`` when the worker did not opt into filtering, in
    which case every work item matches. Otherwise it maps each accepted
    name to a frozenset of accepted versions, where an empty frozenset
    accepts any version of that name. A missing *version* is compared as
    the empty string.
    """
    # No filter configured for this category: dispatch everything.
    if filt is None:
        return True

    allowed = filt.get(name)
    # Name not listed by the worker: never dispatch it.
    if allowed is None:
        return False

    wanted = version if version else ""
    # An empty set is the wildcard; otherwise the version must be listed.
    return (not allowed) or wanted in allowed

def GetWorkItems(self, request: pb.GetWorkItemsRequest, context):
"""Streams work items to the worker (orchestration and activity work items)."""
self._logger.info("Worker connected and requesting work items")
orch_filter, activity_filter, entity_filter = self._parse_work_item_filters(request)

try:
while context.is_active() and not self._shutdown_event.is_set():
work_item = None

with self._lock:
# Check for orchestration work
skipped_orchs: list[str] = []
while self._orchestration_queue:
instance_id = self._orchestration_queue.popleft()
self._orchestration_queue_set.discard(instance_id)
Expand All @@ -454,11 +505,15 @@ def GetWorkItems(self, request: pb.GetWorkItemsRequest, context):
if not instance or not instance.pending_events:
continue

# Skip if orchestration doesn't match filters
if not self._matches_filter(
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If EntityInstanceId.parse() fails, the entity silently bypasses the filter and gets dispatched. This seems like it should be the opposite — if we can't parse the entity ID we can't verify it matches, so it should probably be skipped (or at least a warning logged). Consider either logging a warning here or treating unparseable entity IDs as non-matching for safety.

Disclaimer: This review was generated by GitHub Copilot on behalf of Bernd.

instance.name, instance.version, orch_filter):
skipped_orchs.append(instance_id)
continue

if instance_id in self._orchestration_in_flight:
# Already being processed — re-add to queue
if instance_id not in self._orchestration_queue_set:
self._orchestration_queue.append(instance_id)
self._orchestration_queue_set.add(instance_id)
skipped_orchs.append(instance_id)
break

# Move pending events to dispatched_events
Expand All @@ -485,27 +540,62 @@ def GetWorkItems(self, request: pb.GetWorkItemsRequest, context):
)
break

# Re-queue skipped orchestrations for other workers
for s in skipped_orchs:
if s not in self._orchestration_queue_set:
self._orchestration_queue.append(s)
self._orchestration_queue_set.add(s)

# Check for activity work
if not work_item and self._activity_queue:
activity = self._activity_queue.popleft()
work_item = pb.WorkItem(
completionToken=str(activity.completion_token),
activityRequest=pb.ActivityRequest(
name=activity.name,
taskId=activity.task_id,
input=wrappers_pb2.StringValue(value=activity.input) if activity.input else None,
orchestrationInstance=pb.OrchestrationInstance(instanceId=activity.instance_id)
# Scan for the first matching activity
skipped: list = []
matched_activity = None
while self._activity_queue:
candidate = self._activity_queue.popleft()
if not self._matches_filter(
candidate.name, candidate.version,
activity_filter):
skipped.append(candidate)
continue
matched_activity = candidate
break
# Put back non-matching items
for s in skipped:
self._activity_queue.append(s)

if matched_activity is not None:
work_item = pb.WorkItem(
completionToken=str(matched_activity.completion_token),
activityRequest=pb.ActivityRequest(
name=matched_activity.name,
taskId=matched_activity.task_id,
input=wrappers_pb2.StringValue(value=matched_activity.input) if matched_activity.input else None,
orchestrationInstance=pb.OrchestrationInstance(instanceId=matched_activity.instance_id)
)
)
)

# Check for entity work
if not work_item:
skipped_entities: list[str] = []
while self._entity_queue:
entity_id = self._entity_queue.popleft()
self._entity_queue_set.discard(entity_id)
entity = self._entities.get(entity_id)

if entity and entity.pending_operations:
# Skip if entity name doesn't match filters
if entity_filter is not None:
try:
parsed = EntityInstanceId.parse(entity_id)
if not self._matches_filter(
parsed.entity, None,
entity_filter):
skipped_entities.append(entity_id)
continue
except ValueError:
pass

# Skip if this entity is already being processed
if entity_id in self._entity_in_flight:
continue
Expand All @@ -532,6 +622,12 @@ def GetWorkItems(self, request: pb.GetWorkItemsRequest, context):
)
break

# Re-queue skipped entities for other workers
for s in skipped_entities:
if s not in self._entity_queue_set:
self._entity_queue.append(s)
self._entity_queue_set.add(s)

if work_item:
yield work_item
else:
Expand Down Expand Up @@ -1259,12 +1355,15 @@ def _process_schedule_task_action(self, instance: OrchestrationInstance,
instance.status = pb.ORCHESTRATION_STATUS_RUNNING

# Queue activity for execution
task_version = schedule_task.version.value \
if schedule_task.HasField("version") else None
self._activity_queue.append(ActivityWorkItem(
instance_id=instance.instance_id,
name=task_name,
task_id=task_id,
input=input_value,
completion_token=instance.completion_token
completion_token=instance.completion_token,
version=task_version,
))
self._work_available.set()

Expand Down
Loading