Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@ private void applyResult(Operation op, OperationResult result) {
.id(op.id())
.name(op.name())
.type(op.type())
.subType(op.subType())
.action(action)
.parentId(op.parentId())
.payload(result.result())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import java.util.function.BiFunction;
import java.util.function.Function;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.lambda.model.OperationType;
import software.amazon.lambda.durable.DurableCallbackFuture;
import software.amazon.lambda.durable.DurableConfig;
import software.amazon.lambda.durable.DurableContext;
Expand Down Expand Up @@ -146,7 +145,7 @@ public <T> DurableFuture<T> stepAsync(

// Create and start step operation with TypeToken
var operation = new StepOperation<>(
OperationIdentifier.of(operationId, name, OperationType.STEP), func, resultType, config, this);
OperationIdentifier.of(operationId, name, OperationSubType.STEP), func, resultType, config, this);

operation.execute(); // Start the step (returns immediately)

Expand All @@ -162,7 +161,7 @@ public DurableFuture<Void> waitAsync(String name, Duration duration) {

// Create and start wait operation
var operation =
new WaitOperation(OperationIdentifier.of(operationId, name, OperationType.WAIT), duration, this);
new WaitOperation(OperationIdentifier.of(operationId, name, OperationSubType.WAIT), duration, this);

operation.execute(); // Checkpoint the wait
return operation;
Expand All @@ -187,7 +186,7 @@ public <T, U> DurableFuture<T> invokeAsync(

// Create and start invoke operation
var operation = new InvokeOperation<>(
OperationIdentifier.of(operationId, name, OperationType.CHAINED_INVOKE),
OperationIdentifier.of(operationId, name, OperationSubType.CHAINED_INVOKE),
functionName,
payload,
resultType,
Expand All @@ -207,7 +206,7 @@ public <T> DurableCallbackFuture<T> createCallback(String name, TypeToken<T> res
var operationId = nextOperationId();

var operation = new CallbackOperation<>(
OperationIdentifier.of(operationId, name, OperationType.CALLBACK), resultType, config, this);
OperationIdentifier.of(operationId, name, OperationSubType.CALLBACK), resultType, config, this);
operation.execute();

return operation;
Expand Down Expand Up @@ -248,11 +247,7 @@ private <T> DurableFuture<T> runInChildContextAsync(
var operationId = nextOperationId();

var operation = new ChildContextOperation<>(
OperationIdentifier.of(operationId, name, OperationType.CONTEXT, subType),
func,
resultType,
config,
this);
OperationIdentifier.of(operationId, name, subType), func, resultType, config, this);

operation.execute();
return operation;
Expand All @@ -277,7 +272,7 @@ public <I, O> DurableFuture<MapResult<O>> mapAsync(
var operationId = nextOperationId();

var operation = new MapOperation<>(
OperationIdentifier.of(operationId, name, OperationType.CONTEXT, OperationSubType.MAP),
OperationIdentifier.of(operationId, name, OperationSubType.MAP),
itemList,
function,
resultType,
Expand All @@ -293,7 +288,7 @@ public ParallelDurableFuture parallel(String name, ParallelConfig config) {
var operationId = nextOperationId();

var parallelOp = new ParallelOperation(
OperationIdentifier.of(operationId, name, OperationType.CONTEXT, OperationSubType.PARALLEL),
OperationIdentifier.of(operationId, name, OperationSubType.PARALLEL),
getDurableConfig().getSerDes(),
this,
config);
Expand Down Expand Up @@ -363,7 +358,12 @@ public <T> DurableFuture<T> waitForConditionAsync(
}
var operationId = nextOperationId();

var operation = new WaitForConditionOperation<>(operationId, name, checkFunc, resultType, config, this);
var operation = new WaitForConditionOperation<>(
OperationIdentifier.of(operationId, name, OperationSubType.WAIT_FOR_CONDITION),
checkFunc,
resultType,
config,
this);

operation.execute();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,21 @@
import software.amazon.awssdk.services.lambda.model.OperationType;

/**
* Identifies a durable operation by its unique ID, human-readable name, type, and optional sub-type.
* Identifies a durable operation by its unique ID, human-readable name, type and sub-type.
*
* @param operationId unique sequential identifier for the operation within an execution
* @param name human-readable name for the operation
* @param operationType the kind of operation (STEP, WAIT, CALLBACK, etc.)
* @param subType optional sub-type for operations that need further classification (e.g. child contexts)
* @param subType the operation sub-type which also determines the operation type
*/
public record OperationIdentifier(
String operationId, String name, OperationType operationType, OperationSubType subType) {
public record OperationIdentifier(String operationId, String name, OperationSubType subType) {

/** Creates an identifier without a sub-type. */
public static OperationIdentifier of(String operationId, String name, OperationType type) {
return new OperationIdentifier(operationId, name, type, null);
/** Returns the operation type derived from the sub-type. */
public OperationType operationType() {
return subType.getOperationType();
}

/** Creates an identifier with a sub-type. */
public static OperationIdentifier of(
String operationId, String name, OperationType type, OperationSubType subType) {
return new OperationIdentifier(operationId, name, type, subType);
/** Creates an identifier with the given sub-type. */
public static OperationIdentifier of(String operationId, String name, OperationSubType subType) {
return new OperationIdentifier(operationId, name, subType);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,28 +2,41 @@
// SPDX-License-Identifier: Apache-2.0
package software.amazon.lambda.durable.model;

import software.amazon.awssdk.services.lambda.model.OperationType;

/**
* Fine-grained classification of durable operations beyond the basic operation types.
* Fine-grained classification of durable operations that pairs each subtype with its parent {@link OperationType}.
*
* <p>Used as the {@code subType} field in checkpoint updates for {@code CONTEXT} operations. Matches the
* <p>Used as the source of both the {@code type} and {@code subType} fields in checkpoint updates. Matches the
* {@code OperationSubType} enum in the JavaScript and Python durable execution SDKs.
*/
public enum OperationSubType {
RUN_IN_CHILD_CONTEXT("RunInChildContext"),
MAP("Map"),
MAP_ITERATION("MapIteration"),
PARALLEL("Parallel"),
PARALLEL_BRANCH("ParallelBranch"),
WAIT_FOR_CALLBACK("WaitForCallback"),
WAIT_FOR_CONDITION("WaitForCondition"),
WITH_RETRY("WithRetry");
STEP(OperationType.STEP, "Step"),
WAIT(OperationType.WAIT, "Wait"),
CALLBACK(OperationType.CALLBACK, "Callback"),
CHAINED_INVOKE(OperationType.CHAINED_INVOKE, "ChainedInvoke"),
RUN_IN_CHILD_CONTEXT(OperationType.CONTEXT, "RunInChildContext"),
MAP(OperationType.CONTEXT, "Map"),
MAP_ITERATION(OperationType.CONTEXT, "MapIteration"),
PARALLEL(OperationType.CONTEXT, "Parallel"),
PARALLEL_BRANCH(OperationType.CONTEXT, "ParallelBranch"),
WAIT_FOR_CALLBACK(OperationType.CONTEXT, "WaitForCallback"),
WAIT_FOR_CONDITION(OperationType.STEP, "WaitForCondition"),
WITH_RETRY(OperationType.CONTEXT, "WithRetry");

private final OperationType operationType;
private final String value;

OperationSubType(String value) {
OperationSubType(OperationType operationType, String value) {
this.operationType = operationType;
this.value = value;
}

/** Returns the parent {@link OperationType} for this subtype. */
public OperationType getOperationType() {
return operationType;
}

/** Returns the wire-format string value sent in checkpoint updates. */
public String getValue() {
return value;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -410,11 +410,11 @@ protected void sendOperationUpdate(OperationUpdate.Builder builder) {

/** Sends an operation update asynchronously. */
protected CompletableFuture<Void> sendOperationUpdateAsync(OperationUpdate.Builder builder) {
var updateBuilder =
builder.id(getOperationId()).name(getName()).type(getType()).parentId(durableContext.getParentId());
if (getSubType() != null) {
updateBuilder.subType(getSubType().getValue());
}
var updateBuilder = builder.id(getOperationId())
.name(getName())
.type(getType())
.subType(getSubType().getValue())
.parentId(durableContext.getParentId());
var update = updateBuilder.build();
if (replayCompletedOperation.get()) {
// We are replaying a completed operation, so complete the completableFuture without checkpointing
Expand Down Expand Up @@ -444,9 +444,7 @@ protected void validateReplay(Operation checkpointed) {
getOperationId(), checkpointed.name(), getName())));
}

if ((getSubType() == null && checkpointed.subType() != null)
|| getSubType() != null
&& !Objects.equals(checkpointed.subType(), getSubType().getValue())) {
if (!Objects.equals(checkpointed.subType(), getSubType().getValue())) {
throw terminateExecution(new NonDeterministicExecutionException(String.format(
"Operation subType mismatch for \"%s\". Expected \"%s\", got \"%s\"",
getOperationId(), checkpointed.subType(), getSubType())));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,8 @@ private Throwable translateException(Operation op, ErrorObject errorObject) {
case RUN_IN_CHILD_CONTEXT, WITH_RETRY -> new ChildContextFailedException(op);

// the following subtypes should not be able to reach here
case PARALLEL, MAP, WAIT_FOR_CONDITION -> new IllegalStateException("Unexpected sub-type: " + getSubType());
case PARALLEL, MAP, WAIT_FOR_CONDITION, STEP, WAIT, CALLBACK, CHAINED_INVOKE ->
new IllegalStateException("Unexpected sub-type: " + getSubType());
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.lambda.model.OperationType;
import software.amazon.lambda.durable.DurableContext;
import software.amazon.lambda.durable.TypeToken;
import software.amazon.lambda.durable.config.NestingType;
Expand Down Expand Up @@ -116,7 +115,7 @@ protected <R> ChildContextOperation<R> createItem(
SerDes serDes,
OperationSubType branchSubType) {
return new ChildContextOperation<>(
OperationIdentifier.of(operationId, name, OperationType.CONTEXT, branchSubType),
OperationIdentifier.of(operationId, name, branchSubType),
function,
resultType,
RunInChildContextConfig.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import software.amazon.awssdk.services.lambda.model.Operation;
import software.amazon.awssdk.services.lambda.model.OperationAction;
import software.amazon.awssdk.services.lambda.model.OperationStatus;
import software.amazon.awssdk.services.lambda.model.OperationType;
import software.amazon.awssdk.services.lambda.model.OperationUpdate;
import software.amazon.awssdk.services.lambda.model.StepOptions;
import software.amazon.lambda.durable.StepContext;
Expand All @@ -21,7 +20,6 @@
import software.amazon.lambda.durable.execution.SuspendExecutionException;
import software.amazon.lambda.durable.execution.ThreadType;
import software.amazon.lambda.durable.model.OperationIdentifier;
import software.amazon.lambda.durable.model.OperationSubType;
import software.amazon.lambda.durable.model.WaitForConditionResult;
import software.amazon.lambda.durable.util.ExceptionHelper;

Expand All @@ -41,17 +39,12 @@ public class WaitForConditionOperation<T> extends SerializableDurableOperation<T
private final WaitForConditionConfig<T> config;

public WaitForConditionOperation(
String operationId,
String name,
OperationIdentifier operationIdentifier,
BiFunction<T, StepContext, WaitForConditionResult<T>> checkFunc,
TypeToken<T> resultTypeToken,
WaitForConditionConfig<T> config,
DurableContextImpl durableContext) {
super(
OperationIdentifier.of(operationId, name, OperationType.STEP, OperationSubType.WAIT_FOR_CONDITION),
resultTypeToken,
config.serDes(),
durableContext);
super(operationIdentifier, resultTypeToken, config.serDes(), durableContext);

this.checkFunc = checkFunc;
this.config = config;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import software.amazon.lambda.durable.execution.ThreadContext;
import software.amazon.lambda.durable.execution.ThreadType;
import software.amazon.lambda.durable.model.DurableExecutionInput;
import software.amazon.lambda.durable.model.OperationSubType;

class ReplayValidationTest {
private static final String EXECUTION_NAME = "exec-name";
Expand Down Expand Up @@ -67,6 +68,7 @@ void shouldPassValidationWhenStepTypeAndNameMatch() {
.id(OPERATION_ID1)
.name("test")
.type(OperationType.STEP)
.subType(OperationSubType.STEP.getValue())
.status(OperationStatus.SUCCEEDED)
.stepDetails(StepDetails.builder().result("\"result\"").build())
.build();
Expand All @@ -83,6 +85,7 @@ void shouldPassValidationWhenWaitTypeMatches() {
var existingOp = Operation.builder()
.id(OPERATION_ID1)
.type(OperationType.WAIT)
.subType(OperationSubType.WAIT.getValue())
.status(OperationStatus.SUCCEEDED)
.build();

Expand Down Expand Up @@ -144,6 +147,7 @@ void shouldHandleNullNamesCorrectly() {
.id(OPERATION_ID1)
.name(null)
.type(OperationType.STEP)
.subType(OperationSubType.STEP.getValue())
.status(OperationStatus.SUCCEEDED)
.stepDetails(StepDetails.builder().result("\"result\"").build())
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import software.amazon.lambda.durable.exception.UnrecoverableDurableExecutionException;
import software.amazon.lambda.durable.model.DurableExecutionInput;
import software.amazon.lambda.durable.model.ExecutionStatus;
import software.amazon.lambda.durable.model.OperationSubType;

class DurableExecutionTest {

Expand Down Expand Up @@ -187,6 +188,7 @@ void testExecuteReplay() {
.id(OPERATION_ID1)
.name("step1")
.type(OperationType.STEP)
.subType(OperationSubType.STEP.getValue())
.status(OperationStatus.SUCCEEDED)
.stepDetails(StepDetails.builder().result("\"First\"").build())
.build();
Expand Down
Loading