From 121233764e09fc66baadc307e3627f8ca44d36c6 Mon Sep 17 00:00:00 2001 From: nvasiu Date: Mon, 25 May 2026 22:43:45 +0000 Subject: [PATCH] feat(sdk): Add subtypes for Step, Wait, Callback and Invoke operations Added new OperationSubType values for all operation types that were missing them (step, wait, callback and invoke). And pass these values when creating operations in DurableContextImpl. ChildContextOperation explicitly checks for unreachable subtypes. Updated ChildContextOperation to check for the new subtype values. Updated the tests for the above operations that check the subtype to use these new subtype values. Updated LocalMemoryExecutionClient.applyResult to preserve subtype when updating an operation. Previously the subtype value was not passed on to the updated operation, causing test failures. --- .../local/LocalMemoryExecutionClient.java | 1 + .../durable/context/DurableContextImpl.java | 26 +++++++------- .../durable/model/OperationIdentifier.java | 21 +++++------ .../durable/model/OperationSubType.java | 35 +++++++++++++------ .../operation/BaseDurableOperation.java | 14 ++++---- .../operation/ChildContextOperation.java | 3 +- .../operation/ConcurrencyOperation.java | 3 +- .../operation/WaitForConditionOperation.java | 11 ++---- .../lambda/durable/ReplayValidationTest.java | 4 +++ .../execution/DurableExecutionTest.java | 2 ++ .../operation/CallbackOperationTest.java | 11 +++++- .../operation/ChildContextOperationTest.java | 2 +- .../operation/ConcurrencyOperationTest.java | 3 +- .../operation/InvokeOperationTest.java | 4 +-- .../operation/ParallelOperationTest.java | 2 +- .../SerializableDurableOperationTest.java | 4 ++- .../durable/operation/StepOperationTest.java | 4 +-- .../WaitForConditionOperationTest.java | 8 ++++- .../durable/operation/WaitOperationTest.java | 4 +-- 19 files changed, 93 insertions(+), 69 deletions(-) diff --git a/sdk-testing/src/main/java/software/amazon/lambda/durable/testing/local/LocalMemoryExecutionClient.java b/sdk-testing/src/main/java/software/amazon/lambda/durable/testing/local/LocalMemoryExecutionClient.java index 847872f46..90ee28d99 100644 --- a/sdk-testing/src/main/java/software/amazon/lambda/durable/testing/local/LocalMemoryExecutionClient.java +++ b/sdk-testing/src/main/java/software/amazon/lambda/durable/testing/local/LocalMemoryExecutionClient.java @@ -190,6 +190,7 @@ private void applyResult(Operation op, OperationResult result) { .id(op.id()) .name(op.name()) .type(op.type()) + .subType(op.subType()) .action(action) .parentId(op.parentId()) .payload(result.result()) diff --git a/sdk/src/main/java/software/amazon/lambda/durable/context/DurableContextImpl.java b/sdk/src/main/java/software/amazon/lambda/durable/context/DurableContextImpl.java index 62414aba9..c08b6317b 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/context/DurableContextImpl.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/context/DurableContextImpl.java @@ -11,7 +11,6 @@ import java.util.function.BiFunction; import java.util.function.Function; import org.slf4j.LoggerFactory; -import software.amazon.awssdk.services.lambda.model.OperationType; import software.amazon.lambda.durable.DurableCallbackFuture; import software.amazon.lambda.durable.DurableConfig; import software.amazon.lambda.durable.DurableContext; @@ -146,7 +145,7 @@ public DurableFuture stepAsync( // Create and start step operation with TypeToken var operation = new StepOperation<>( - OperationIdentifier.of(operationId, name, OperationType.STEP), func, resultType, config, this); + OperationIdentifier.of(operationId, name, OperationSubType.STEP), func, resultType, config, this); operation.execute(); // Start the step (returns immediately) @@ -162,7 +161,7 @@ public DurableFuture waitAsync(String name, Duration duration) { // Create and start wait operation var operation = - new WaitOperation(OperationIdentifier.of(operationId, name, OperationType.WAIT), duration, this); + new WaitOperation(OperationIdentifier.of(operationId, name, OperationSubType.WAIT), duration, this); operation.execute(); // Checkpoint the wait return operation; @@ -187,7 +186,7 @@ public DurableFuture invokeAsync( // Create and start invoke operation var operation = new InvokeOperation<>( - OperationIdentifier.of(operationId, name, OperationType.CHAINED_INVOKE), + OperationIdentifier.of(operationId, name, OperationSubType.CHAINED_INVOKE), functionName, payload, resultType, @@ -207,7 +206,7 @@ public DurableCallbackFuture createCallback(String name, TypeToken res var operationId = nextOperationId(); var operation = new CallbackOperation<>( - OperationIdentifier.of(operationId, name, OperationType.CALLBACK), resultType, config, this); + OperationIdentifier.of(operationId, name, OperationSubType.CALLBACK), resultType, config, this); operation.execute(); return operation; @@ -248,11 +247,7 @@ private DurableFuture runInChildContextAsync( var operationId = nextOperationId(); var operation = new ChildContextOperation<>( - OperationIdentifier.of(operationId, name, OperationType.CONTEXT, subType), - func, - resultType, - config, - this); + OperationIdentifier.of(operationId, name, subType), func, resultType, config, this); operation.execute(); return operation; @@ -277,7 +272,7 @@ public DurableFuture> mapAsync( var operationId = nextOperationId(); var operation = new MapOperation<>( - OperationIdentifier.of(operationId, name, OperationType.CONTEXT, OperationSubType.MAP), + OperationIdentifier.of(operationId, name, OperationSubType.MAP), itemList, function, resultType, @@ -293,7 +288,7 @@ public ParallelDurableFuture parallel(String name, ParallelConfig config) { var operationId = nextOperationId(); var parallelOp = new ParallelOperation( - OperationIdentifier.of(operationId, name, OperationType.CONTEXT, OperationSubType.PARALLEL), + OperationIdentifier.of(operationId, name, OperationSubType.PARALLEL), getDurableConfig().getSerDes(), this, config); @@ -363,7 +358,12 @@ public DurableFuture waitForConditionAsync( } var operationId = nextOperationId(); - var operation = new WaitForConditionOperation<>(operationId, name, checkFunc, resultType, config, this); + var operation = new WaitForConditionOperation<>( + OperationIdentifier.of(operationId, name, OperationSubType.WAIT_FOR_CONDITION), + checkFunc, + resultType, + config, + this); operation.execute(); diff --git a/sdk/src/main/java/software/amazon/lambda/durable/model/OperationIdentifier.java b/sdk/src/main/java/software/amazon/lambda/durable/model/OperationIdentifier.java index 17fd4771a..9f986f4ce 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/model/OperationIdentifier.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/model/OperationIdentifier.java @@ -5,24 +5,21 @@ import software.amazon.awssdk.services.lambda.model.OperationType; /** - * Identifies a durable operation by its unique ID, human-readable name, type, and optional sub-type. + * Identifies a durable operation by its unique ID, human-readable name, type and sub-type. * * @param operationId unique sequential identifier for the operation within an execution * @param name human-readable name for the operation - * @param operationType the kind of operation (STEP, WAIT, CALLBACK, etc.) - * @param subType optional sub-type for operations that need further classification (e.g. child contexts) + * @param subType the operation sub-type which also determines the operation type */ -public record OperationIdentifier( - String operationId, String name, OperationType operationType, OperationSubType subType) { +public record OperationIdentifier(String operationId, String name, OperationSubType subType) { - /** Creates an identifier without a sub-type. */ - public static OperationIdentifier of(String operationId, String name, OperationType type) { - return new OperationIdentifier(operationId, name, type, null); + /** Returns the operation type derived from the sub-type. */ + public OperationType operationType() { + return subType.getOperationType(); } - /** Creates an identifier with a sub-type. */ - public static OperationIdentifier of( - String operationId, String name, OperationType type, OperationSubType subType) { - return new OperationIdentifier(operationId, name, type, subType); + /** Creates an identifier with the given sub-type. */ + public static OperationIdentifier of(String operationId, String name, OperationSubType subType) { + return new OperationIdentifier(operationId, name, subType); } } diff --git a/sdk/src/main/java/software/amazon/lambda/durable/model/OperationSubType.java b/sdk/src/main/java/software/amazon/lambda/durable/model/OperationSubType.java index 416798f9f..c7273b9cd 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/model/OperationSubType.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/model/OperationSubType.java @@ -2,28 +2,41 @@ // SPDX-License-Identifier: Apache-2.0 package software.amazon.lambda.durable.model; +import software.amazon.awssdk.services.lambda.model.OperationType; + /** - * Fine-grained classification of durable operations beyond the basic operation types. + * Fine-grained classification of durable operations that pairs each subtype with its parent {@link OperationType}. * - *

Used as the {@code subType} field in checkpoint updates for {@code CONTEXT} operations. Matches the + *

Used as the source of both the {@code type} and {@code subType} fields in checkpoint updates. Matches the * {@code OperationSubType} enum in the JavaScript and Python durable execution SDKs. */ public enum OperationSubType { - RUN_IN_CHILD_CONTEXT("RunInChildContext"), - MAP("Map"), - MAP_ITERATION("MapIteration"), - PARALLEL("Parallel"), - PARALLEL_BRANCH("ParallelBranch"), - WAIT_FOR_CALLBACK("WaitForCallback"), - WAIT_FOR_CONDITION("WaitForCondition"), - WITH_RETRY("WithRetry"); + STEP(OperationType.STEP, "Step"), + WAIT(OperationType.WAIT, "Wait"), + CALLBACK(OperationType.CALLBACK, "Callback"), + CHAINED_INVOKE(OperationType.CHAINED_INVOKE, "ChainedInvoke"), + RUN_IN_CHILD_CONTEXT(OperationType.CONTEXT, "RunInChildContext"), + MAP(OperationType.CONTEXT, "Map"), + MAP_ITERATION(OperationType.CONTEXT, "MapIteration"), + PARALLEL(OperationType.CONTEXT, "Parallel"), + PARALLEL_BRANCH(OperationType.CONTEXT, "ParallelBranch"), + WAIT_FOR_CALLBACK(OperationType.CONTEXT, "WaitForCallback"), + WAIT_FOR_CONDITION(OperationType.STEP, "WaitForCondition"), + WITH_RETRY(OperationType.CONTEXT, "WithRetry"); + private final OperationType operationType; private final String value; - OperationSubType(String value) { + OperationSubType(OperationType operationType, String value) { + this.operationType = operationType; this.value = value; } + /** Returns the parent {@link OperationType} for this subtype. */ + public OperationType getOperationType() { + return operationType; + } + /** Returns the wire-format string value sent in checkpoint updates. */ public String getValue() { return value; diff --git a/sdk/src/main/java/software/amazon/lambda/durable/operation/BaseDurableOperation.java b/sdk/src/main/java/software/amazon/lambda/durable/operation/BaseDurableOperation.java index 7a88173b0..b3dd55b95 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/operation/BaseDurableOperation.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/operation/BaseDurableOperation.java @@ -410,11 +410,11 @@ protected void sendOperationUpdate(OperationUpdate.Builder builder) { /** Sends an operation update asynchronously. */ protected CompletableFuture sendOperationUpdateAsync(OperationUpdate.Builder builder) { - var updateBuilder = - builder.id(getOperationId()).name(getName()).type(getType()).parentId(durableContext.getParentId()); - if (getSubType() != null) { - updateBuilder.subType(getSubType().getValue()); - } + var updateBuilder = builder.id(getOperationId()) + .name(getName()) + .type(getType()) + .subType(getSubType().getValue()) + .parentId(durableContext.getParentId()); var update = updateBuilder.build(); if (replayCompletedOperation.get()) { // We are replaying a completed operation, so complete the completableFuture without checkpointing @@ -444,9 +444,7 @@ protected void validateReplay(Operation checkpointed) { getOperationId(), checkpointed.name(), getName()))); } - if ((getSubType() == null && checkpointed.subType() != null) - || getSubType() != null - && !Objects.equals(checkpointed.subType(), getSubType().getValue())) { + if (!Objects.equals(checkpointed.subType(), getSubType().getValue())) { throw terminateExecution(new NonDeterministicExecutionException(String.format( "Operation subType mismatch for \"%s\". Expected \"%s\", got \"%s\"", getOperationId(), checkpointed.subType(), getSubType()))); diff --git a/sdk/src/main/java/software/amazon/lambda/durable/operation/ChildContextOperation.java b/sdk/src/main/java/software/amazon/lambda/durable/operation/ChildContextOperation.java index 1aba0f72b..7b47228be 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/operation/ChildContextOperation.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/operation/ChildContextOperation.java @@ -248,7 +248,8 @@ private Throwable translateException(Operation op, ErrorObject errorObject) { case RUN_IN_CHILD_CONTEXT, WITH_RETRY -> new ChildContextFailedException(op); // the following subtypes should not be able to reach here - case PARALLEL, MAP, WAIT_FOR_CONDITION -> new IllegalStateException("Unexpected sub-type: " + getSubType()); + case PARALLEL, MAP, WAIT_FOR_CONDITION, STEP, WAIT, CALLBACK, CHAINED_INVOKE -> + new IllegalStateException("Unexpected sub-type: " + getSubType()); }; } diff --git a/sdk/src/main/java/software/amazon/lambda/durable/operation/ConcurrencyOperation.java b/sdk/src/main/java/software/amazon/lambda/durable/operation/ConcurrencyOperation.java index 4c1e4c2d1..6fee06960 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/operation/ConcurrencyOperation.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/operation/ConcurrencyOperation.java @@ -16,7 +16,6 @@ import java.util.function.Function; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import software.amazon.awssdk.services.lambda.model.OperationType; import software.amazon.lambda.durable.DurableContext; import software.amazon.lambda.durable.TypeToken; import software.amazon.lambda.durable.config.NestingType; @@ -116,7 +115,7 @@ protected ChildContextOperation createItem( SerDes serDes, OperationSubType branchSubType) { return new ChildContextOperation<>( - OperationIdentifier.of(operationId, name, OperationType.CONTEXT, branchSubType), + OperationIdentifier.of(operationId, name, branchSubType), function, resultType, RunInChildContextConfig.builder() diff --git a/sdk/src/main/java/software/amazon/lambda/durable/operation/WaitForConditionOperation.java b/sdk/src/main/java/software/amazon/lambda/durable/operation/WaitForConditionOperation.java index fa8bb7ffd..5ca9d7d66 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/operation/WaitForConditionOperation.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/operation/WaitForConditionOperation.java @@ -8,7 +8,6 @@ import software.amazon.awssdk.services.lambda.model.Operation; import software.amazon.awssdk.services.lambda.model.OperationAction; import software.amazon.awssdk.services.lambda.model.OperationStatus; -import software.amazon.awssdk.services.lambda.model.OperationType; import software.amazon.awssdk.services.lambda.model.OperationUpdate; import software.amazon.awssdk.services.lambda.model.StepOptions; import software.amazon.lambda.durable.StepContext; @@ -21,7 +20,6 @@ import software.amazon.lambda.durable.execution.SuspendExecutionException; import software.amazon.lambda.durable.execution.ThreadType; import software.amazon.lambda.durable.model.OperationIdentifier; -import software.amazon.lambda.durable.model.OperationSubType; import software.amazon.lambda.durable.model.WaitForConditionResult; import software.amazon.lambda.durable.util.ExceptionHelper; @@ -41,17 +39,12 @@ public class WaitForConditionOperation extends SerializableDurableOperation config; public WaitForConditionOperation( - String operationId, - String name, + OperationIdentifier operationIdentifier, BiFunction> checkFunc, TypeToken resultTypeToken, WaitForConditionConfig config, DurableContextImpl durableContext) { - super( - OperationIdentifier.of(operationId, name, OperationType.STEP, OperationSubType.WAIT_FOR_CONDITION), - resultTypeToken, - config.serDes(), - durableContext); + super(operationIdentifier, resultTypeToken, config.serDes(), durableContext); this.checkFunc = checkFunc; this.config = config; diff --git a/sdk/src/test/java/software/amazon/lambda/durable/ReplayValidationTest.java b/sdk/src/test/java/software/amazon/lambda/durable/ReplayValidationTest.java index e8e75b1df..cdbe0711b 100644 --- a/sdk/src/test/java/software/amazon/lambda/durable/ReplayValidationTest.java +++ b/sdk/src/test/java/software/amazon/lambda/durable/ReplayValidationTest.java @@ -21,6 +21,7 @@ import software.amazon.lambda.durable.execution.ThreadContext; import software.amazon.lambda.durable.execution.ThreadType; import software.amazon.lambda.durable.model.DurableExecutionInput; +import software.amazon.lambda.durable.model.OperationSubType; class ReplayValidationTest { private static final String EXECUTION_NAME = "exec-name"; @@ -67,6 +68,7 @@ void shouldPassValidationWhenStepTypeAndNameMatch() { .id(OPERATION_ID1) .name("test") .type(OperationType.STEP) + .subType(OperationSubType.STEP.getValue()) .status(OperationStatus.SUCCEEDED) .stepDetails(StepDetails.builder().result("\"result\"").build()) .build(); @@ -83,6 +85,7 @@ void shouldPassValidationWhenWaitTypeMatches() { var existingOp = Operation.builder() .id(OPERATION_ID1) .type(OperationType.WAIT) + .subType(OperationSubType.WAIT.getValue()) .status(OperationStatus.SUCCEEDED) .build(); @@ -144,6 +147,7 @@ void shouldHandleNullNamesCorrectly() { .id(OPERATION_ID1) .name(null) .type(OperationType.STEP) + .subType(OperationSubType.STEP.getValue()) .status(OperationStatus.SUCCEEDED) .stepDetails(StepDetails.builder().result("\"result\"").build()) .build(); diff --git a/sdk/src/test/java/software/amazon/lambda/durable/execution/DurableExecutionTest.java b/sdk/src/test/java/software/amazon/lambda/durable/execution/DurableExecutionTest.java index 6c0668f8b..5b7ccbb6f 100644 --- a/sdk/src/test/java/software/amazon/lambda/durable/execution/DurableExecutionTest.java +++ b/sdk/src/test/java/software/amazon/lambda/durable/execution/DurableExecutionTest.java @@ -25,6 +25,7 @@ import software.amazon.lambda.durable.exception.UnrecoverableDurableExecutionException; import software.amazon.lambda.durable.model.DurableExecutionInput; import software.amazon.lambda.durable.model.ExecutionStatus; +import software.amazon.lambda.durable.model.OperationSubType; class DurableExecutionTest { @@ -187,6 +188,7 @@ void testExecuteReplay() { .id(OPERATION_ID1) .name("step1") .type(OperationType.STEP) + .subType(OperationSubType.STEP.getValue()) .status(OperationStatus.SUCCEEDED) .stepDetails(StepDetails.builder().result("\"First\"").build()) .build(); diff --git a/sdk/src/test/java/software/amazon/lambda/durable/operation/CallbackOperationTest.java b/sdk/src/test/java/software/amazon/lambda/durable/operation/CallbackOperationTest.java index ebe2d7e99..f7e5de707 100644 --- a/sdk/src/test/java/software/amazon/lambda/durable/operation/CallbackOperationTest.java +++ b/sdk/src/test/java/software/amazon/lambda/durable/operation/CallbackOperationTest.java @@ -26,6 +26,7 @@ import software.amazon.lambda.durable.execution.ThreadType; import software.amazon.lambda.durable.model.DurableExecutionInput; import software.amazon.lambda.durable.model.OperationIdentifier; +import software.amazon.lambda.durable.model.OperationSubType; import software.amazon.lambda.durable.serde.JacksonSerDes; import software.amazon.lambda.durable.serde.SerDes; @@ -34,7 +35,7 @@ class CallbackOperationTest { private static final String OPERATION_ID = TestUtils.hashOperationId("1"); private static final String OPERATION_NAME = "approval"; private static final OperationIdentifier OPERATION_IDENTIFIER = - OperationIdentifier.of(OPERATION_ID, OPERATION_NAME, OperationType.CALLBACK); + OperationIdentifier.of(OPERATION_ID, OPERATION_NAME, OperationSubType.CALLBACK); private static final String EXECUTION_NAME = "exec-name"; private static final String EXECUTION_OP_ID = "123"; private static final String EXECUTION_ARN = "arn:aws:lambda:us-east-1:123456789012:function:test/durable-execution/" @@ -139,6 +140,7 @@ void replayReturnsExistingCallbackIdWhenSucceeded() { .id(OPERATION_ID) .name(OPERATION_NAME) .type(OperationType.CALLBACK) + .subType(OperationSubType.CALLBACK.getValue()) .status(OperationStatus.SUCCEEDED) .callbackDetails(CallbackDetails.builder() .callbackId("existing-callback-id") @@ -165,6 +167,7 @@ void getReturnsDeserializedResultWhenSucceeded() { .id(OPERATION_ID) .name(OPERATION_NAME) .type(OperationType.CALLBACK) + .subType(OperationSubType.CALLBACK.getValue()) .status(OperationStatus.SUCCEEDED) .callbackDetails(CallbackDetails.builder() .callbackId("callback-id") @@ -193,6 +196,7 @@ void getThrowsCallbackExceptionWhenFailed() { .id(OPERATION_ID) .name(OPERATION_NAME) .type(OperationType.CALLBACK) + .subType(OperationSubType.CALLBACK.getValue()) .status(OperationStatus.FAILED) .callbackDetails(CallbackDetails.builder() .callbackId("callback-id") @@ -224,6 +228,7 @@ void getThrowsCallbackTimeoutExceptionWhenTimedOut() { .id(OPERATION_ID) .name(OPERATION_NAME) .type(OperationType.CALLBACK) + .subType(OperationSubType.CALLBACK.getValue()) .status(OperationStatus.TIMED_OUT) .callbackDetails( CallbackDetails.builder().callbackId("callback-id").build()) @@ -252,6 +257,7 @@ void operationUsesCustomSerDesWhenConfigContainsOne() { .id(OPERATION_ID) .name(OPERATION_NAME) .type(OperationType.CALLBACK) + .subType(OperationSubType.CALLBACK.getValue()) .status(OperationStatus.SUCCEEDED) .callbackDetails(CallbackDetails.builder() .callbackId("callback-id") @@ -280,6 +286,7 @@ void operationUsesDefaultSerDesWhenConfigIsNull() { .id(OPERATION_ID) .name(OPERATION_NAME) .type(OperationType.CALLBACK) + .subType(OperationSubType.CALLBACK.getValue()) .status(OperationStatus.SUCCEEDED) .callbackDetails(CallbackDetails.builder() .callbackId("callback-id") @@ -310,6 +317,7 @@ void operationUsesDefaultSerDesWhenConfigSerDesIsNull() { .id(OPERATION_ID) .name(OPERATION_NAME) .type(OperationType.CALLBACK) + .subType(OperationSubType.CALLBACK.getValue()) .status(OperationStatus.SUCCEEDED) .callbackDetails(CallbackDetails.builder() .callbackId("callback-id") @@ -338,6 +346,7 @@ void getThrowsSerDesExceptionWithHelpfulMessageWhenDeserializationFails() { .id(OPERATION_ID) .name(OPERATION_NAME) .type(OperationType.CALLBACK) + .subType(OperationSubType.CALLBACK.getValue()) .status(OperationStatus.SUCCEEDED) .callbackDetails(CallbackDetails.builder() .callbackId("test-callback-123") diff --git a/sdk/src/test/java/software/amazon/lambda/durable/operation/ChildContextOperationTest.java b/sdk/src/test/java/software/amazon/lambda/durable/operation/ChildContextOperationTest.java index 9a71e46a3..ac56262a4 100644 --- a/sdk/src/test/java/software/amazon/lambda/durable/operation/ChildContextOperationTest.java +++ b/sdk/src/test/java/software/amazon/lambda/durable/operation/ChildContextOperationTest.java @@ -55,7 +55,7 @@ private DurableConfig createConfig() { } private static final OperationIdentifier OPERATION_IDENTIFIER = - OperationIdentifier.of("1", "test-context", OperationType.CONTEXT, OperationSubType.RUN_IN_CHILD_CONTEXT); + OperationIdentifier.of("1", "test-context", OperationSubType.RUN_IN_CHILD_CONTEXT); private ChildContextOperation createOperation(Function func) { return new ChildContextOperation<>( diff --git a/sdk/src/test/java/software/amazon/lambda/durable/operation/ConcurrencyOperationTest.java b/sdk/src/test/java/software/amazon/lambda/durable/operation/ConcurrencyOperationTest.java index d29c3e842..0bc2ea51a 100644 --- a/sdk/src/test/java/software/amazon/lambda/durable/operation/ConcurrencyOperationTest.java +++ b/sdk/src/test/java/software/amazon/lambda/durable/operation/ConcurrencyOperationTest.java @@ -85,8 +85,7 @@ void setUp() { private TestConcurrencyOperation createOperation(CompletionConfig completionConfig) throws Exception { return new TestConcurrencyOperation( - OperationIdentifier.of( - OPERATION_ID, "test-concurrency", OperationType.CONTEXT, OperationSubType.PARALLEL), + OperationIdentifier.of(OPERATION_ID, "test-concurrency", OperationSubType.PARALLEL), RESULT_TYPE, SER_DES, durableContext, diff --git a/sdk/src/test/java/software/amazon/lambda/durable/operation/InvokeOperationTest.java b/sdk/src/test/java/software/amazon/lambda/durable/operation/InvokeOperationTest.java index daa75056c..2c1d76c74 100644 --- a/sdk/src/test/java/software/amazon/lambda/durable/operation/InvokeOperationTest.java +++ b/sdk/src/test/java/software/amazon/lambda/durable/operation/InvokeOperationTest.java @@ -13,7 +13,6 @@ import software.amazon.awssdk.services.lambda.model.ErrorObject; import software.amazon.awssdk.services.lambda.model.Operation; import software.amazon.awssdk.services.lambda.model.OperationStatus; -import software.amazon.awssdk.services.lambda.model.OperationType; import software.amazon.lambda.durable.TypeToken; import software.amazon.lambda.durable.config.InvokeConfig; import software.amazon.lambda.durable.context.DurableContextImpl; @@ -25,13 +24,14 @@ import software.amazon.lambda.durable.execution.ThreadContext; import software.amazon.lambda.durable.execution.ThreadType; import software.amazon.lambda.durable.model.OperationIdentifier; +import software.amazon.lambda.durable.model.OperationSubType; import software.amazon.lambda.durable.serde.JacksonSerDes; class InvokeOperationTest { private static final String OPERATION_ID = "2"; private static final String OPERATION_NAME = "test-invoke"; private static final OperationIdentifier OPERATION_IDENTIFIER = - OperationIdentifier.of(OPERATION_ID, OPERATION_NAME, OperationType.CHAINED_INVOKE); + OperationIdentifier.of(OPERATION_ID, OPERATION_NAME, OperationSubType.CHAINED_INVOKE); private ExecutionManager executionManager; private DurableContextImpl durableContext; diff --git a/sdk/src/test/java/software/amazon/lambda/durable/operation/ParallelOperationTest.java b/sdk/src/test/java/software/amazon/lambda/durable/operation/ParallelOperationTest.java index 32a41f2cd..e64af1870 100644 --- a/sdk/src/test/java/software/amazon/lambda/durable/operation/ParallelOperationTest.java +++ b/sdk/src/test/java/software/amazon/lambda/durable/operation/ParallelOperationTest.java @@ -115,7 +115,7 @@ void setUp() { private ParallelOperation createOperation(CompletionConfig completionConfig) { var op = new ParallelOperation( - OperationIdentifier.of(OPERATION_ID, "test-parallel", OperationType.CONTEXT, OperationSubType.PARALLEL), + OperationIdentifier.of(OPERATION_ID, "test-parallel", OperationSubType.PARALLEL), SER_DES, durableContext, ParallelConfig.builder().completionConfig(completionConfig).build()); diff --git a/sdk/src/test/java/software/amazon/lambda/durable/operation/SerializableDurableOperationTest.java b/sdk/src/test/java/software/amazon/lambda/durable/operation/SerializableDurableOperationTest.java index 6ae8cc890..195636fd8 100644 --- a/sdk/src/test/java/software/amazon/lambda/durable/operation/SerializableDurableOperationTest.java +++ b/sdk/src/test/java/software/amazon/lambda/durable/operation/SerializableDurableOperationTest.java @@ -36,6 +36,7 @@ import software.amazon.lambda.durable.execution.ThreadContext; import software.amazon.lambda.durable.execution.ThreadType; import software.amazon.lambda.durable.model.OperationIdentifier; +import software.amazon.lambda.durable.model.OperationSubType; import software.amazon.lambda.durable.serde.JacksonSerDes; import software.amazon.lambda.durable.serde.SerDes; @@ -47,7 +48,7 @@ class SerializableDurableOperationTest { private static final Operation OPERATION = Operation.builder().build(); private static final OperationType OPERATION_TYPE = OperationType.STEP; private static final OperationIdentifier OPERATION_IDENTIFIER = - OperationIdentifier.of(OPERATION_ID, OPERATION_NAME, OPERATION_TYPE); + OperationIdentifier.of(OPERATION_ID, OPERATION_NAME, OperationSubType.STEP); private static final TypeToken RESULT_TYPE = TypeToken.get(String.class); private static final SerDes SER_DES = new JacksonSerDes(); private static final String RESULT = "name"; @@ -272,6 +273,7 @@ void validateReplayDoesNotThrowWhenNameAndTypeMatch() { .thenReturn(Operation.builder() .name(OPERATION_NAME) .type(OPERATION_TYPE) + .subType(OperationSubType.STEP.getValue()) .build()); SerializableDurableOperation op = diff --git a/sdk/src/test/java/software/amazon/lambda/durable/operation/StepOperationTest.java b/sdk/src/test/java/software/amazon/lambda/durable/operation/StepOperationTest.java index 2eae32f0c..be4962d71 100644 --- a/sdk/src/test/java/software/amazon/lambda/durable/operation/StepOperationTest.java +++ b/sdk/src/test/java/software/amazon/lambda/durable/operation/StepOperationTest.java @@ -12,7 +12,6 @@ import software.amazon.awssdk.services.lambda.model.ErrorObject; import software.amazon.awssdk.services.lambda.model.Operation; import software.amazon.awssdk.services.lambda.model.OperationStatus; -import software.amazon.awssdk.services.lambda.model.OperationType; import software.amazon.awssdk.services.lambda.model.StepDetails; import software.amazon.lambda.durable.DurableConfig; import software.amazon.lambda.durable.TypeToken; @@ -24,6 +23,7 @@ import software.amazon.lambda.durable.execution.ThreadContext; import software.amazon.lambda.durable.execution.ThreadType; import software.amazon.lambda.durable.model.OperationIdentifier; +import software.amazon.lambda.durable.model.OperationSubType; import software.amazon.lambda.durable.serde.JacksonSerDes; class StepOperationTest { @@ -32,7 +32,7 @@ class StepOperationTest { private static final String OPERATION_NAME = "test-step"; private static final String RESULT = "result"; private static final OperationIdentifier OPERATION_IDENTIFIER = - OperationIdentifier.of(OPERATION_ID, OPERATION_NAME, OperationType.STEP); + OperationIdentifier.of(OPERATION_ID, OPERATION_NAME, OperationSubType.STEP); private ExecutionManager executionManager; private DurableContextImpl durableContext; diff --git a/sdk/src/test/java/software/amazon/lambda/durable/operation/WaitForConditionOperationTest.java b/sdk/src/test/java/software/amazon/lambda/durable/operation/WaitForConditionOperationTest.java index 7705b0a17..69502a3c3 100644 --- a/sdk/src/test/java/software/amazon/lambda/durable/operation/WaitForConditionOperationTest.java +++ b/sdk/src/test/java/software/amazon/lambda/durable/operation/WaitForConditionOperationTest.java @@ -27,6 +27,8 @@ import software.amazon.lambda.durable.execution.ExecutionManager; import software.amazon.lambda.durable.execution.ThreadContext; import software.amazon.lambda.durable.execution.ThreadType; +import software.amazon.lambda.durable.model.OperationIdentifier; +import software.amazon.lambda.durable.model.OperationSubType; import software.amazon.lambda.durable.model.WaitForConditionResult; import software.amazon.lambda.durable.serde.JacksonSerDes; @@ -57,7 +59,11 @@ private WaitForConditionOperation createOperation( checkFunc, WaitForConditionConfig config) { return new WaitForConditionOperation<>( - OPERATION_ID, OPERATION_NAME, checkFunc, TypeToken.get(Integer.class), config, durableContext); + OperationIdentifier.of(OPERATION_ID, OPERATION_NAME, OperationSubType.WAIT_FOR_CONDITION), + checkFunc, + TypeToken.get(Integer.class), + config, + durableContext); } // ===== Replay SUCCEEDED ===== diff --git a/sdk/src/test/java/software/amazon/lambda/durable/operation/WaitOperationTest.java b/sdk/src/test/java/software/amazon/lambda/durable/operation/WaitOperationTest.java index d07501dbb..d620595ec 100644 --- a/sdk/src/test/java/software/amazon/lambda/durable/operation/WaitOperationTest.java +++ b/sdk/src/test/java/software/amazon/lambda/durable/operation/WaitOperationTest.java @@ -12,20 +12,20 @@ import org.junit.jupiter.api.Test; import software.amazon.awssdk.services.lambda.model.Operation; import software.amazon.awssdk.services.lambda.model.OperationStatus; -import software.amazon.awssdk.services.lambda.model.OperationType; import software.amazon.awssdk.services.lambda.model.WaitDetails; import software.amazon.lambda.durable.context.DurableContextImpl; import software.amazon.lambda.durable.execution.ExecutionManager; import software.amazon.lambda.durable.execution.ThreadContext; import software.amazon.lambda.durable.execution.ThreadType; import software.amazon.lambda.durable.model.OperationIdentifier; +import software.amazon.lambda.durable.model.OperationSubType; class WaitOperationTest { private static final String OPERATION_ID = "2"; private static final String CONTEXT_ID = "handler"; private static final String OPERATION_NAME = "test-wait"; private static final OperationIdentifier OPERATION_IDENTIFIER = - OperationIdentifier.of(OPERATION_ID, OPERATION_NAME, OperationType.WAIT); + OperationIdentifier.of(OPERATION_ID, OPERATION_NAME, OperationSubType.WAIT); private ExecutionManager executionManager; private DurableContextImpl durableContext;