From ad7450d34d3d8ae2a81579a1a39ad0996f4b86b7 Mon Sep 17 00:00:00 2001 From: Varshi Bachu Date: Thu, 26 Mar 2026 22:56:54 -0700 Subject: [PATCH 1/2] initial commit --- CHANGELOG.md | 1 + .../durabletask/DurableTaskGrpcWorker.java | 42 +++- .../DurableTaskGrpcWorkerBuilder.java | 66 +++++- .../microsoft/durabletask/WorkItemFilter.java | 205 +++++++++++++++++ .../WorkItemFilterBuilderTest.java | 170 ++++++++++++++ .../durabletask/WorkItemFilterTest.java | 207 ++++++++++++++++++ 6 files changed, 687 insertions(+), 4 deletions(-) create mode 100644 client/src/main/java/com/microsoft/durabletask/WorkItemFilter.java create mode 100644 client/src/test/java/com/microsoft/durabletask/WorkItemFilterBuilderTest.java create mode 100644 client/src/test/java/com/microsoft/durabletask/WorkItemFilterTest.java diff --git a/CHANGELOG.md b/CHANGELOG.md index ba9740bc..e45db8c2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,5 @@ ## Unreleased +* Add work item filtering support for `DurableTaskGrpcWorker` to enable worker-side filtering of orchestration and activity work items * Add support for calls to HTTP endpoints ([#271](https://github.com/microsoft/durabletask-java/pull/271)) * Add getSuspendPostUri and getResumePostUri getters to HttpManagementPayload ([#264](https://github.com/microsoft/durabletask-java/pull/264)) diff --git a/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorker.java b/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorker.java index 284b7090..5d87489c 100644 --- a/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorker.java +++ b/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorker.java @@ -39,10 +39,11 @@ public final class DurableTaskGrpcWorker implements AutoCloseable { private final DataConverter dataConverter; private final Duration maximumTimerInterval; private final DurableTaskGrpcWorkerVersioningOptions versioningOptions; + private final WorkItemFilter workItemFilter; private final TaskHubSidecarServiceBlockingStub sidecarClient; - DurableTaskGrpcWorker(DurableTaskGrpcWorkerBuilder builder) { + DurableTaskGrpcWorker(DurableTaskGrpcWorkerBuilder builder, WorkItemFilter workItemFilter) { this.orchestrationFactories.putAll(builder.orchestrationFactories); this.activityFactories.putAll(builder.activityFactories); @@ -70,6 +71,7 @@ public final class DurableTaskGrpcWorker implements AutoCloseable { this.dataConverter = builder.dataConverter != null ? builder.dataConverter : new JacksonDataConverter(); this.maximumTimerInterval = builder.maximumTimerInterval != null ? builder.maximumTimerInterval : DEFAULT_MAXIMUM_TIMER_INTERVAL; this.versioningOptions = builder.versioningOptions; + this.workItemFilter = workItemFilter; } /** @@ -132,7 +134,7 @@ public void startAndBlock() { // TODO: How do we interrupt manually? while (true) { try { - GetWorkItemsRequest getWorkItemsRequest = GetWorkItemsRequest.newBuilder().build(); + GetWorkItemsRequest getWorkItemsRequest = buildGetWorkItemsRequest(); Iterator workItemStream = this.sidecarClient.getWorkItems(getWorkItemsRequest); while (workItemStream.hasNext()) { WorkItem workItem = workItemStream.next(); @@ -408,4 +410,38 @@ else if (requestType == RequestCase.HEALTHPING) public void stop() { this.close(); } -} \ No newline at end of file + + /** + * Returns the work item filter configured for this worker, or {@code null} if none. + */ + WorkItemFilter getWorkItemFilter() { + return this.workItemFilter; + } + + private GetWorkItemsRequest buildGetWorkItemsRequest() { + GetWorkItemsRequest.Builder builder = GetWorkItemsRequest.newBuilder(); + if (this.workItemFilter != null) { + builder.setWorkItemFilters(toProtoWorkItemFilters(this.workItemFilter)); + } + return builder.build(); + } + + static WorkItemFilters toProtoWorkItemFilters(WorkItemFilter filter) { + WorkItemFilters.Builder builder = WorkItemFilters.newBuilder(); + for (WorkItemFilter.OrchestrationFilter orch : filter.getOrchestrations()) { + com.microsoft.durabletask.implementation.protobuf.OrchestratorService.OrchestrationFilter.Builder orchBuilder = + com.microsoft.durabletask.implementation.protobuf.OrchestratorService.OrchestrationFilter.newBuilder() + .setName(orch.getName()); + orchBuilder.addAllVersions(orch.getVersions()); + builder.addOrchestrations(orchBuilder.build()); + } + for (WorkItemFilter.ActivityFilter activity : filter.getActivities()) { + com.microsoft.durabletask.implementation.protobuf.OrchestratorService.ActivityFilter.Builder actBuilder = + com.microsoft.durabletask.implementation.protobuf.OrchestratorService.ActivityFilter.newBuilder() + .setName(activity.getName()); + actBuilder.addAllVersions(activity.getVersions()); + builder.addActivities(actBuilder.build()); + } + return builder.build(); + } +} diff --git a/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorkerBuilder.java b/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorkerBuilder.java index ec39fee2..b764a16a 100644 --- a/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorkerBuilder.java +++ b/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorkerBuilder.java @@ -5,7 +5,9 @@ import io.grpc.Channel; import java.time.Duration; +import java.util.Collections; import java.util.HashMap; +import java.util.List; /** * Builder object for constructing customized {@link DurableTaskGrpcWorker} instances. @@ -18,6 +20,8 @@ public final class DurableTaskGrpcWorkerBuilder { DataConverter dataConverter; Duration maximumTimerInterval; DurableTaskGrpcWorkerVersioningOptions versioningOptions; + private WorkItemFilter workItemFilter; + private boolean autoGenerateWorkItemFilters; /** * Adds an orchestration factory to be used by the constructed {@link DurableTaskGrpcWorker}. @@ -125,11 +129,71 @@ public DurableTaskGrpcWorkerBuilder useVersioning(DurableTaskGrpcWorkerVersionin return this; } + /** + * Sets explicit work item filters for this worker. When set, only work items matching the filters + * will be dispatched to this worker by the backend. + *

