-
Notifications
You must be signed in to change notification settings - Fork 14
Adding work item filters support #275
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -5,7 +5,9 @@ | |
| import io.grpc.Channel; | ||
|
|
||
| import java.time.Duration; | ||
| import java.util.Collections; | ||
| import java.util.HashMap; | ||
| import java.util.List; | ||
|
|
||
| /** | ||
| * Builder object for constructing customized {@link DurableTaskGrpcWorker} instances. | ||
|
|
@@ -18,6 +20,8 @@ public final class DurableTaskGrpcWorkerBuilder { | |
| DataConverter dataConverter; | ||
| Duration maximumTimerInterval; | ||
| DurableTaskGrpcWorkerVersioningOptions versioningOptions; | ||
| private WorkItemFilter workItemFilter; | ||
| private boolean autoGenerateWorkItemFilters; | ||
|
|
||
| /** | ||
| * Adds an orchestration factory to be used by the constructed {@link DurableTaskGrpcWorker}. | ||
|
|
@@ -125,11 +129,71 @@ public DurableTaskGrpcWorkerBuilder useVersioning(DurableTaskGrpcWorkerVersionin | |
| return this; | ||
| } | ||
|
|
||
| /** | ||
| * Sets explicit work item filters for this worker. When set, only work items matching the filters | ||
| * will be dispatched to this worker by the backend. | ||
| * <p> | ||
| * Work item filtering can improve efficiency in multi-worker deployments by ensuring each worker | ||
| * only receives work items it can handle. However, if an orchestration calls a task type | ||
| * (e.g., an activity or sub-orchestrator) that is not registered with any connected worker, | ||
| * the call may hang indefinitely instead of failing with an error. | ||
| * | ||
| * @param workItemFilter the work item filter to use, or {@code null} to disable filtering | ||
| * @return this builder object | ||
| */ | ||
| public DurableTaskGrpcWorkerBuilder useWorkItemFilters(WorkItemFilter workItemFilter) { | ||
| this.workItemFilter = workItemFilter; | ||
| this.autoGenerateWorkItemFilters = false; | ||
| return this; | ||
| } | ||
|
|
||
| /** | ||
| * Enables automatic work item filtering by generating filters from the registered | ||
| * orchestrations and activities. When enabled, the backend will only dispatch work items | ||
| * for registered orchestrations and activities to this worker. | ||
| * <p> | ||
| * Work item filtering can improve efficiency in multi-worker deployments by ensuring each worker | ||
| * only receives work items it can handle. However, if an orchestration calls a task type | ||
| * (e.g., an activity or sub-orchestrator) that is not registered with any connected worker, | ||
| * the call may hang indefinitely instead of failing with an error. | ||
| * <p> | ||
| * Only use this method when all task types referenced by orchestrations are guaranteed to be | ||
| * registered with at least one connected worker. | ||
| * | ||
| * @return this builder object | ||
| */ | ||
| public DurableTaskGrpcWorkerBuilder useWorkItemFilters() { | ||
| this.autoGenerateWorkItemFilters = true; | ||
| this.workItemFilter = null; | ||
| return this; | ||
| } | ||
|
|
||
| /** | ||
| * Initializes a new {@link DurableTaskGrpcWorker} object with the settings specified in the current builder object. | ||
| * @return a new {@link DurableTaskGrpcWorker} object | ||
| */ | ||
| public DurableTaskGrpcWorker build() { | ||
| return new DurableTaskGrpcWorker(this); | ||
| WorkItemFilter resolvedFilter = this.autoGenerateWorkItemFilters | ||
| ? buildAutoWorkItemFilter() | ||
| : this.workItemFilter; | ||
| return new DurableTaskGrpcWorker(this, resolvedFilter); | ||
| } | ||
|
|
||
| private WorkItemFilter buildAutoWorkItemFilter() { | ||
| List<String> versions = Collections.emptyList(); | ||
| if (this.versioningOptions != null | ||
| && this.versioningOptions.getMatchStrategy() == DurableTaskGrpcWorkerVersioningOptions.VersionMatchStrategy.STRICT | ||
| && this.versioningOptions.getVersion() != null) { | ||
| versions = Collections.singletonList(this.versioningOptions.getVersion()); | ||
| } | ||
|
|
||
| WorkItemFilter.Builder builder = WorkItemFilter.newBuilder(); | ||
| for (String name : this.orchestrationFactories.keySet()) { | ||
| builder.addOrchestration(name, versions); | ||
| } | ||
| for (String name : this.activityFactories.keySet()) { | ||
| builder.addActivity(name, versions); | ||
| } | ||
|
Comment on lines
+190
to
+196
|
||
| return builder.build(); | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| @@ -0,0 +1,205 @@ | ||||||||||||||||||||||||||||||||||
| // Copyright (c) Microsoft Corporation. All rights reserved. | ||||||||||||||||||||||||||||||||||
| // Licensed under the MIT License. | ||||||||||||||||||||||||||||||||||
| package com.microsoft.durabletask; | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| import java.util.ArrayList; | ||||||||||||||||||||||||||||||||||
| import java.util.Collections; | ||||||||||||||||||||||||||||||||||
| import java.util.List; | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| /** | ||||||||||||||||||||||||||||||||||
| * Represents work item filters for a Durable Task worker. These filters are passed to the backend | ||||||||||||||||||||||||||||||||||
| * and only work items matching the filters will be processed by the worker. If no filters are provided, | ||||||||||||||||||||||||||||||||||
| * the worker will process all work items. | ||||||||||||||||||||||||||||||||||
| * <p> | ||||||||||||||||||||||||||||||||||
| * Work item filtering can improve efficiency in multi-worker deployments by ensuring each worker | ||||||||||||||||||||||||||||||||||
| * only receives work items it can handle. However, if an orchestration calls a task type | ||||||||||||||||||||||||||||||||||
| * (e.g., an activity or sub-orchestrator) that is not registered with any connected worker, | ||||||||||||||||||||||||||||||||||
| * the call may hang indefinitely instead of failing with an error. | ||||||||||||||||||||||||||||||||||
| * <p> | ||||||||||||||||||||||||||||||||||
| * Use {@link DurableTaskGrpcWorkerBuilder#useWorkItemFilters(WorkItemFilter)} to provide explicit filters, | ||||||||||||||||||||||||||||||||||
| * or {@link DurableTaskGrpcWorkerBuilder#useWorkItemFilters()} to auto-generate filters from the | ||||||||||||||||||||||||||||||||||
| * registered orchestrations and activities. | ||||||||||||||||||||||||||||||||||
| */ | ||||||||||||||||||||||||||||||||||
| public final class WorkItemFilter { | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| private final List<OrchestrationFilter> orchestrations; | ||||||||||||||||||||||||||||||||||
| private final List<ActivityFilter> activities; | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| private WorkItemFilter(List<OrchestrationFilter> orchestrations, List<ActivityFilter> activities) { | ||||||||||||||||||||||||||||||||||
| this.orchestrations = Collections.unmodifiableList(new ArrayList<OrchestrationFilter>(orchestrations)); | ||||||||||||||||||||||||||||||||||
| this.activities = Collections.unmodifiableList(new ArrayList<ActivityFilter>(activities)); | ||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| /** | ||||||||||||||||||||||||||||||||||
| * Gets the orchestration filters. | ||||||||||||||||||||||||||||||||||
| * | ||||||||||||||||||||||||||||||||||
| * @return an unmodifiable list of orchestration filters | ||||||||||||||||||||||||||||||||||
| */ | ||||||||||||||||||||||||||||||||||
| public List<OrchestrationFilter> getOrchestrations() { | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| return this.orchestrations; | ||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| /** | ||||||||||||||||||||||||||||||||||
| * Gets the activity filters. | ||||||||||||||||||||||||||||||||||
| * | ||||||||||||||||||||||||||||||||||
| * @return an unmodifiable list of activity filters | ||||||||||||||||||||||||||||||||||
| */ | ||||||||||||||||||||||||||||||||||
| public List<ActivityFilter> getActivities() { | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| return this.activities; | ||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| /** | ||||||||||||||||||||||||||||||||||
| * Creates a new {@link Builder} for constructing {@link WorkItemFilter} instances. | ||||||||||||||||||||||||||||||||||
| * | ||||||||||||||||||||||||||||||||||
| * @return a new builder | ||||||||||||||||||||||||||||||||||
| */ | ||||||||||||||||||||||||||||||||||
| public static Builder newBuilder() { | ||||||||||||||||||||||||||||||||||
| return new Builder(); | ||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| /** | ||||||||||||||||||||||||||||||||||
| * Builder for constructing {@link WorkItemFilter} instances. | ||||||||||||||||||||||||||||||||||
| */ | ||||||||||||||||||||||||||||||||||
| public static final class Builder { | ||||||||||||||||||||||||||||||||||
| private final List<OrchestrationFilter> orchestrations = new ArrayList<OrchestrationFilter>(); | ||||||||||||||||||||||||||||||||||
| private final List<ActivityFilter> activities = new ArrayList<ActivityFilter>(); | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| Builder() { | ||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| /** | ||||||||||||||||||||||||||||||||||
| * Adds an orchestration filter with the specified name and no version constraint. | ||||||||||||||||||||||||||||||||||
| * | ||||||||||||||||||||||||||||||||||
| * @param name the orchestration name to filter on | ||||||||||||||||||||||||||||||||||
| * @return this builder | ||||||||||||||||||||||||||||||||||
| */ | ||||||||||||||||||||||||||||||||||
| public Builder addOrchestration(String name) { | ||||||||||||||||||||||||||||||||||
| if (name == null || name.isEmpty()) { | ||||||||||||||||||||||||||||||||||
| throw new IllegalArgumentException("Orchestration filter name must not be null or empty."); | ||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||
| this.orchestrations.add(new OrchestrationFilter(name, Collections.<String>emptyList())); | ||||||||||||||||||||||||||||||||||
| return this; | ||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| /** | ||||||||||||||||||||||||||||||||||
| * Adds an orchestration filter with the specified name and versions. | ||||||||||||||||||||||||||||||||||
| * | ||||||||||||||||||||||||||||||||||
| * @param name the orchestration name to filter on | ||||||||||||||||||||||||||||||||||
| * @param versions the versions to filter on | ||||||||||||||||||||||||||||||||||
| * @return this builder | ||||||||||||||||||||||||||||||||||
| */ | ||||||||||||||||||||||||||||||||||
| public Builder addOrchestration(String name, List<String> versions) { | ||||||||||||||||||||||||||||||||||
| if (name == null || name.isEmpty()) { | ||||||||||||||||||||||||||||||||||
| throw new IllegalArgumentException("Orchestration filter name must not be null or empty."); | ||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||
| List<String> versionsCopy = versions != null | ||||||||||||||||||||||||||||||||||
| ? Collections.unmodifiableList(new ArrayList<String>(versions)) | ||||||||||||||||||||||||||||||||||
| : Collections.<String>emptyList(); | ||||||||||||||||||||||||||||||||||
|
Comment on lines
+95
to
+97
|
||||||||||||||||||||||||||||||||||
| List<String> versionsCopy = versions != null | |
| ? Collections.unmodifiableList(new ArrayList<String>(versions)) | |
| : Collections.<String>emptyList(); | |
| List<String> versionsCopy; | |
| if (versions == null) { | |
| versionsCopy = Collections.<String>emptyList(); | |
| } else { | |
| List<String> cleanedVersions = new ArrayList<String>(versions.size()); | |
| for (String version : versions) { | |
| if (version == null || version.isEmpty()) { | |
| throw new IllegalArgumentException("Orchestration filter versions must not contain null or empty entries."); | |
| } | |
| cleanedVersions.add(version); | |
| } | |
| versionsCopy = Collections.unmodifiableList(cleanedVersions); | |
| } |
Copilot
AI
Mar 27, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
addActivity(name, versions) doesn't validate the contents of the versions list. If callers pass a list containing null elements, proto conversion via addAllVersions(...) will throw at runtime. Add input validation to reject null entries (and optionally empty strings) before storing the filter.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
buildGetWorkItemsRequest()rebuilds and re-serializes theWorkItemFiltersproto every time the listen loop restarts (and could do so repeatedly on transient disconnects). SinceworkItemFilteris immutable for the lifetime of the worker, consider precomputing and storing the protoWorkItemFiltersonce (e.g., in the constructor) to reduce allocations and CPU in retry loops.