diff --git a/sdk-testing/src/main/java/software/amazon/lambda/durable/testing/local/LocalMemoryExecutionClient.java b/sdk-testing/src/main/java/software/amazon/lambda/durable/testing/local/LocalMemoryExecutionClient.java index 847872f46..90ee28d99 100644 --- a/sdk-testing/src/main/java/software/amazon/lambda/durable/testing/local/LocalMemoryExecutionClient.java +++ b/sdk-testing/src/main/java/software/amazon/lambda/durable/testing/local/LocalMemoryExecutionClient.java @@ -190,6 +190,7 @@ private void applyResult(Operation op, OperationResult result) { .id(op.id()) .name(op.name()) .type(op.type()) + .subType(op.subType()) .action(action) .parentId(op.parentId()) .payload(result.result()) diff --git a/sdk/src/main/java/software/amazon/lambda/durable/context/DurableContextImpl.java b/sdk/src/main/java/software/amazon/lambda/durable/context/DurableContextImpl.java index 62414aba9..c08b6317b 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/context/DurableContextImpl.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/context/DurableContextImpl.java @@ -11,7 +11,6 @@ import java.util.function.BiFunction; import java.util.function.Function; import org.slf4j.LoggerFactory; -import software.amazon.awssdk.services.lambda.model.OperationType; import software.amazon.lambda.durable.DurableCallbackFuture; import software.amazon.lambda.durable.DurableConfig; import software.amazon.lambda.durable.DurableContext; @@ -146,7 +145,7 @@ public DurableFuture stepAsync( // Create and start step operation with TypeToken var operation = new StepOperation<>( - OperationIdentifier.of(operationId, name, OperationType.STEP), func, resultType, config, this); + OperationIdentifier.of(operationId, name, OperationSubType.STEP), func, resultType, config, this); operation.execute(); // Start the step (returns immediately) @@ -162,7 +161,7 @@ public DurableFuture waitAsync(String name, Duration duration) { // Create and start wait operation var operation = - new WaitOperation(OperationIdentifier.of(operationId, name, OperationType.WAIT), duration, this); + new WaitOperation(OperationIdentifier.of(operationId, name, OperationSubType.WAIT), duration, this); operation.execute(); // Checkpoint the wait return operation; @@ -187,7 +186,7 @@ public DurableFuture invokeAsync( // Create and start invoke operation var operation = new InvokeOperation<>( - OperationIdentifier.of(operationId, name, OperationType.CHAINED_INVOKE), + OperationIdentifier.of(operationId, name, OperationSubType.CHAINED_INVOKE), functionName, payload, resultType, @@ -207,7 +206,7 @@ public DurableCallbackFuture createCallback(String name, TypeToken res var operationId = nextOperationId(); var operation = new CallbackOperation<>( - OperationIdentifier.of(operationId, name, OperationType.CALLBACK), resultType, config, this); + OperationIdentifier.of(operationId, name, OperationSubType.CALLBACK), resultType, config, this); operation.execute(); return operation; @@ -248,11 +247,7 @@ private DurableFuture runInChildContextAsync( var operationId = nextOperationId(); var operation = new ChildContextOperation<>( - OperationIdentifier.of(operationId, name, OperationType.CONTEXT, subType), - func, - resultType, - config, - this); + OperationIdentifier.of(operationId, name, subType), func, resultType, config, this); operation.execute(); return operation; @@ -277,7 +272,7 @@ public DurableFuture> mapAsync( var operationId = nextOperationId(); var operation = new MapOperation<>( - OperationIdentifier.of(operationId, name, OperationType.CONTEXT, OperationSubType.MAP), + OperationIdentifier.of(operationId, name, OperationSubType.MAP), itemList, function, resultType, @@ -293,7 +288,7 @@ public ParallelDurableFuture parallel(String name, ParallelConfig config) { var operationId = nextOperationId(); var parallelOp = new ParallelOperation( - OperationIdentifier.of(operationId, name, OperationType.CONTEXT, OperationSubType.PARALLEL), + OperationIdentifier.of(operationId, name, OperationSubType.PARALLEL), getDurableConfig().getSerDes(), this, config); @@ -363,7 +358,12 @@ public DurableFuture waitForConditionAsync( } var operationId = nextOperationId(); - var operation = new WaitForConditionOperation<>(operationId, name, checkFunc, resultType, config, this); + var operation = new WaitForConditionOperation<>( + OperationIdentifier.of(operationId, name, OperationSubType.WAIT_FOR_CONDITION), + checkFunc, + resultType, + config, + this); operation.execute(); diff --git a/sdk/src/main/java/software/amazon/lambda/durable/model/OperationIdentifier.java b/sdk/src/main/java/software/amazon/lambda/durable/model/OperationIdentifier.java index 17fd4771a..9f986f4ce 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/model/OperationIdentifier.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/model/OperationIdentifier.java @@ -5,24 +5,21 @@ import software.amazon.awssdk.services.lambda.model.OperationType; /** - * Identifies a durable operation by its unique ID, human-readable name, type, and optional sub-type. + * Identifies a durable operation by its unique ID, human-readable name, type and sub-type. * * @param operationId unique sequential identifier for the operation within an execution * @param name human-readable name for the operation - * @param operationType the kind of operation (STEP, WAIT, CALLBACK, etc.) - * @param subType optional sub-type for operations that need further classification (e.g. child contexts) + * @param subType the operation sub-type which also determines the operation type */ -public record OperationIdentifier( - String operationId, String name, OperationType operationType, OperationSubType subType) { +public record OperationIdentifier(String operationId, String name, OperationSubType subType) { - /** Creates an identifier without a sub-type. */ - public static OperationIdentifier of(String operationId, String name, OperationType type) { - return new OperationIdentifier(operationId, name, type, null); + /** Returns the operation type derived from the sub-type. */ + public OperationType operationType() { + return subType.getOperationType(); } - /** Creates an identifier with a sub-type. */ - public static OperationIdentifier of( - String operationId, String name, OperationType type, OperationSubType subType) { - return new OperationIdentifier(operationId, name, type, subType); + /** Creates an identifier with the given sub-type. */ + public static OperationIdentifier of(String operationId, String name, OperationSubType subType) { + return new OperationIdentifier(operationId, name, subType); } } diff --git a/sdk/src/main/java/software/amazon/lambda/durable/model/OperationSubType.java b/sdk/src/main/java/software/amazon/lambda/durable/model/OperationSubType.java index 416798f9f..c7273b9cd 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/model/OperationSubType.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/model/OperationSubType.java @@ -2,28 +2,41 @@ // SPDX-License-Identifier: Apache-2.0 package software.amazon.lambda.durable.model; +import software.amazon.awssdk.services.lambda.model.OperationType; + /** - * Fine-grained classification of durable operations beyond the basic operation types. + * Fine-grained classification of durable operations that pairs each subtype with its parent {@link OperationType}. * - *

Used as the {@code subType} field in checkpoint updates for {@code CONTEXT} operations. Matches the + *

Used as the source of both the {@code type} and {@code subType} fields in checkpoint updates. Matches the * {@code OperationSubType} enum in the JavaScript and Python durable execution SDKs. */ public enum OperationSubType { - RUN_IN_CHILD_CONTEXT("RunInChildContext"), - MAP("Map"), - MAP_ITERATION("MapIteration"), - PARALLEL("Parallel"), - PARALLEL_BRANCH("ParallelBranch"), - WAIT_FOR_CALLBACK("WaitForCallback"), - WAIT_FOR_CONDITION("WaitForCondition"), - WITH_RETRY("WithRetry"); + STEP(OperationType.STEP, "Step"), + WAIT(OperationType.WAIT, "Wait"), + CALLBACK(OperationType.CALLBACK, "Callback"), + CHAINED_INVOKE(OperationType.CHAINED_INVOKE, "ChainedInvoke"), + RUN_IN_CHILD_CONTEXT(OperationType.CONTEXT, "RunInChildContext"), + MAP(OperationType.CONTEXT, "Map"), + MAP_ITERATION(OperationType.CONTEXT, "MapIteration"), + PARALLEL(OperationType.CONTEXT, "Parallel"), + PARALLEL_BRANCH(OperationType.CONTEXT, "ParallelBranch"), + WAIT_FOR_CALLBACK(OperationType.CONTEXT, "WaitForCallback"), + WAIT_FOR_CONDITION(OperationType.STEP, "WaitForCondition"), + WITH_RETRY(OperationType.CONTEXT, "WithRetry"); + private final OperationType operationType; private final String value; - OperationSubType(String value) { + OperationSubType(OperationType operationType, String value) { + this.operationType = operationType; this.value = value; } + /** Returns the parent {@link OperationType} for this subtype. */ + public OperationType getOperationType() { + return operationType; + } + /** Returns the wire-format string value sent in checkpoint updates. */ public String getValue() { return value; diff --git a/sdk/src/main/java/software/amazon/lambda/durable/operation/BaseDurableOperation.java b/sdk/src/main/java/software/amazon/lambda/durable/operation/BaseDurableOperation.java index 7a88173b0..b3dd55b95 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/operation/BaseDurableOperation.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/operation/BaseDurableOperation.java @@ -410,11 +410,11 @@ protected void sendOperationUpdate(OperationUpdate.Builder builder) { /** Sends an operation update asynchronously. */ protected CompletableFuture sendOperationUpdateAsync(OperationUpdate.Builder builder) { - var updateBuilder = - builder.id(getOperationId()).name(getName()).type(getType()).parentId(durableContext.getParentId()); - if (getSubType() != null) { - updateBuilder.subType(getSubType().getValue()); - } + var updateBuilder = builder.id(getOperationId()) + .name(getName()) + .type(getType()) + .subType(getSubType().getValue()) + .parentId(durableContext.getParentId()); var update = updateBuilder.build(); if (replayCompletedOperation.get()) { // We are replaying a completed operation, so complete the completableFuture without checkpointing @@ -444,9 +444,7 @@ protected void validateReplay(Operation checkpointed) { getOperationId(), checkpointed.name(), getName()))); } - if ((getSubType() == null && checkpointed.subType() != null) - || getSubType() != null - && !Objects.equals(checkpointed.subType(), getSubType().getValue())) { + if (!Objects.equals(checkpointed.subType(), getSubType().getValue())) { throw terminateExecution(new NonDeterministicExecutionException(String.format( "Operation subType mismatch for \"%s\". Expected \"%s\", got \"%s\"", getOperationId(), checkpointed.subType(), getSubType()))); diff --git a/sdk/src/main/java/software/amazon/lambda/durable/operation/ChildContextOperation.java b/sdk/src/main/java/software/amazon/lambda/durable/operation/ChildContextOperation.java index 1aba0f72b..7b47228be 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/operation/ChildContextOperation.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/operation/ChildContextOperation.java @@ -248,7 +248,8 @@ private Throwable translateException(Operation op, ErrorObject errorObject) { case RUN_IN_CHILD_CONTEXT, WITH_RETRY -> new ChildContextFailedException(op); // the following subtypes should not be able to reach here - case PARALLEL, MAP, WAIT_FOR_CONDITION -> new IllegalStateException("Unexpected sub-type: " + getSubType()); + case PARALLEL, MAP, WAIT_FOR_CONDITION, STEP, WAIT, CALLBACK, CHAINED_INVOKE -> + new IllegalStateException("Unexpected sub-type: " + getSubType()); }; } diff --git a/sdk/src/main/java/software/amazon/lambda/durable/operation/ConcurrencyOperation.java b/sdk/src/main/java/software/amazon/lambda/durable/operation/ConcurrencyOperation.java index 4c1e4c2d1..6fee06960 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/operation/ConcurrencyOperation.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/operation/ConcurrencyOperation.java @@ -16,7 +16,6 @@ import java.util.function.Function; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import software.amazon.awssdk.services.lambda.model.OperationType; import software.amazon.lambda.durable.DurableContext; import software.amazon.lambda.durable.TypeToken; import software.amazon.lambda.durable.config.NestingType; @@ -116,7 +115,7 @@ protected ChildContextOperation createItem( SerDes serDes, OperationSubType branchSubType) { return new ChildContextOperation<>( - OperationIdentifier.of(operationId, name, OperationType.CONTEXT, branchSubType), + OperationIdentifier.of(operationId, name, branchSubType), function, resultType, RunInChildContextConfig.builder() diff --git a/sdk/src/main/java/software/amazon/lambda/durable/operation/WaitForConditionOperation.java b/sdk/src/main/java/software/amazon/lambda/durable/operation/WaitForConditionOperation.java index fa8bb7ffd..5ca9d7d66 100644 --- a/sdk/src/main/java/software/amazon/lambda/durable/operation/WaitForConditionOperation.java +++ b/sdk/src/main/java/software/amazon/lambda/durable/operation/WaitForConditionOperation.java @@ -8,7 +8,6 @@ import software.amazon.awssdk.services.lambda.model.Operation; import software.amazon.awssdk.services.lambda.model.OperationAction; import software.amazon.awssdk.services.lambda.model.OperationStatus; -import software.amazon.awssdk.services.lambda.model.OperationType; import software.amazon.awssdk.services.lambda.model.OperationUpdate; import software.amazon.awssdk.services.lambda.model.StepOptions; import software.amazon.lambda.durable.StepContext; @@ -21,7 +20,6 @@ import software.amazon.lambda.durable.execution.SuspendExecutionException; import software.amazon.lambda.durable.execution.ThreadType; import software.amazon.lambda.durable.model.OperationIdentifier; -import software.amazon.lambda.durable.model.OperationSubType; import software.amazon.lambda.durable.model.WaitForConditionResult; import software.amazon.lambda.durable.util.ExceptionHelper; @@ -41,17 +39,12 @@ public class WaitForConditionOperation extends SerializableDurableOperation config; public WaitForConditionOperation( - String operationId, - String name, + OperationIdentifier operationIdentifier, BiFunction> checkFunc, TypeToken resultTypeToken, WaitForConditionConfig config, DurableContextImpl durableContext) { - super( - OperationIdentifier.of(operationId, name, OperationType.STEP, OperationSubType.WAIT_FOR_CONDITION), - resultTypeToken, - config.serDes(), - durableContext); + super(operationIdentifier, resultTypeToken, config.serDes(), durableContext); this.checkFunc = checkFunc; this.config = config; diff --git a/sdk/src/test/java/software/amazon/lambda/durable/ReplayValidationTest.java b/sdk/src/test/java/software/amazon/lambda/durable/ReplayValidationTest.java index e8e75b1df..cdbe0711b 100644 --- a/sdk/src/test/java/software/amazon/lambda/durable/ReplayValidationTest.java +++ b/sdk/src/test/java/software/amazon/lambda/durable/ReplayValidationTest.java @@ -21,6 +21,7 @@ import software.amazon.lambda.durable.execution.ThreadContext; import software.amazon.lambda.durable.execution.ThreadType; import software.amazon.lambda.durable.model.DurableExecutionInput; +import software.amazon.lambda.durable.model.OperationSubType; class ReplayValidationTest { private static final String EXECUTION_NAME = "exec-name"; @@ -67,6 +68,7 @@ void shouldPassValidationWhenStepTypeAndNameMatch() { .id(OPERATION_ID1) .name("test") .type(OperationType.STEP) + .subType(OperationSubType.STEP.getValue()) .status(OperationStatus.SUCCEEDED) .stepDetails(StepDetails.builder().result("\"result\"").build()) .build(); @@ -83,6 +85,7 @@ void shouldPassValidationWhenWaitTypeMatches() { var existingOp = Operation.builder() .id(OPERATION_ID1) .type(OperationType.WAIT) + .subType(OperationSubType.WAIT.getValue()) .status(OperationStatus.SUCCEEDED) .build(); @@ -144,6 +147,7 @@ void shouldHandleNullNamesCorrectly() { .id(OPERATION_ID1) .name(null) .type(OperationType.STEP) + .subType(OperationSubType.STEP.getValue()) .status(OperationStatus.SUCCEEDED) .stepDetails(StepDetails.builder().result("\"result\"").build()) .build(); diff --git a/sdk/src/test/java/software/amazon/lambda/durable/execution/DurableExecutionTest.java b/sdk/src/test/java/software/amazon/lambda/durable/execution/DurableExecutionTest.java index 6c0668f8b..5b7ccbb6f 100644 --- a/sdk/src/test/java/software/amazon/lambda/durable/execution/DurableExecutionTest.java +++ b/sdk/src/test/java/software/amazon/lambda/durable/execution/DurableExecutionTest.java @@ -25,6 +25,7 @@ import software.amazon.lambda.durable.exception.UnrecoverableDurableExecutionException; import software.amazon.lambda.durable.model.DurableExecutionInput; import software.amazon.lambda.durable.model.ExecutionStatus; +import software.amazon.lambda.durable.model.OperationSubType; class DurableExecutionTest { @@ -187,6 +188,7 @@ void testExecuteReplay() { .id(OPERATION_ID1) .name("step1") .type(OperationType.STEP) + .subType(OperationSubType.STEP.getValue()) .status(OperationStatus.SUCCEEDED) .stepDetails(StepDetails.builder().result("\"First\"").build()) .build(); diff --git a/sdk/src/test/java/software/amazon/lambda/durable/operation/CallbackOperationTest.java b/sdk/src/test/java/software/amazon/lambda/durable/operation/CallbackOperationTest.java index ebe2d7e99..f7e5de707 100644 --- a/sdk/src/test/java/software/amazon/lambda/durable/operation/CallbackOperationTest.java +++ b/sdk/src/test/java/software/amazon/lambda/durable/operation/CallbackOperationTest.java @@ -26,6 +26,7 @@ import software.amazon.lambda.durable.execution.ThreadType; import software.amazon.lambda.durable.model.DurableExecutionInput; import software.amazon.lambda.durable.model.OperationIdentifier; +import software.amazon.lambda.durable.model.OperationSubType; import software.amazon.lambda.durable.serde.JacksonSerDes; import software.amazon.lambda.durable.serde.SerDes; @@ -34,7 +35,7 @@ class CallbackOperationTest { private static final String OPERATION_ID = TestUtils.hashOperationId("1"); private static final String OPERATION_NAME = "approval"; private static final OperationIdentifier OPERATION_IDENTIFIER = - OperationIdentifier.of(OPERATION_ID, OPERATION_NAME, OperationType.CALLBACK); + OperationIdentifier.of(OPERATION_ID, OPERATION_NAME, OperationSubType.CALLBACK); private static final String EXECUTION_NAME = "exec-name"; private static final String EXECUTION_OP_ID = "123"; private static final String EXECUTION_ARN = "arn:aws:lambda:us-east-1:123456789012:function:test/durable-execution/" @@ -139,6 +140,7 @@ void replayReturnsExistingCallbackIdWhenSucceeded() { .id(OPERATION_ID) .name(OPERATION_NAME) .type(OperationType.CALLBACK) + .subType(OperationSubType.CALLBACK.getValue()) .status(OperationStatus.SUCCEEDED) .callbackDetails(CallbackDetails.builder() .callbackId("existing-callback-id") @@ -165,6 +167,7 @@ void getReturnsDeserializedResultWhenSucceeded() { .id(OPERATION_ID) .name(OPERATION_NAME) .type(OperationType.CALLBACK) + .subType(OperationSubType.CALLBACK.getValue()) .status(OperationStatus.SUCCEEDED) .callbackDetails(CallbackDetails.builder() .callbackId("callback-id") @@ -193,6 +196,7 @@ void getThrowsCallbackExceptionWhenFailed() { .id(OPERATION_ID) .name(OPERATION_NAME) .type(OperationType.CALLBACK) + .subType(OperationSubType.CALLBACK.getValue()) .status(OperationStatus.FAILED) .callbackDetails(CallbackDetails.builder() .callbackId("callback-id") @@ -224,6 +228,7 @@ void getThrowsCallbackTimeoutExceptionWhenTimedOut() { .id(OPERATION_ID) .name(OPERATION_NAME) .type(OperationType.CALLBACK) + .subType(OperationSubType.CALLBACK.getValue()) .status(OperationStatus.TIMED_OUT) .callbackDetails( CallbackDetails.builder().callbackId("callback-id").build()) @@ -252,6 +257,7 @@ void operationUsesCustomSerDesWhenConfigContainsOne() { .id(OPERATION_ID) .name(OPERATION_NAME) .type(OperationType.CALLBACK) + .subType(OperationSubType.CALLBACK.getValue()) .status(OperationStatus.SUCCEEDED) .callbackDetails(CallbackDetails.builder() .callbackId("callback-id") @@ -280,6 +286,7 @@ void operationUsesDefaultSerDesWhenConfigIsNull() { .id(OPERATION_ID) .name(OPERATION_NAME) .type(OperationType.CALLBACK) + .subType(OperationSubType.CALLBACK.getValue()) .status(OperationStatus.SUCCEEDED) .callbackDetails(CallbackDetails.builder() .callbackId("callback-id") @@ -310,6 +317,7 @@ void operationUsesDefaultSerDesWhenConfigSerDesIsNull() { .id(OPERATION_ID) .name(OPERATION_NAME) .type(OperationType.CALLBACK) + .subType(OperationSubType.CALLBACK.getValue()) .status(OperationStatus.SUCCEEDED) .callbackDetails(CallbackDetails.builder() .callbackId("callback-id") @@ -338,6 +346,7 @@ void getThrowsSerDesExceptionWithHelpfulMessageWhenDeserializationFails() { .id(OPERATION_ID) .name(OPERATION_NAME) .type(OperationType.CALLBACK) + .subType(OperationSubType.CALLBACK.getValue()) .status(OperationStatus.SUCCEEDED) .callbackDetails(CallbackDetails.builder() .callbackId("test-callback-123") diff --git a/sdk/src/test/java/software/amazon/lambda/durable/operation/ChildContextOperationTest.java b/sdk/src/test/java/software/amazon/lambda/durable/operation/ChildContextOperationTest.java index 9a71e46a3..ac56262a4 100644 --- a/sdk/src/test/java/software/amazon/lambda/durable/operation/ChildContextOperationTest.java +++ b/sdk/src/test/java/software/amazon/lambda/durable/operation/ChildContextOperationTest.java @@ -55,7 +55,7 @@ private DurableConfig createConfig() { } private static final OperationIdentifier OPERATION_IDENTIFIER = - OperationIdentifier.of("1", "test-context", OperationType.CONTEXT, OperationSubType.RUN_IN_CHILD_CONTEXT); + OperationIdentifier.of("1", "test-context", OperationSubType.RUN_IN_CHILD_CONTEXT); private ChildContextOperation createOperation(Function func) { return new ChildContextOperation<>( diff --git a/sdk/src/test/java/software/amazon/lambda/durable/operation/ConcurrencyOperationTest.java b/sdk/src/test/java/software/amazon/lambda/durable/operation/ConcurrencyOperationTest.java index d29c3e842..0bc2ea51a 100644 --- a/sdk/src/test/java/software/amazon/lambda/durable/operation/ConcurrencyOperationTest.java +++ b/sdk/src/test/java/software/amazon/lambda/durable/operation/ConcurrencyOperationTest.java @@ -85,8 +85,7 @@ void setUp() { private TestConcurrencyOperation createOperation(CompletionConfig completionConfig) throws Exception { return new TestConcurrencyOperation( - OperationIdentifier.of( - OPERATION_ID, "test-concurrency", OperationType.CONTEXT, OperationSubType.PARALLEL), + OperationIdentifier.of(OPERATION_ID, "test-concurrency", OperationSubType.PARALLEL), RESULT_TYPE, SER_DES, durableContext, diff --git a/sdk/src/test/java/software/amazon/lambda/durable/operation/InvokeOperationTest.java b/sdk/src/test/java/software/amazon/lambda/durable/operation/InvokeOperationTest.java index daa75056c..2c1d76c74 100644 --- a/sdk/src/test/java/software/amazon/lambda/durable/operation/InvokeOperationTest.java +++ b/sdk/src/test/java/software/amazon/lambda/durable/operation/InvokeOperationTest.java @@ -13,7 +13,6 @@ import software.amazon.awssdk.services.lambda.model.ErrorObject; import software.amazon.awssdk.services.lambda.model.Operation; import software.amazon.awssdk.services.lambda.model.OperationStatus; -import software.amazon.awssdk.services.lambda.model.OperationType; import software.amazon.lambda.durable.TypeToken; import software.amazon.lambda.durable.config.InvokeConfig; import software.amazon.lambda.durable.context.DurableContextImpl; @@ -25,13 +24,14 @@ import software.amazon.lambda.durable.execution.ThreadContext; import software.amazon.lambda.durable.execution.ThreadType; import software.amazon.lambda.durable.model.OperationIdentifier; +import software.amazon.lambda.durable.model.OperationSubType; import software.amazon.lambda.durable.serde.JacksonSerDes; class InvokeOperationTest { private static final String OPERATION_ID = "2"; private static final String OPERATION_NAME = "test-invoke"; private static final OperationIdentifier OPERATION_IDENTIFIER = - OperationIdentifier.of(OPERATION_ID, OPERATION_NAME, OperationType.CHAINED_INVOKE); + OperationIdentifier.of(OPERATION_ID, OPERATION_NAME, OperationSubType.CHAINED_INVOKE); private ExecutionManager executionManager; private DurableContextImpl durableContext; diff --git a/sdk/src/test/java/software/amazon/lambda/durable/operation/ParallelOperationTest.java b/sdk/src/test/java/software/amazon/lambda/durable/operation/ParallelOperationTest.java index 32a41f2cd..e64af1870 100644 --- a/sdk/src/test/java/software/amazon/lambda/durable/operation/ParallelOperationTest.java +++ b/sdk/src/test/java/software/amazon/lambda/durable/operation/ParallelOperationTest.java @@ -115,7 +115,7 @@ void setUp() { private ParallelOperation createOperation(CompletionConfig completionConfig) { var op = new ParallelOperation( - OperationIdentifier.of(OPERATION_ID, "test-parallel", OperationType.CONTEXT, OperationSubType.PARALLEL), + OperationIdentifier.of(OPERATION_ID, "test-parallel", OperationSubType.PARALLEL), SER_DES, durableContext, ParallelConfig.builder().completionConfig(completionConfig).build()); diff --git a/sdk/src/test/java/software/amazon/lambda/durable/operation/SerializableDurableOperationTest.java b/sdk/src/test/java/software/amazon/lambda/durable/operation/SerializableDurableOperationTest.java index 6ae8cc890..195636fd8 100644 --- a/sdk/src/test/java/software/amazon/lambda/durable/operation/SerializableDurableOperationTest.java +++ b/sdk/src/test/java/software/amazon/lambda/durable/operation/SerializableDurableOperationTest.java @@ -36,6 +36,7 @@ import software.amazon.lambda.durable.execution.ThreadContext; import software.amazon.lambda.durable.execution.ThreadType; import software.amazon.lambda.durable.model.OperationIdentifier; +import software.amazon.lambda.durable.model.OperationSubType; import software.amazon.lambda.durable.serde.JacksonSerDes; import software.amazon.lambda.durable.serde.SerDes; @@ -47,7 +48,7 @@ class SerializableDurableOperationTest { private static final Operation OPERATION = Operation.builder().build(); private static final OperationType OPERATION_TYPE = OperationType.STEP; private static final OperationIdentifier OPERATION_IDENTIFIER = - OperationIdentifier.of(OPERATION_ID, OPERATION_NAME, OPERATION_TYPE); + OperationIdentifier.of(OPERATION_ID, OPERATION_NAME, OperationSubType.STEP); private static final TypeToken RESULT_TYPE = TypeToken.get(String.class); private static final SerDes SER_DES = new JacksonSerDes(); private static final String RESULT = "name"; @@ -272,6 +273,7 @@ void validateReplayDoesNotThrowWhenNameAndTypeMatch() { .thenReturn(Operation.builder() .name(OPERATION_NAME) .type(OPERATION_TYPE) + .subType(OperationSubType.STEP.getValue()) .build()); SerializableDurableOperation op = diff --git a/sdk/src/test/java/software/amazon/lambda/durable/operation/StepOperationTest.java b/sdk/src/test/java/software/amazon/lambda/durable/operation/StepOperationTest.java index 2eae32f0c..be4962d71 100644 --- a/sdk/src/test/java/software/amazon/lambda/durable/operation/StepOperationTest.java +++ b/sdk/src/test/java/software/amazon/lambda/durable/operation/StepOperationTest.java @@ -12,7 +12,6 @@ import software.amazon.awssdk.services.lambda.model.ErrorObject; import software.amazon.awssdk.services.lambda.model.Operation; import software.amazon.awssdk.services.lambda.model.OperationStatus; -import software.amazon.awssdk.services.lambda.model.OperationType; import software.amazon.awssdk.services.lambda.model.StepDetails; import software.amazon.lambda.durable.DurableConfig; import software.amazon.lambda.durable.TypeToken; @@ -24,6 +23,7 @@ import software.amazon.lambda.durable.execution.ThreadContext; import software.amazon.lambda.durable.execution.ThreadType; import software.amazon.lambda.durable.model.OperationIdentifier; +import software.amazon.lambda.durable.model.OperationSubType; import software.amazon.lambda.durable.serde.JacksonSerDes; class StepOperationTest { @@ -32,7 +32,7 @@ class StepOperationTest { private static final String OPERATION_NAME = "test-step"; private static final String RESULT = "result"; private static final OperationIdentifier OPERATION_IDENTIFIER = - OperationIdentifier.of(OPERATION_ID, OPERATION_NAME, OperationType.STEP); + OperationIdentifier.of(OPERATION_ID, OPERATION_NAME, OperationSubType.STEP); private ExecutionManager executionManager; private DurableContextImpl durableContext; diff --git a/sdk/src/test/java/software/amazon/lambda/durable/operation/WaitForConditionOperationTest.java b/sdk/src/test/java/software/amazon/lambda/durable/operation/WaitForConditionOperationTest.java index 7705b0a17..69502a3c3 100644 --- a/sdk/src/test/java/software/amazon/lambda/durable/operation/WaitForConditionOperationTest.java +++ b/sdk/src/test/java/software/amazon/lambda/durable/operation/WaitForConditionOperationTest.java @@ -27,6 +27,8 @@ import software.amazon.lambda.durable.execution.ExecutionManager; import software.amazon.lambda.durable.execution.ThreadContext; import software.amazon.lambda.durable.execution.ThreadType; +import software.amazon.lambda.durable.model.OperationIdentifier; +import software.amazon.lambda.durable.model.OperationSubType; import software.amazon.lambda.durable.model.WaitForConditionResult; import software.amazon.lambda.durable.serde.JacksonSerDes; @@ -57,7 +59,11 @@ private WaitForConditionOperation createOperation( checkFunc, WaitForConditionConfig config) { return new WaitForConditionOperation<>( - OPERATION_ID, OPERATION_NAME, checkFunc, TypeToken.get(Integer.class), config, durableContext); + OperationIdentifier.of(OPERATION_ID, OPERATION_NAME, OperationSubType.WAIT_FOR_CONDITION), + checkFunc, + TypeToken.get(Integer.class), + config, + durableContext); } // ===== Replay SUCCEEDED ===== diff --git a/sdk/src/test/java/software/amazon/lambda/durable/operation/WaitOperationTest.java b/sdk/src/test/java/software/amazon/lambda/durable/operation/WaitOperationTest.java index d07501dbb..d620595ec 100644 --- a/sdk/src/test/java/software/amazon/lambda/durable/operation/WaitOperationTest.java +++ b/sdk/src/test/java/software/amazon/lambda/durable/operation/WaitOperationTest.java @@ -12,20 +12,20 @@ import org.junit.jupiter.api.Test; import software.amazon.awssdk.services.lambda.model.Operation; import software.amazon.awssdk.services.lambda.model.OperationStatus; -import software.amazon.awssdk.services.lambda.model.OperationType; import software.amazon.awssdk.services.lambda.model.WaitDetails; import software.amazon.lambda.durable.context.DurableContextImpl; import software.amazon.lambda.durable.execution.ExecutionManager; import software.amazon.lambda.durable.execution.ThreadContext; import software.amazon.lambda.durable.execution.ThreadType; import software.amazon.lambda.durable.model.OperationIdentifier; +import software.amazon.lambda.durable.model.OperationSubType; class WaitOperationTest { private static final String OPERATION_ID = "2"; private static final String CONTEXT_ID = "handler"; private static final String OPERATION_NAME = "test-wait"; private static final OperationIdentifier OPERATION_IDENTIFIER = - OperationIdentifier.of(OPERATION_ID, OPERATION_NAME, OperationType.WAIT); + OperationIdentifier.of(OPERATION_ID, OPERATION_NAME, OperationSubType.WAIT); private ExecutionManager executionManager; private DurableContextImpl durableContext;