+ * Work item filtering can improve efficiency in multi-worker deployments by ensuring each worker + * only receives work items it can handle. However, if an orchestration calls a task type + * (e.g., an activity or sub-orchestrator) that is not registered with any connected worker, + * the call may hang indefinitely instead of failing with an error. + * + * @param workItemFilter the work item filter to use, or {@code null} to disable filtering + * @return this builder object + */ + public DurableTaskGrpcWorkerBuilder useWorkItemFilters(WorkItemFilter workItemFilter) { + this.workItemFilter = workItemFilter; + this.autoGenerateWorkItemFilters = false; + return this; + } + + /** + * Enables automatic work item filtering by generating filters from the registered + * orchestrations and activities. When enabled, the backend will only dispatch work items + * for registered orchestrations and activities to this worker. + *

+ * Work item filtering can improve efficiency in multi-worker deployments by ensuring each worker + * only receives work items it can handle. However, if an orchestration calls a task type + * (e.g., an activity or sub-orchestrator) that is not registered with any connected worker, + * the call may hang indefinitely instead of failing with an error. + *

+ * Only use this method when all task types referenced by orchestrations are guaranteed to be + * registered with at least one connected worker. + * + * @return this builder object + */ + public DurableTaskGrpcWorkerBuilder useWorkItemFilters() { + this.autoGenerateWorkItemFilters = true; + this.workItemFilter = null; + return this; + } + /** * Initializes a new {@link DurableTaskGrpcWorker} object with the settings specified in the current builder object. * @return a new {@link DurableTaskGrpcWorker} object */ public DurableTaskGrpcWorker build() { - return new DurableTaskGrpcWorker(this); + WorkItemFilter resolvedFilter = this.autoGenerateWorkItemFilters + ? buildAutoWorkItemFilter() + : this.workItemFilter; + return new DurableTaskGrpcWorker(this, resolvedFilter); + } + + private WorkItemFilter buildAutoWorkItemFilter() { + List versions = Collections.emptyList(); + if (this.versioningOptions != null + && this.versioningOptions.getMatchStrategy() == DurableTaskGrpcWorkerVersioningOptions.VersionMatchStrategy.STRICT + && this.versioningOptions.getVersion() != null) { + versions = Collections.singletonList(this.versioningOptions.getVersion()); + } + + WorkItemFilter.Builder builder = WorkItemFilter.newBuilder(); + for (String name : this.orchestrationFactories.keySet()) { + builder.addOrchestration(name, versions); + } + for (String name : this.activityFactories.keySet()) { + builder.addActivity(name, versions); + } + return builder.build(); } } diff --git a/client/src/main/java/com/microsoft/durabletask/WorkItemFilter.java b/client/src/main/java/com/microsoft/durabletask/WorkItemFilter.java new file mode 100644 index 00000000..2a96383d --- /dev/null +++ b/client/src/main/java/com/microsoft/durabletask/WorkItemFilter.java @@ -0,0 +1,205 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. +package com.microsoft.durabletask; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +/** + * Represents work item filters for a Durable Task worker. These filters are passed to the backend + * and only work items matching the filters will be processed by the worker. If no filters are provided, + * the worker will process all work items. + *

+ * Work item filtering can improve efficiency in multi-worker deployments by ensuring each worker + * only receives work items it can handle. However, if an orchestration calls a task type + * (e.g., an activity or sub-orchestrator) that is not registered with any connected worker, + * the call may hang indefinitely instead of failing with an error. + *

+ * Use {@link DurableTaskGrpcWorkerBuilder#useWorkItemFilters(WorkItemFilter)} to provide explicit filters, + * or {@link DurableTaskGrpcWorkerBuilder#useWorkItemFilters()} to auto-generate filters from the + * registered orchestrations and activities. + */ +public final class WorkItemFilter { + + private final List orchestrations; + private final List activities; + + private WorkItemFilter(List orchestrations, List activities) { + this.orchestrations = Collections.unmodifiableList(new ArrayList(orchestrations)); + this.activities = Collections.unmodifiableList(new ArrayList(activities)); + } + + /** + * Gets the orchestration filters. + * + * @return an unmodifiable list of orchestration filters + */ + public List getOrchestrations() { + return this.orchestrations; + } + + /** + * Gets the activity filters. + * + * @return an unmodifiable list of activity filters + */ + public List getActivities() { + return this.activities; + } + + /** + * Creates a new {@link Builder} for constructing {@link WorkItemFilter} instances. + * + * @return a new builder + */ + public static Builder newBuilder() { + return new Builder(); + } + + /** + * Builder for constructing {@link WorkItemFilter} instances. + */ + public static final class Builder { + private final List orchestrations = new ArrayList(); + private final List activities = new ArrayList(); + + Builder() { + } + + /** + * Adds an orchestration filter with the specified name and no version constraint. + * + * @param name the orchestration name to filter on + * @return this builder + */ + public Builder addOrchestration(String name) { + if (name == null || name.isEmpty()) { + throw new IllegalArgumentException("Orchestration filter name must not be null or empty."); + } + this.orchestrations.add(new OrchestrationFilter(name, Collections.emptyList())); + return this; + } + + /** + * Adds an orchestration filter with the specified name and versions. + * + * @param name the orchestration name to filter on + * @param versions the versions to filter on + * @return this builder + */ + public Builder addOrchestration(String name, List versions) { + if (name == null || name.isEmpty()) { + throw new IllegalArgumentException("Orchestration filter name must not be null or empty."); + } + List versionsCopy = versions != null + ? Collections.unmodifiableList(new ArrayList(versions)) + : Collections.emptyList(); + this.orchestrations.add(new OrchestrationFilter(name, versionsCopy)); + return this; + } + + /** + * Adds an activity filter with the specified name and no version constraint. + * + * @param name the activity name to filter on + * @return this builder + */ + public Builder addActivity(String name) { + if (name == null || name.isEmpty()) { + throw new IllegalArgumentException("Activity filter name must not be null or empty."); + } + this.activities.add(new ActivityFilter(name, Collections.emptyList())); + return this; + } + + /** + * Adds an activity filter with the specified name and versions. + * + * @param name the activity name to filter on + * @param versions the versions to filter on + * @return this builder + */ + public Builder addActivity(String name, List versions) { + if (name == null || name.isEmpty()) { + throw new IllegalArgumentException("Activity filter name must not be null or empty."); + } + List versionsCopy = versions != null + ? Collections.unmodifiableList(new ArrayList(versions)) + : Collections.emptyList(); + this.activities.add(new ActivityFilter(name, versionsCopy)); + return this; + } + + /** + * Builds a new {@link WorkItemFilter} from the configured filters. + * + * @return a new {@link WorkItemFilter} instance + */ + public WorkItemFilter build() { + return new WorkItemFilter(this.orchestrations, this.activities); + } + } + + /** + * Specifies an orchestration filter with a name and optional versions. + */ + public static final class OrchestrationFilter { + private final String name; + private final List versions; + + OrchestrationFilter(String name, List versions) { + this.name = name; + this.versions = versions; + } + + /** + * Gets the name of the orchestration to filter. + * + * @return the orchestration name + */ + public String getName() { + return this.name; + } + + /** + * Gets the versions of the orchestration to filter. + * + * @return an unmodifiable list of versions, or an empty list if no version constraint + */ + public List getVersions() { + return this.versions; + } + } + + /** + * Specifies an activity filter with a name and optional versions. + */ + public static final class ActivityFilter { + private final String name; + private final List versions; + + ActivityFilter(String name, List versions) { + this.name = name; + this.versions = versions; + } + + /** + * Gets the name of the activity to filter. + * + * @return the activity name + */ + public String getName() { + return this.name; + } + + /** + * Gets the versions of the activity to filter. + * + * @return an unmodifiable list of versions, or an empty list if no version constraint + */ + public List getVersions() { + return this.versions; + } + } +} diff --git a/client/src/test/java/com/microsoft/durabletask/WorkItemFilterBuilderTest.java b/client/src/test/java/com/microsoft/durabletask/WorkItemFilterBuilderTest.java new file mode 100644 index 00000000..88994419 --- /dev/null +++ b/client/src/test/java/com/microsoft/durabletask/WorkItemFilterBuilderTest.java @@ -0,0 +1,170 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. +package com.microsoft.durabletask; + +import org.junit.jupiter.api.Test; + +import java.util.Set; +import java.util.stream.Collectors; + +import static org.junit.jupiter.api.Assertions.*; + +/** + * Unit tests for work item filter integration in {@link DurableTaskGrpcWorkerBuilder}. + */ +public class WorkItemFilterBuilderTest { + + @Test + void useWorkItemFilters_explicit_setsFilter() { + WorkItemFilter filter = WorkItemFilter.newBuilder() + .addOrchestration("Orch1") + .addActivity("Act1") + .build(); + + // Build method is private on the worker, but we can test that builder accepts the filter + // and constructs a worker without error. + DurableTaskGrpcWorkerBuilder builder = new DurableTaskGrpcWorkerBuilder(); + builder.addOrchestration(new TestOrchestrationFactory("Orch1")); + builder.addActivity(new TestActivityFactory("Act1")); + builder.useWorkItemFilters(filter); + + // Should not throw + DurableTaskGrpcWorker worker = builder.build(); + assertNotNull(worker); + worker.close(); + } + + @Test + void useWorkItemFilters_auto_generatesFromRegistered() { + DurableTaskGrpcWorkerBuilder builder = new DurableTaskGrpcWorkerBuilder(); + builder.addOrchestration(new TestOrchestrationFactory("Orch1")); + builder.addOrchestration(new TestOrchestrationFactory("Orch2")); + builder.addActivity(new TestActivityFactory("Act1")); + builder.useWorkItemFilters(); + + DurableTaskGrpcWorker worker = builder.build(); + assertNotNull(worker); + + WorkItemFilter filter = worker.getWorkItemFilter(); + assertNotNull(filter); + assertEquals(2, filter.getOrchestrations().size()); + assertEquals(1, filter.getActivities().size()); + + Set orchNames = filter.getOrchestrations().stream() + .map(WorkItemFilter.OrchestrationFilter::getName) + .collect(Collectors.toSet()); + assertTrue(orchNames.contains("Orch1")); + assertTrue(orchNames.contains("Orch2")); + + assertEquals("Act1", filter.getActivities().get(0).getName()); + worker.close(); + } + + @Test + void useWorkItemFilters_auto_withStrictVersioning_includesVersion() { + DurableTaskGrpcWorkerBuilder builder = new DurableTaskGrpcWorkerBuilder(); + builder.addOrchestration(new TestOrchestrationFactory("Orch1")); + builder.addActivity(new TestActivityFactory("Act1")); + builder.useVersioning(new DurableTaskGrpcWorkerVersioningOptions( + "1.0", "1.0", + DurableTaskGrpcWorkerVersioningOptions.VersionMatchStrategy.STRICT, + DurableTaskGrpcWorkerVersioningOptions.VersionFailureStrategy.REJECT)); + builder.useWorkItemFilters(); + + DurableTaskGrpcWorker worker = builder.build(); + assertNotNull(worker); + + WorkItemFilter filter = worker.getWorkItemFilter(); + assertNotNull(filter); + assertEquals(1, filter.getOrchestrations().size()); + assertEquals(1, filter.getOrchestrations().get(0).getVersions().size()); + assertEquals("1.0", filter.getOrchestrations().get(0).getVersions().get(0)); + + assertEquals(1, filter.getActivities().size()); + assertEquals(1, filter.getActivities().get(0).getVersions().size()); + assertEquals("1.0", filter.getActivities().get(0).getVersions().get(0)); + worker.close(); + } + + @Test + void useWorkItemFilters_auto_withNoneVersioning_noVersions() { + DurableTaskGrpcWorkerBuilder builder = new DurableTaskGrpcWorkerBuilder(); + builder.addOrchestration(new TestOrchestrationFactory("Orch1")); + builder.useVersioning(new DurableTaskGrpcWorkerVersioningOptions( + "1.0", "1.0", + DurableTaskGrpcWorkerVersioningOptions.VersionMatchStrategy.NONE, + DurableTaskGrpcWorkerVersioningOptions.VersionFailureStrategy.REJECT)); + builder.useWorkItemFilters(); + + DurableTaskGrpcWorker worker = builder.build(); + assertNotNull(worker); + + WorkItemFilter filter = worker.getWorkItemFilter(); + assertNotNull(filter); + assertTrue(filter.getOrchestrations().get(0).getVersions().isEmpty()); + worker.close(); + } + + @Test + void useWorkItemFilters_nullExplicit_disablesFiltering() { + DurableTaskGrpcWorkerBuilder builder = new DurableTaskGrpcWorkerBuilder(); + builder.addOrchestration(new TestOrchestrationFactory("Orch1")); + builder.useWorkItemFilters(null); + + DurableTaskGrpcWorker worker = builder.build(); + assertNotNull(worker); + + assertNull(worker.getWorkItemFilter()); + worker.close(); + } + + @Test + void noWorkItemFilters_filterIsNull() { + DurableTaskGrpcWorkerBuilder builder = new DurableTaskGrpcWorkerBuilder(); + builder.addOrchestration(new TestOrchestrationFactory("Orch1")); + + DurableTaskGrpcWorker worker = builder.build(); + assertNotNull(worker); + + assertNull(worker.getWorkItemFilter()); + worker.close(); + } + + // Simple test factory for orchestrations + private static class TestOrchestrationFactory implements TaskOrchestrationFactory { + private final String name; + + TestOrchestrationFactory(String name) { + this.name = name; + } + + @Override + public String getName() { + return this.name; + } + + @Override + public TaskOrchestration create() { + return ctx -> { }; + } + } + + // Simple test factory for activities + private static class TestActivityFactory implements TaskActivityFactory { + private final String name; + + TestActivityFactory(String name) { + this.name = name; + } + + @Override + public String getName() { + return this.name; + } + + @Override + public TaskActivity create() { + return ctx -> null; + } + } +} diff --git a/client/src/test/java/com/microsoft/durabletask/WorkItemFilterTest.java b/client/src/test/java/com/microsoft/durabletask/WorkItemFilterTest.java new file mode 100644 index 00000000..f2276948 --- /dev/null +++ b/client/src/test/java/com/microsoft/durabletask/WorkItemFilterTest.java @@ -0,0 +1,207 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. +package com.microsoft.durabletask; + +import com.microsoft.durabletask.implementation.protobuf.OrchestratorService.WorkItemFilters; +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import static org.junit.jupiter.api.Assertions.*; + +/** + * Unit tests for {@link WorkItemFilter} and its builder. + */ +public class WorkItemFilterTest { + + @Test + void newBuilder_emptyFilter_hasEmptyLists() { + WorkItemFilter filter = WorkItemFilter.newBuilder().build(); + assertNotNull(filter.getOrchestrations()); + assertNotNull(filter.getActivities()); + assertTrue(filter.getOrchestrations().isEmpty()); + assertTrue(filter.getActivities().isEmpty()); + } + + @Test + void addOrchestration_singleName_isRetained() { + WorkItemFilter filter = WorkItemFilter.newBuilder() + .addOrchestration("MyOrchestrator") + .build(); + + assertEquals(1, filter.getOrchestrations().size()); + assertEquals("MyOrchestrator", filter.getOrchestrations().get(0).getName()); + assertTrue(filter.getOrchestrations().get(0).getVersions().isEmpty()); + } + + @Test + void addOrchestration_withVersions_retainsVersions() { + List versions = Arrays.asList("1.0", "2.0"); + WorkItemFilter filter = WorkItemFilter.newBuilder() + .addOrchestration("MyOrchestrator", versions) + .build(); + + assertEquals(1, filter.getOrchestrations().size()); + WorkItemFilter.OrchestrationFilter orch = filter.getOrchestrations().get(0); + assertEquals("MyOrchestrator", orch.getName()); + assertEquals(2, orch.getVersions().size()); + assertEquals("1.0", orch.getVersions().get(0)); + assertEquals("2.0", orch.getVersions().get(1)); + } + + @Test + void addOrchestration_nullVersions_treatedAsEmpty() { + WorkItemFilter filter = WorkItemFilter.newBuilder() + .addOrchestration("MyOrchestrator", null) + .build(); + + assertEquals(1, filter.getOrchestrations().size()); + assertTrue(filter.getOrchestrations().get(0).getVersions().isEmpty()); + } + + @Test + void addActivity_singleName_isRetained() { + WorkItemFilter filter = WorkItemFilter.newBuilder() + .addActivity("MyActivity") + .build(); + + assertEquals(1, filter.getActivities().size()); + assertEquals("MyActivity", filter.getActivities().get(0).getName()); + assertTrue(filter.getActivities().get(0).getVersions().isEmpty()); + } + + @Test + void addActivity_withVersions_retainsVersions() { + List versions = Collections.singletonList("v1"); + WorkItemFilter filter = WorkItemFilter.newBuilder() + .addActivity("MyActivity", versions) + .build(); + + assertEquals(1, filter.getActivities().size()); + WorkItemFilter.ActivityFilter act = filter.getActivities().get(0); + assertEquals("MyActivity", act.getName()); + assertEquals(1, act.getVersions().size()); + assertEquals("v1", act.getVersions().get(0)); + } + + @Test + void addActivity_nullVersions_treatedAsEmpty() { + WorkItemFilter filter = WorkItemFilter.newBuilder() + .addActivity("MyActivity", null) + .build(); + + assertEquals(1, filter.getActivities().size()); + assertTrue(filter.getActivities().get(0).getVersions().isEmpty()); + } + + @Test + void addOrchestration_nullName_throws() { + assertThrows(IllegalArgumentException.class, () -> + WorkItemFilter.newBuilder().addOrchestration(null)); + } + + @Test + void addOrchestration_emptyName_throws() { + assertThrows(IllegalArgumentException.class, () -> + WorkItemFilter.newBuilder().addOrchestration("")); + } + + @Test + void addActivity_nullName_throws() { + assertThrows(IllegalArgumentException.class, () -> + WorkItemFilter.newBuilder().addActivity(null)); + } + + @Test + void addActivity_emptyName_throws() { + assertThrows(IllegalArgumentException.class, () -> + WorkItemFilter.newBuilder().addActivity("")); + } + + @Test + void addOrchestrationWithVersions_nullName_throws() { + assertThrows(IllegalArgumentException.class, () -> + WorkItemFilter.newBuilder().addOrchestration(null, Collections.singletonList("v1"))); + } + + @Test + void addActivityWithVersions_nullName_throws() { + assertThrows(IllegalArgumentException.class, () -> + WorkItemFilter.newBuilder().addActivity(null, Collections.singletonList("v1"))); + } + + @Test + void multipleFilters_allRetained() { + WorkItemFilter filter = WorkItemFilter.newBuilder() + .addOrchestration("Orch1") + .addOrchestration("Orch2") + .addActivity("Act1") + .addActivity("Act2") + .addActivity("Act3") + .build(); + + assertEquals(2, filter.getOrchestrations().size()); + assertEquals(3, filter.getActivities().size()); + assertEquals("Orch1", filter.getOrchestrations().get(0).getName()); + assertEquals("Orch2", filter.getOrchestrations().get(1).getName()); + assertEquals("Act1", filter.getActivities().get(0).getName()); + assertEquals("Act2", filter.getActivities().get(1).getName()); + assertEquals("Act3", filter.getActivities().get(2).getName()); + } + + @Test + void orchestrationsList_isUnmodifiable() { + WorkItemFilter filter = WorkItemFilter.newBuilder() + .addOrchestration("Orch1") + .build(); + + assertThrows(UnsupportedOperationException.class, () -> + filter.getOrchestrations().add(new WorkItemFilter.OrchestrationFilter("x", Collections.emptyList()))); + } + + @Test + void activitiesList_isUnmodifiable() { + WorkItemFilter filter = WorkItemFilter.newBuilder() + .addActivity("Act1") + .build(); + + assertThrows(UnsupportedOperationException.class, () -> + filter.getActivities().add(new WorkItemFilter.ActivityFilter("x", Collections.emptyList()))); + } + + @Test + void toProtoWorkItemFilters_convertsOrchestrationsAndActivities() { + WorkItemFilter filter = WorkItemFilter.newBuilder() + .addOrchestration("Orch1", Arrays.asList("1.0", "2.0")) + .addOrchestration("Orch2") + .addActivity("Act1", Collections.singletonList("v1")) + .addActivity("Act2") + .build(); + + WorkItemFilters proto = DurableTaskGrpcWorker.toProtoWorkItemFilters(filter); + + assertEquals(2, proto.getOrchestrationsCount()); + assertEquals("Orch1", proto.getOrchestrations(0).getName()); + assertEquals(Arrays.asList("1.0", "2.0"), proto.getOrchestrations(0).getVersionsList()); + assertEquals("Orch2", proto.getOrchestrations(1).getName()); + assertTrue(proto.getOrchestrations(1).getVersionsList().isEmpty()); + + assertEquals(2, proto.getActivitiesCount()); + assertEquals("Act1", proto.getActivities(0).getName()); + assertEquals(Collections.singletonList("v1"), proto.getActivities(0).getVersionsList()); + assertEquals("Act2", proto.getActivities(1).getName()); + assertTrue(proto.getActivities(1).getVersionsList().isEmpty()); + } + + @Test + void toProtoWorkItemFilters_emptyFilter_producesEmptyProto() { + WorkItemFilter filter = WorkItemFilter.newBuilder().build(); + + WorkItemFilters proto = DurableTaskGrpcWorker.toProtoWorkItemFilters(filter); + + assertEquals(0, proto.getOrchestrationsCount()); + assertEquals(0, proto.getActivitiesCount()); + } +} From 72433f58d6c268b5799c5adcd5786e7b9d44eb78 Mon Sep 17 00:00:00 2001 From: Varshi Bachu Date: Mon, 30 Mar 2026 12:12:16 -0700 Subject: [PATCH 2/2] addressed copilot comments --- CHANGELOG.md | 2 +- .../durabletask/DurableTaskGrpcWorker.java | 5 +-- .../DurableTaskGrpcWorkerBuilder.java | 9 ++++-- .../microsoft/durabletask/WorkItemFilter.java | 32 ++++++++++++++----- 4 files changed, 35 insertions(+), 13 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e45db8c2..ac4a79e5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,5 @@ ## Unreleased -* Add work item filtering support for `DurableTaskGrpcWorker` to enable worker-side filtering of orchestration and activity work items +* Add work item filtering support for `DurableTaskGrpcWorker` to enable worker-side filtering of orchestration and activity work items ([#275](https://github.com/microsoft/durabletask-java/pull/275)) * Add support for calls to HTTP endpoints ([#271](https://github.com/microsoft/durabletask-java/pull/271)) * Add getSuspendPostUri and getResumePostUri getters to HttpManagementPayload ([#264](https://github.com/microsoft/durabletask-java/pull/264)) diff --git a/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorker.java b/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorker.java index 5d87489c..8dcdd020 100644 --- a/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorker.java +++ b/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorker.java @@ -40,6 +40,7 @@ public final class DurableTaskGrpcWorker implements AutoCloseable { private final Duration maximumTimerInterval; private final DurableTaskGrpcWorkerVersioningOptions versioningOptions; private final WorkItemFilter workItemFilter; + private final GetWorkItemsRequest getWorkItemsRequest; private final TaskHubSidecarServiceBlockingStub sidecarClient; @@ -72,6 +73,7 @@ public final class DurableTaskGrpcWorker implements AutoCloseable { this.maximumTimerInterval = builder.maximumTimerInterval != null ? builder.maximumTimerInterval : DEFAULT_MAXIMUM_TIMER_INTERVAL; this.versioningOptions = builder.versioningOptions; this.workItemFilter = workItemFilter; + this.getWorkItemsRequest = buildGetWorkItemsRequest(); } /** @@ -134,8 +136,7 @@ public void startAndBlock() { // TODO: How do we interrupt manually? while (true) { try { - GetWorkItemsRequest getWorkItemsRequest = buildGetWorkItemsRequest(); - Iterator workItemStream = this.sidecarClient.getWorkItems(getWorkItemsRequest); + Iterator workItemStream = this.sidecarClient.getWorkItems(this.getWorkItemsRequest); while (workItemStream.hasNext()) { WorkItem workItem = workItemStream.next(); RequestCase requestType = workItem.getRequestCase(); diff --git a/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorkerBuilder.java b/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorkerBuilder.java index b764a16a..19d5d059 100644 --- a/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorkerBuilder.java +++ b/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorkerBuilder.java @@ -5,6 +5,7 @@ import io.grpc.Channel; import java.time.Duration; +import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -188,10 +189,14 @@ private WorkItemFilter buildAutoWorkItemFilter() { } WorkItemFilter.Builder builder = WorkItemFilter.newBuilder(); - for (String name : this.orchestrationFactories.keySet()) { + List orchestrationNames = new ArrayList<>(this.orchestrationFactories.keySet()); + Collections.sort(orchestrationNames); + for (String name : orchestrationNames) { builder.addOrchestration(name, versions); } - for (String name : this.activityFactories.keySet()) { + List activityNames = new ArrayList<>(this.activityFactories.keySet()); + Collections.sort(activityNames); + for (String name : activityNames) { builder.addActivity(name, versions); } return builder.build(); diff --git a/client/src/main/java/com/microsoft/durabletask/WorkItemFilter.java b/client/src/main/java/com/microsoft/durabletask/WorkItemFilter.java index 2a96383d..ae181eb6 100644 --- a/client/src/main/java/com/microsoft/durabletask/WorkItemFilter.java +++ b/client/src/main/java/com/microsoft/durabletask/WorkItemFilter.java @@ -92,9 +92,17 @@ public Builder addOrchestration(String name, List versions) { if (name == null || name.isEmpty()) { throw new IllegalArgumentException("Orchestration filter name must not be null or empty."); } - List versionsCopy = versions != null - ? Collections.unmodifiableList(new ArrayList(versions)) - : Collections.emptyList(); + List versionsCopy; + if (versions == null) { + versionsCopy = Collections.emptyList(); + } else { + for (String version : versions) { + if (version == null || version.isEmpty()) { + throw new IllegalArgumentException("Orchestration filter versions must not contain null or empty entries."); + } + } + versionsCopy = Collections.unmodifiableList(new ArrayList(versions)); + } this.orchestrations.add(new OrchestrationFilter(name, versionsCopy)); return this; } @@ -124,9 +132,17 @@ public Builder addActivity(String name, List versions) { if (name == null || name.isEmpty()) { throw new IllegalArgumentException("Activity filter name must not be null or empty."); } - List versionsCopy = versions != null - ? Collections.unmodifiableList(new ArrayList(versions)) - : Collections.emptyList(); + List versionsCopy; + if (versions == null) { + versionsCopy = Collections.emptyList(); + } else { + for (String version : versions) { + if (version == null || version.isEmpty()) { + throw new IllegalArgumentException("Activity filter versions must not contain null or empty entries."); + } + } + versionsCopy = Collections.unmodifiableList(new ArrayList(versions)); + } this.activities.add(new ActivityFilter(name, versionsCopy)); return this; } @@ -150,7 +166,7 @@ public static final class OrchestrationFilter { OrchestrationFilter(String name, List versions) { this.name = name; - this.versions = versions; + this.versions = Collections.unmodifiableList(new ArrayList(versions)); } /** @@ -181,7 +197,7 @@ public static final class ActivityFilter { ActivityFilter(String name, List versions) { this.name = name; - this.versions = versions; + this.versions = Collections.unmodifiableList(new ArrayList(versions)); } /**