diff --git a/azure-blob-payloads/build.gradle b/azure-blob-payloads/build.gradle new file mode 100644 index 00000000..691d0931 --- /dev/null +++ b/azure-blob-payloads/build.gradle @@ -0,0 +1,88 @@ +/* + * Build file for the azure-blob-payloads module. + * Provides a BlobPayloadStore implementation for externalizing large payloads to Azure Blob Storage. + */ + +plugins { + id 'java-library' + id 'maven-publish' + id 'signing' + id 'com.github.spotbugs' version '6.4.8' +} + +group 'com.microsoft' +version = '1.7.0' +archivesBaseName = 'durabletask-azure-blob-payloads' + +def azureStorageBlobVersion = '12.30.0' +def azureCoreVersion = '1.57.1' +def azureIdentityVersion = '1.18.1' + +dependencies { + api project(':client') + implementation "com.azure:azure-storage-blob:${azureStorageBlobVersion}" + implementation "com.azure:azure-core:${azureCoreVersion}" + implementation "com.azure:azure-identity:${azureIdentityVersion}" + + testImplementation(platform('org.junit:junit-bom:5.14.2')) + testImplementation('org.junit.jupiter:junit-jupiter') + testRuntimeOnly('org.junit.platform:junit-platform-launcher') + testImplementation 'org.mockito:mockito-core:5.21.0' + testImplementation 'org.mockito:mockito-junit-jupiter:5.21.0' +} + +sourceCompatibility = JavaVersion.VERSION_1_8 +targetCompatibility = JavaVersion.VERSION_1_8 + +test { + useJUnitPlatform() +} + +publishing { + repositories { + maven { + url "file://$project.rootDir/repo" + } + } + publications { + mavenJava(MavenPublication) { + from components.java + artifactId = archivesBaseName + pom { + name = 'Durable Task Azure Blob Payloads for Java' + description = 'This package provides an Azure Blob Storage implementation of PayloadStore for externalizing large payloads in the Durable Task Java SDK.' + url = "https://github.com/microsoft/durabletask-java/tree/main/azure-blob-payloads" + licenses { + license { + name = "MIT License" + url = "https://opensource.org/licenses/MIT" + distribution = "repo" + } + } + developers { + developer { + id = "Microsoft" + name = "Microsoft Corporation" + } + } + scm { + connection = "scm:git:https://github.com/microsoft/durabletask-java" + developerConnection = "scm:git:git@github.com:microsoft/durabletask-java" + url = "https://github.com/microsoft/durabletask-java/tree/main/azure-blob-payloads" + } + } + } + } +} + +signing { + required { !project.hasProperty("skipSigning") && gradle.taskGraph.hasTask("publish") } + sign publishing.publications.mavenJava +} + +spotbugs { + ignoreFailures = false + effort = 'max' + reportLevel = 'medium' + excludeFilter = file('spotbugs-exclude.xml') +} diff --git a/azure-blob-payloads/spotbugs-exclude.xml b/azure-blob-payloads/spotbugs-exclude.xml new file mode 100644 index 00000000..d3221dc9 --- /dev/null +++ b/azure-blob-payloads/spotbugs-exclude.xml @@ -0,0 +1,11 @@ + + + + + + + + + + + diff --git a/azure-blob-payloads/src/main/java/com/microsoft/durabletask/azureblobpayloads/AzureBlobPayloadsExtensions.java b/azure-blob-payloads/src/main/java/com/microsoft/durabletask/azureblobpayloads/AzureBlobPayloadsExtensions.java new file mode 100644 index 00000000..df8fb841 --- /dev/null +++ b/azure-blob-payloads/src/main/java/com/microsoft/durabletask/azureblobpayloads/AzureBlobPayloadsExtensions.java @@ -0,0 +1,113 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. +package com.microsoft.durabletask.azureblobpayloads; + +import com.microsoft.durabletask.DurableTaskGrpcClientBuilder; +import com.microsoft.durabletask.DurableTaskGrpcWorkerBuilder; +import com.microsoft.durabletask.LargePayloadOptions; + +/** + * Extension methods for configuring Azure Blob Storage-based payload externalization + * on Durable Task builders. + *

+ * Example: + *

{@code
+ * BlobPayloadStoreOptions storeOptions = new BlobPayloadStoreOptions.Builder()
+ *     .setConnectionString("DefaultEndpointsProtocol=https;...")
+ *     .build();
+ *
+ * DurableTaskGrpcWorkerBuilder workerBuilder = new DurableTaskGrpcWorkerBuilder();
+ * AzureBlobPayloadsExtensions.useBlobStoragePayloads(workerBuilder, storeOptions);
+ *
+ * DurableTaskGrpcClientBuilder clientBuilder = new DurableTaskGrpcClientBuilder();
+ * AzureBlobPayloadsExtensions.useBlobStoragePayloads(clientBuilder, storeOptions);
+ * }
+ * + * @see BlobPayloadStore + * @see BlobPayloadStoreOptions + */ +public final class AzureBlobPayloadsExtensions { + + private AzureBlobPayloadsExtensions() { + } + + /** + * Configures the worker builder to use Azure Blob Storage for large payload externalization + * with default {@link LargePayloadOptions}. + * + * @param builder the worker builder to configure + * @param storeOptions the blob payload store configuration + */ + public static void useBlobStoragePayloads( + DurableTaskGrpcWorkerBuilder builder, + BlobPayloadStoreOptions storeOptions) { + if (builder == null) { + throw new IllegalArgumentException("builder must not be null"); + } + if (storeOptions == null) { + throw new IllegalArgumentException("storeOptions must not be null"); + } + builder.useExternalizedPayloads(new BlobPayloadStore(storeOptions)); + } + + /** + * Configures the worker builder to use Azure Blob Storage for large payload externalization + * with custom {@link LargePayloadOptions}. + * + * @param builder the worker builder to configure + * @param storeOptions the blob payload store configuration + * @param payloadOptions the large payload threshold options + */ + public static void useBlobStoragePayloads( + DurableTaskGrpcWorkerBuilder builder, + BlobPayloadStoreOptions storeOptions, + LargePayloadOptions payloadOptions) { + if (builder == null) { + throw new IllegalArgumentException("builder must not be null"); + } + if (storeOptions == null) { + throw new IllegalArgumentException("storeOptions must not be null"); + } + builder.useExternalizedPayloads(new BlobPayloadStore(storeOptions), payloadOptions); + } + + /** + * Configures the client builder to use Azure Blob Storage for large payload externalization + * with default {@link LargePayloadOptions}. + * + * @param builder the client builder to configure + * @param storeOptions the blob payload store configuration + */ + public static void useBlobStoragePayloads( + DurableTaskGrpcClientBuilder builder, + BlobPayloadStoreOptions storeOptions) { + if (builder == null) { + throw new IllegalArgumentException("builder must not be null"); + } + if (storeOptions == null) { + throw new IllegalArgumentException("storeOptions must not be null"); + } + builder.useExternalizedPayloads(new BlobPayloadStore(storeOptions)); + } + + /** + * Configures the client builder to use Azure Blob Storage for large payload externalization + * with custom {@link LargePayloadOptions}. + * + * @param builder the client builder to configure + * @param storeOptions the blob payload store configuration + * @param payloadOptions the large payload threshold options + */ + public static void useBlobStoragePayloads( + DurableTaskGrpcClientBuilder builder, + BlobPayloadStoreOptions storeOptions, + LargePayloadOptions payloadOptions) { + if (builder == null) { + throw new IllegalArgumentException("builder must not be null"); + } + if (storeOptions == null) { + throw new IllegalArgumentException("storeOptions must not be null"); + } + builder.useExternalizedPayloads(new BlobPayloadStore(storeOptions), payloadOptions); + } +} diff --git a/azure-blob-payloads/src/main/java/com/microsoft/durabletask/azureblobpayloads/BlobPayloadStore.java b/azure-blob-payloads/src/main/java/com/microsoft/durabletask/azureblobpayloads/BlobPayloadStore.java new file mode 100644 index 00000000..c6716a47 --- /dev/null +++ b/azure-blob-payloads/src/main/java/com/microsoft/durabletask/azureblobpayloads/BlobPayloadStore.java @@ -0,0 +1,230 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. +package com.microsoft.durabletask.azureblobpayloads; + +import com.azure.storage.blob.BlobClient; +import com.azure.storage.blob.BlobContainerClient; +import com.azure.storage.blob.BlobServiceClient; +import com.azure.storage.blob.BlobServiceClientBuilder; +import com.azure.storage.blob.models.BlobDownloadContentResponse; +import com.azure.storage.blob.models.BlobHttpHeaders; +import com.azure.storage.blob.models.BlobStorageException; +import com.azure.storage.blob.options.BlobParallelUploadOptions; +import com.microsoft.durabletask.PayloadStore; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.UUID; +import java.util.logging.Logger; +import java.util.zip.GZIPInputStream; +import java.util.zip.GZIPOutputStream; + +/** + * Azure Blob Storage implementation of {@link PayloadStore}. + *

+ * This class uploads large payloads to Azure Blob Storage and returns tokens + * in the format {@code blob:v1::} that can be recognized + * and resolved by this store. + *

+ * The store automatically creates the container if it does not exist. + * Optionally compresses payloads with gzip (enabled by default). + * + * @see BlobPayloadStoreOptions + * @see PayloadStore + */ +public final class BlobPayloadStore implements PayloadStore { + + private static final Logger logger = Logger.getLogger(BlobPayloadStore.class.getName()); + private static final String BLOB_EXTENSION = ".json"; + private static final String TOKEN_PREFIX = "blob:v1:"; + private static final String GZIP_CONTENT_ENCODING = "gzip"; + + private final BlobContainerClient containerClient; + private final String blobPrefix; + private final String containerName; + private final boolean compressPayloads; + private volatile boolean containerEnsured; + + /** + * Creates a new BlobPayloadStore with the given options. + * + * @param options the blob payload store configuration + */ + public BlobPayloadStore(BlobPayloadStoreOptions options) { + if (options == null) { + throw new IllegalArgumentException("options must not be null"); + } + + BlobServiceClient serviceClient; + if (options.getBlobServiceClient() != null) { + serviceClient = options.getBlobServiceClient(); + } else if (options.getConnectionString() != null) { + serviceClient = new BlobServiceClientBuilder() + .connectionString(options.getConnectionString()) + .buildClient(); + } else { + serviceClient = new BlobServiceClientBuilder() + .endpoint(options.getBlobServiceEndpoint()) + .credential(options.getCredential()) + .buildClient(); + } + + this.containerClient = serviceClient.getBlobContainerClient(options.getContainerName()); + this.blobPrefix = options.getBlobPrefix(); + this.containerName = options.getContainerName(); + this.compressPayloads = options.isCompressPayloads(); + } + + @Override + public String upload(String payload) { + if (payload == null) { + throw new IllegalArgumentException("payload must not be null"); + } + + ensureContainerExists(); + + String blobName = this.blobPrefix + UUID.randomUUID().toString().replace("-", "") + BLOB_EXTENSION; + BlobClient blobClient = this.containerClient.getBlobClient(blobName); + + byte[] rawData = payload.getBytes(StandardCharsets.UTF_8); + byte[] data; + + if (this.compressPayloads) { + data = gzipCompress(rawData); + BlobHttpHeaders headers = new BlobHttpHeaders().setContentEncoding(GZIP_CONTENT_ENCODING); + BlobParallelUploadOptions uploadOptions = new BlobParallelUploadOptions( + new ByteArrayInputStream(data), data.length) + .setHeaders(headers); + blobClient.uploadWithResponse(uploadOptions, null, null); + } else { + data = rawData; + blobClient.upload(new ByteArrayInputStream(data), data.length, true); + } + + String token = TOKEN_PREFIX + this.containerName + ":" + blobName; + logger.fine(() -> String.format("Uploaded payload (%d bytes, compressed=%s) to %s", + rawData.length, this.compressPayloads, token)); + return token; + } + + @Override + public String download(String token) { + if (token == null || token.isEmpty()) { + throw new IllegalArgumentException("token must not be null or empty"); + } + + String blobName = extractBlobName(token); + BlobClient blobClient = this.containerClient.getBlobClient(blobName); + + BlobDownloadContentResponse response = blobClient.downloadContentWithResponse(null, null, null, null); + byte[] rawBytes = response.getValue().toBytes(); + String contentEncoding = response.getDeserializedHeaders().getContentEncoding(); + if (GZIP_CONTENT_ENCODING.equalsIgnoreCase(contentEncoding)) { + rawBytes = gzipDecompress(rawBytes); + } + + String payload = new String(rawBytes, StandardCharsets.UTF_8); + logger.fine(() -> String.format("Downloaded payload (%d bytes) from %s", payload.length(), token)); + return payload; + } + + @Override + public boolean isKnownPayloadToken(String value) { + if (value == null || value.isEmpty()) { + return false; + } + return value.startsWith(TOKEN_PREFIX); + } + + /** + * Ensures the blob container exists, creating it if necessary. + * The check-then-act on {@code containerEnsured} is intentionally non-atomic: + * concurrent callers may race through to {@code create()}, but the 409 Conflict + * handler makes this benign. + */ + private void ensureContainerExists() { + if (this.containerEnsured) { + return; + } + try { + if (!this.containerClient.exists()) { + this.containerClient.create(); + logger.info(() -> String.format("Created blob container: %s", this.containerClient.getBlobContainerName())); + } + this.containerEnsured = true; + } catch (BlobStorageException e) { + // Container might have been created concurrently (409 Conflict) + if (e.getStatusCode() != 409) { + throw e; + } + this.containerEnsured = true; + } + } + + /** + * Extracts the blob name from a {@code blob:v1::} token + * and validates that the container matches the configured container. + */ + private String extractBlobName(String token) { + if (!token.startsWith(TOKEN_PREFIX)) { + throw new IllegalArgumentException( + "Token does not have the expected format (blob:v1:...): " + token); + } + // Format: blob:v1:: + String remainder = token.substring(TOKEN_PREFIX.length()); + int colonIndex = remainder.indexOf(':'); + if (colonIndex < 0) { + throw new IllegalArgumentException( + "Token does not have the expected format (blob:v1::): " + token); + } + String tokenContainer = remainder.substring(0, colonIndex); + if (!this.containerName.equals(tokenContainer)) { + throw new IllegalArgumentException(String.format( + "Token container '%s' does not match configured container '%s'", + tokenContainer, this.containerName)); + } + return remainder.substring(colonIndex + 1); + } + + private static byte[] gzipCompress(byte[] data) { + try { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + try (GZIPOutputStream gzipOut = new GZIPOutputStream(baos)) { + gzipOut.write(data); + } + return baos.toByteArray(); + } catch (IOException e) { + throw new RuntimeException("Failed to gzip compress payload", e); + } + } + + /** + * Maximum decompressed size (20 MiB) to guard against decompression bombs. + * This is 2x the default max externalized payload size of 10 MiB. + */ + private static final int MAX_DECOMPRESSED_BYTES = 20 * 1024 * 1024; + + private static byte[] gzipDecompress(byte[] compressed) { + try { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + try (GZIPInputStream gzipIn = new GZIPInputStream(new ByteArrayInputStream(compressed))) { + byte[] buffer = new byte[8192]; + int len; + int totalRead = 0; + while ((len = gzipIn.read(buffer)) != -1) { + totalRead += len; + if (totalRead > MAX_DECOMPRESSED_BYTES) { + throw new IOException( + "Decompressed payload exceeds safety limit of " + MAX_DECOMPRESSED_BYTES + " bytes"); + } + baos.write(buffer, 0, len); + } + } + return baos.toByteArray(); + } catch (IOException e) { + throw new RuntimeException("Failed to gzip decompress payload", e); + } + } +} diff --git a/azure-blob-payloads/src/main/java/com/microsoft/durabletask/azureblobpayloads/BlobPayloadStoreOptions.java b/azure-blob-payloads/src/main/java/com/microsoft/durabletask/azureblobpayloads/BlobPayloadStoreOptions.java new file mode 100644 index 00000000..5b5246d7 --- /dev/null +++ b/azure-blob-payloads/src/main/java/com/microsoft/durabletask/azureblobpayloads/BlobPayloadStoreOptions.java @@ -0,0 +1,276 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. +package com.microsoft.durabletask.azureblobpayloads; + +import com.azure.core.credential.TokenCredential; +import com.azure.storage.blob.BlobServiceClient; + +/** + * Configuration options for {@link BlobPayloadStore}. + *

+ * Use the {@link Builder} to construct instances. Either a connection string or a + * blob service endpoint with a credential must be provided. + *

+ * Example using connection string: + *

{@code
+ * BlobPayloadStoreOptions options = new BlobPayloadStoreOptions.Builder()
+ *     .setConnectionString("DefaultEndpointsProtocol=https;...")
+ *     .setContainerName("large-payloads")
+ *     .build();
+ * }
+ *

+ * Example using endpoint with managed identity: + *

{@code
+ * BlobPayloadStoreOptions options = new BlobPayloadStoreOptions.Builder()
+ *     .setBlobServiceEndpoint("https://myaccount.blob.core.windows.net")
+ *     .setCredential(new DefaultAzureCredentialBuilder().build())
+ *     .setContainerName("large-payloads")
+ *     .build();
+ * }
+ * + * @see BlobPayloadStore + */ +public final class BlobPayloadStoreOptions { + + /** + * Default container name for storing externalized payloads. + */ + static final String DEFAULT_CONTAINER_NAME = "durabletask-payloads"; + + /** + * Default prefix for blob names. + */ + static final String DEFAULT_BLOB_PREFIX = "payloads/"; + + private final String connectionString; + private final String blobServiceEndpoint; + private final TokenCredential credential; + private final BlobServiceClient blobServiceClient; + private final String containerName; + private final String blobPrefix; + private final boolean compressPayloads; + + private BlobPayloadStoreOptions(Builder builder) { + this.connectionString = builder.connectionString; + this.blobServiceEndpoint = builder.blobServiceEndpoint; + this.credential = builder.credential; + this.blobServiceClient = builder.blobServiceClient; + this.containerName = builder.containerName; + this.blobPrefix = builder.blobPrefix; + this.compressPayloads = builder.compressPayloads; + } + + /** + * Gets the Azure Storage connection string, if configured. + * + * @return the connection string, or null if endpoint-based auth is used + */ + public String getConnectionString() { + return this.connectionString; + } + + /** + * Gets the Azure Blob Storage service endpoint, if configured. + * + * @return the blob service endpoint, or null if connection string auth is used + */ + public String getBlobServiceEndpoint() { + return this.blobServiceEndpoint; + } + + /** + * Gets the token credential for authenticating to Azure Blob Storage, if configured. + * + * @return the token credential, or null + */ + public TokenCredential getCredential() { + return this.credential; + } + + /** + * Gets a pre-configured BlobServiceClient, if provided. + * + * @return the blob service client, or null + */ + public BlobServiceClient getBlobServiceClient() { + return this.blobServiceClient; + } + + /** + * Gets the container name for storing externalized payloads. + * + * @return the container name + */ + public String getContainerName() { + return this.containerName; + } + + /** + * Gets the blob name prefix for externalized payloads. + * + * @return the blob prefix + */ + public String getBlobPrefix() { + return this.blobPrefix; + } + + /** + * Gets whether payloads should be compressed with gzip before uploading. + * + * @return true if compression is enabled, false otherwise + */ + public boolean isCompressPayloads() { + return this.compressPayloads; + } + + /** + * Builder for constructing {@link BlobPayloadStoreOptions} instances. + */ + public static final class Builder { + private String connectionString; + private String blobServiceEndpoint; + private TokenCredential credential; + private BlobServiceClient blobServiceClient; + private String containerName = DEFAULT_CONTAINER_NAME; + private String blobPrefix = DEFAULT_BLOB_PREFIX; + private boolean compressPayloads = true; + + /** + * Sets the Azure Storage connection string. Mutually exclusive with + * {@link #setBlobServiceEndpoint(String)} and {@link #setBlobServiceClient(BlobServiceClient)}. + * Setting this clears any previously set endpoint or pre-configured client. + * + * @param connectionString the connection string + * @return this builder + */ + public Builder setConnectionString(String connectionString) { + if (connectionString == null || connectionString.isEmpty()) { + throw new IllegalArgumentException("connectionString must not be null or empty"); + } + this.connectionString = connectionString; + this.blobServiceEndpoint = null; + this.credential = null; + this.blobServiceClient = null; + return this; + } + + /** + * Sets the Azure Blob Storage service endpoint. Use with {@link #setCredential(TokenCredential)}. + * Mutually exclusive with {@link #setConnectionString(String)} and {@link #setBlobServiceClient(BlobServiceClient)}. + * Setting this clears any previously set connection string or pre-configured client. + * + * @param blobServiceEndpoint the blob service endpoint URL + * @return this builder + */ + public Builder setBlobServiceEndpoint(String blobServiceEndpoint) { + if (blobServiceEndpoint == null || blobServiceEndpoint.isEmpty()) { + throw new IllegalArgumentException("blobServiceEndpoint must not be null or empty"); + } + this.blobServiceEndpoint = blobServiceEndpoint; + this.connectionString = null; + this.blobServiceClient = null; + return this; + } + + /** + * Sets the token credential for authenticating to Azure Blob Storage. + * Used with {@link #setBlobServiceEndpoint(String)}. + * + * @param credential the token credential + * @return this builder + */ + public Builder setCredential(TokenCredential credential) { + if (credential == null) { + throw new IllegalArgumentException("credential must not be null"); + } + this.credential = credential; + return this; + } + + /** + * Sets a pre-configured BlobServiceClient. Mutually exclusive with + * {@link #setConnectionString(String)} and {@link #setBlobServiceEndpoint(String)}. + * Setting this clears any previously set connection string or endpoint. + * + * @param blobServiceClient the pre-configured client + * @return this builder + */ + public Builder setBlobServiceClient(BlobServiceClient blobServiceClient) { + if (blobServiceClient == null) { + throw new IllegalArgumentException("blobServiceClient must not be null"); + } + this.blobServiceClient = blobServiceClient; + this.connectionString = null; + this.blobServiceEndpoint = null; + this.credential = null; + return this; + } + + /** + * Sets the container name. Defaults to {@value DEFAULT_CONTAINER_NAME}. + * + * @param containerName the container name + * @return this builder + */ + public Builder setContainerName(String containerName) { + if (containerName == null || containerName.isEmpty()) { + throw new IllegalArgumentException("containerName must not be null or empty"); + } + this.containerName = containerName; + return this; + } + + /** + * Sets the blob name prefix. Defaults to {@value DEFAULT_BLOB_PREFIX}. + * + * @param blobPrefix the blob name prefix + * @return this builder + */ + public Builder setBlobPrefix(String blobPrefix) { + if (blobPrefix == null) { + throw new IllegalArgumentException("blobPrefix must not be null"); + } + this.blobPrefix = blobPrefix; + return this; + } + + /** + * Sets whether payloads should be compressed with gzip before uploading. + * Defaults to {@code true}. + * + * @param compressPayloads true to enable gzip compression + * @return this builder + */ + public Builder setCompressPayloads(boolean compressPayloads) { + this.compressPayloads = compressPayloads; + return this; + } + + /** + * Builds a new {@link BlobPayloadStoreOptions} instance. + * + * @return the options instance + * @throws IllegalStateException if no authentication method is configured + */ + public BlobPayloadStoreOptions build() { + int authMethods = 0; + if (connectionString != null) authMethods++; + if (blobServiceEndpoint != null) authMethods++; + if (blobServiceClient != null) authMethods++; + + if (authMethods == 0) { + throw new IllegalStateException( + "One of connectionString, blobServiceEndpoint, or blobServiceClient must be set"); + } + if (authMethods > 1) { + throw new IllegalStateException( + "Only one of connectionString, blobServiceEndpoint, or blobServiceClient may be set"); + } + if (blobServiceEndpoint != null && credential == null) { + throw new IllegalStateException( + "credential must be set when using blobServiceEndpoint"); + } + return new BlobPayloadStoreOptions(this); + } + } +} diff --git a/azure-blob-payloads/src/main/java/com/microsoft/durabletask/azureblobpayloads/BlobPayloadStoreProvider.java b/azure-blob-payloads/src/main/java/com/microsoft/durabletask/azureblobpayloads/BlobPayloadStoreProvider.java new file mode 100644 index 00000000..92bd6588 --- /dev/null +++ b/azure-blob-payloads/src/main/java/com/microsoft/durabletask/azureblobpayloads/BlobPayloadStoreProvider.java @@ -0,0 +1,41 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. +package com.microsoft.durabletask.azureblobpayloads; + +import com.microsoft.durabletask.PayloadStore; +import com.microsoft.durabletask.PayloadStoreProvider; + +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * {@link PayloadStoreProvider} implementation that creates a {@link BlobPayloadStore} + * when the {@code DURABLETASK_LARGE_PAYLOADS_CONNECTION_STRING} environment variable is set. + *

+ * This provider is discovered automatically via {@link java.util.ServiceLoader}. + */ +public final class BlobPayloadStoreProvider implements PayloadStoreProvider { + + private static final Logger logger = Logger.getLogger(BlobPayloadStoreProvider.class.getName()); + private static final String ENV_STORAGE_CONNECTION_STRING = "DURABLETASK_LARGE_PAYLOADS_CONNECTION_STRING"; + + @Override + public PayloadStore create() { + String connectionString = System.getenv(ENV_STORAGE_CONNECTION_STRING); + if (connectionString == null || connectionString.isEmpty()) { + return null; + } + + try { + BlobPayloadStoreOptions options = new BlobPayloadStoreOptions.Builder() + .setConnectionString(connectionString) + .build(); + logger.info("Large payload externalization enabled using Azure Blob Storage"); + return new BlobPayloadStore(options); + } catch (Exception e) { + logger.log(Level.WARNING, + "Failed to initialize BlobPayloadStore; large payloads will not be externalized", e); + return null; + } + } +} diff --git a/azure-blob-payloads/src/main/resources/META-INF/services/com.microsoft.durabletask.PayloadStoreProvider b/azure-blob-payloads/src/main/resources/META-INF/services/com.microsoft.durabletask.PayloadStoreProvider new file mode 100644 index 00000000..573fb7ff --- /dev/null +++ b/azure-blob-payloads/src/main/resources/META-INF/services/com.microsoft.durabletask.PayloadStoreProvider @@ -0,0 +1 @@ +com.microsoft.durabletask.azureblobpayloads.BlobPayloadStoreProvider diff --git a/azure-blob-payloads/src/test/java/com/microsoft/durabletask/azureblobpayloads/BlobPayloadStoreOptionsTest.java b/azure-blob-payloads/src/test/java/com/microsoft/durabletask/azureblobpayloads/BlobPayloadStoreOptionsTest.java new file mode 100644 index 00000000..dba0e1ca --- /dev/null +++ b/azure-blob-payloads/src/test/java/com/microsoft/durabletask/azureblobpayloads/BlobPayloadStoreOptionsTest.java @@ -0,0 +1,94 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.microsoft.durabletask.azureblobpayloads; + +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.*; + +/** + * Unit tests for BlobPayloadStoreOptions. + */ +public class BlobPayloadStoreOptionsTest { + + @Test + void connectionString_setsCorrectly() { + BlobPayloadStoreOptions options = new BlobPayloadStoreOptions.Builder() + .setConnectionString("DefaultEndpointsProtocol=https;AccountName=test;AccountKey=key;EndpointSuffix=core.windows.net") + .build(); + assertNotNull(options.getConnectionString()); + assertEquals("durabletask-payloads", options.getContainerName()); + assertEquals("payloads/", options.getBlobPrefix()); + assertTrue(options.isCompressPayloads()); + } + + @Test + void customContainerAndPrefix() { + BlobPayloadStoreOptions options = new BlobPayloadStoreOptions.Builder() + .setConnectionString("DefaultEndpointsProtocol=https;AccountName=test;AccountKey=key;EndpointSuffix=core.windows.net") + .setContainerName("my-container") + .setBlobPrefix("my-prefix/") + .build(); + assertEquals("my-container", options.getContainerName()); + assertEquals("my-prefix/", options.getBlobPrefix()); + } + + @Test + void noAuthMethod_throws() { + assertThrows(IllegalStateException.class, + () -> new BlobPayloadStoreOptions.Builder().build()); + } + + @Test + void multipleAuthMethods_throws() { + assertThrows(IllegalStateException.class, + () -> new BlobPayloadStoreOptions.Builder() + .setConnectionString("conn-string") + .setBlobServiceEndpoint("https://test.blob.core.windows.net") + .build()); + } + + @Test + void endpointWithoutCredential_throws() { + assertThrows(IllegalStateException.class, + () -> new BlobPayloadStoreOptions.Builder() + .setBlobServiceEndpoint("https://test.blob.core.windows.net") + .build()); + } + + @Test + void nullConnectionString_throws() { + assertThrows(IllegalArgumentException.class, + () -> new BlobPayloadStoreOptions.Builder().setConnectionString(null)); + } + + @Test + void emptyContainerName_throws() { + assertThrows(IllegalArgumentException.class, + () -> new BlobPayloadStoreOptions.Builder().setContainerName("")); + } + + @Test + void nullBlobPrefix_throws() { + assertThrows(IllegalArgumentException.class, + () -> new BlobPayloadStoreOptions.Builder().setBlobPrefix(null)); + } + + @Test + void compressPayloads_defaultsToTrue() { + BlobPayloadStoreOptions options = new BlobPayloadStoreOptions.Builder() + .setConnectionString("DefaultEndpointsProtocol=https;AccountName=test;AccountKey=key;EndpointSuffix=core.windows.net") + .build(); + assertTrue(options.isCompressPayloads()); + } + + @Test + void compressPayloads_canBeDisabled() { + BlobPayloadStoreOptions options = new BlobPayloadStoreOptions.Builder() + .setConnectionString("DefaultEndpointsProtocol=https;AccountName=test;AccountKey=key;EndpointSuffix=core.windows.net") + .setCompressPayloads(false) + .build(); + assertFalse(options.isCompressPayloads()); + } +} diff --git a/azurefunctions/src/main/java/com/microsoft/durabletask/azurefunctions/internal/middleware/OrchestrationMiddleware.java b/azurefunctions/src/main/java/com/microsoft/durabletask/azurefunctions/internal/middleware/OrchestrationMiddleware.java index 02be55f4..f1ed2d68 100644 --- a/azurefunctions/src/main/java/com/microsoft/durabletask/azurefunctions/internal/middleware/OrchestrationMiddleware.java +++ b/azurefunctions/src/main/java/com/microsoft/durabletask/azurefunctions/internal/middleware/OrchestrationMiddleware.java @@ -12,10 +12,16 @@ import com.microsoft.durabletask.CompositeTaskFailedException; import com.microsoft.durabletask.DataConverter; import com.microsoft.durabletask.OrchestrationRunner; +import com.microsoft.durabletask.PayloadStore; +import com.microsoft.durabletask.PayloadStoreProvider; import com.microsoft.durabletask.TaskFailedException; import com.microsoft.durabletask.interruption.ContinueAsNewInterruption; import com.microsoft.durabletask.interruption.OrchestratorBlockedException; +import java.util.ServiceLoader; +import java.util.logging.Level; +import java.util.logging.Logger; + /** * Durable Function Orchestration Middleware * @@ -25,6 +31,13 @@ public class OrchestrationMiddleware implements Middleware { private static final String ORCHESTRATION_TRIGGER = "DurableOrchestrationTrigger"; + private static final Logger logger = Logger.getLogger(OrchestrationMiddleware.class.getName()); + + private final PayloadStore payloadStore; + + public OrchestrationMiddleware() { + this.payloadStore = initializePayloadStore(); + } @Override public void invoke(MiddlewareContext context, MiddlewareChain chain) throws Exception { @@ -70,7 +83,24 @@ public void invoke(MiddlewareContext context, MiddlewareChain chain) throws Exce // requires update on OrchestratorFunction API. throw new RuntimeException("Unexpected failure in the task execution", e); } - }); + }, this.payloadStore); context.updateReturnValue(orchestratorOutputEncodedProtoBytes); } + + private static PayloadStore initializePayloadStore() { + ServiceLoader loader = ServiceLoader.load(PayloadStoreProvider.class); + for (PayloadStoreProvider provider : loader) { + try { + PayloadStore store = provider.create(); + if (store != null) { + return store; + } + } catch (Exception e) { + logger.log(Level.WARNING, + "PayloadStoreProvider " + provider.getClass().getName() + " failed to create store", e); + } + } + logger.fine("No PayloadStoreProvider found or configured; large payload externalization is disabled"); + return null; + } } diff --git a/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcClient.java b/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcClient.java index 14955120..b3a86c95 100644 --- a/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcClient.java +++ b/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcClient.java @@ -32,10 +32,14 @@ public final class DurableTaskGrpcClient extends DurableTaskClient { private final ManagedChannel managedSidecarChannel; private final TaskHubSidecarServiceBlockingStub sidecarClient; private final String defaultVersion; + private final PayloadHelper payloadHelper; DurableTaskGrpcClient(DurableTaskGrpcClientBuilder builder) { this.dataConverter = builder.dataConverter != null ? builder.dataConverter : new JacksonDataConverter(); this.defaultVersion = builder.defaultVersion; + this.payloadHelper = builder.payloadStore != null + ? new PayloadHelper(builder.payloadStore, builder.largePayloadOptions) + : null; Channel sidecarGrpcChannel; if (builder.channel != null) { @@ -63,6 +67,7 @@ public final class DurableTaskGrpcClient extends DurableTaskClient { DurableTaskGrpcClient(int port, String defaultVersion) { this.dataConverter = new JacksonDataConverter(); this.defaultVersion = defaultVersion; + this.payloadHelper = null; // Need to keep track of this channel so we can dispose it on close() this.managedSidecarChannel = ManagedChannelBuilder @@ -125,6 +130,9 @@ public String scheduleNewOrchestrationInstance( Object input = options.getInput(); if (input != null) { String serializedInput = this.dataConverter.serialize(input); + if (this.payloadHelper != null) { + serializedInput = this.payloadHelper.maybeExternalize(serializedInput); + } builder.setInput(StringValue.of(serializedInput)); } @@ -177,6 +185,9 @@ public void raiseEvent(String instanceId, String eventName, Object eventPayload) .setName(eventName); if (eventPayload != null) { String serializedPayload = this.dataConverter.serialize(eventPayload); + if (this.payloadHelper != null) { + serializedPayload = this.payloadHelper.maybeExternalize(serializedPayload); + } builder.setInput(StringValue.of(serializedPayload)); } @@ -191,6 +202,9 @@ public OrchestrationMetadata getInstanceMetadata(String instanceId, boolean getI .setGetInputsAndOutputs(getInputsAndOutputs) .build(); GetInstanceResponse response = this.sidecarClient.getInstance(request); + if (this.payloadHelper != null) { + response = resolveGetInstanceResponsePayloads(response); + } return new OrchestrationMetadata(response, this.dataConverter, request.getGetInputsAndOutputs()); } @@ -218,6 +232,9 @@ public OrchestrationMetadata waitForInstanceStart(String instanceId, Duration ti } throw e; } + if (this.payloadHelper != null) { + response = resolveGetInstanceResponsePayloads(response); + } return new OrchestrationMetadata(response, this.dataConverter, request.getGetInputsAndOutputs()); } @@ -245,6 +262,9 @@ public OrchestrationMetadata waitForInstanceCompletion(String instanceId, Durati } throw e; } + if (this.payloadHelper != null) { + response = resolveGetInstanceResponsePayloads(response); + } return new OrchestrationMetadata(response, this.dataConverter, request.getGetInputsAndOutputs()); } @@ -258,7 +278,11 @@ public void terminate(String instanceId, @Nullable Object output) { serializeOutput != null ? serializeOutput : "(null)")); TerminateRequest.Builder builder = TerminateRequest.newBuilder().setInstanceId(instanceId); if (serializeOutput != null){ - builder.setOutput(StringValue.of(serializeOutput)); + String outputToSend = serializeOutput; + if (this.payloadHelper != null) { + outputToSend = this.payloadHelper.maybeExternalize(outputToSend); + } + builder.setOutput(StringValue.of(outputToSend)); } this.sidecarClient.terminateInstance(builder.build()); } @@ -281,7 +305,11 @@ public OrchestrationStatusQueryResult queryInstances(OrchestrationStatusQuery qu private OrchestrationStatusQueryResult toQueryResult(QueryInstancesResponse queryInstancesResponse, boolean fetchInputsAndOutputs){ List metadataList = new ArrayList<>(); queryInstancesResponse.getOrchestrationStateList().forEach(state -> { - metadataList.add(new OrchestrationMetadata(state, this.dataConverter, fetchInputsAndOutputs)); + OrchestrationState resolvedState = state; + if (this.payloadHelper != null) { + resolvedState = resolveOrchestrationStatePayloads(state); + } + metadataList.add(new OrchestrationMetadata(resolvedState, this.dataConverter, fetchInputsAndOutputs)); }); return new OrchestrationStatusQueryResult(metadataList, queryInstancesResponse.getContinuationToken().getValue()); } @@ -340,7 +368,11 @@ public void suspendInstance(String instanceId, @Nullable String reason) { SuspendRequest.Builder suspendRequestBuilder = SuspendRequest.newBuilder(); suspendRequestBuilder.setInstanceId(instanceId); if (reason != null) { - suspendRequestBuilder.setReason(StringValue.of(reason)); + String reasonToSend = reason; + if (this.payloadHelper != null) { + reasonToSend = this.payloadHelper.maybeExternalize(reasonToSend); + } + suspendRequestBuilder.setReason(StringValue.of(reasonToSend)); } this.sidecarClient.suspendInstance(suspendRequestBuilder.build()); } @@ -350,7 +382,11 @@ public void resumeInstance(String instanceId, @Nullable String reason) { ResumeRequest.Builder resumeRequestBuilder = ResumeRequest.newBuilder(); resumeRequestBuilder.setInstanceId(instanceId); if (reason != null) { - resumeRequestBuilder.setReason(StringValue.of(reason)); + String reasonToSend = reason; + if (this.payloadHelper != null) { + reasonToSend = this.payloadHelper.maybeExternalize(reasonToSend); + } + resumeRequestBuilder.setReason(StringValue.of(reasonToSend)); } this.sidecarClient.resumeInstance(resumeRequestBuilder.build()); } @@ -399,4 +435,53 @@ public String restartInstance(String instanceId, boolean restartWithNewInstanceI private PurgeResult toPurgeResult(PurgeInstancesResponse response){ return new PurgeResult(response.getDeletedInstanceCount()); } + + /** + * Resolves externalized payload URI tokens in a GetInstanceResponse. + */ + private GetInstanceResponse resolveGetInstanceResponsePayloads(GetInstanceResponse response) { + if (!response.getExists()) { + return response; + } + OrchestrationState state = response.getOrchestrationState(); + OrchestrationState resolvedState = resolveOrchestrationStatePayloads(state); + if (resolvedState == state) { + return response; + } + return response.toBuilder().setOrchestrationState(resolvedState).build(); + } + + /** + * Resolves externalized payload URI tokens in an OrchestrationState. + */ + private OrchestrationState resolveOrchestrationStatePayloads(OrchestrationState state) { + boolean changed = false; + OrchestrationState.Builder builder = state.toBuilder(); + + if (state.hasInput() && !state.getInput().getValue().isEmpty()) { + String resolved = this.payloadHelper.maybeResolve(state.getInput().getValue()); + if (!resolved.equals(state.getInput().getValue())) { + builder.setInput(StringValue.of(resolved)); + changed = true; + } + } + + if (state.hasOutput() && !state.getOutput().getValue().isEmpty()) { + String resolved = this.payloadHelper.maybeResolve(state.getOutput().getValue()); + if (!resolved.equals(state.getOutput().getValue())) { + builder.setOutput(StringValue.of(resolved)); + changed = true; + } + } + + if (state.hasCustomStatus() && !state.getCustomStatus().getValue().isEmpty()) { + String resolved = this.payloadHelper.maybeResolve(state.getCustomStatus().getValue()); + if (!resolved.equals(state.getCustomStatus().getValue())) { + builder.setCustomStatus(StringValue.of(resolved)); + changed = true; + } + } + + return changed ? builder.build() : state; + } } diff --git a/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcClientBuilder.java b/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcClientBuilder.java index 1a1cb6f2..97b49919 100644 --- a/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcClientBuilder.java +++ b/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcClientBuilder.java @@ -13,6 +13,8 @@ public final class DurableTaskGrpcClientBuilder { int port; Channel channel; String defaultVersion; + PayloadStore payloadStore; + LargePayloadOptions largePayloadOptions; /** * Sets the {@link DataConverter} to use for converting serializable data payloads. @@ -65,6 +67,41 @@ public DurableTaskGrpcClientBuilder defaultVersion(String defaultVersion) { return this; } + /** + * Enables large payload externalization with default options. + *

+ * When enabled, payloads exceeding the default threshold will be uploaded to the + * provided {@link PayloadStore} and replaced with opaque token references. + * + * @param payloadStore the store to use for externalizing large payloads + * @return this builder object + */ + public DurableTaskGrpcClientBuilder useExternalizedPayloads(PayloadStore payloadStore) { + return this.useExternalizedPayloads(payloadStore, new LargePayloadOptions.Builder().build()); + } + + /** + * Enables large payload externalization with custom options. + *

+ * When enabled, payloads exceeding the configured threshold will be uploaded to the + * provided {@link PayloadStore} and replaced with opaque token references. + * + * @param payloadStore the store to use for externalizing large payloads + * @param options the large payload configuration options + * @return this builder object + */ + public DurableTaskGrpcClientBuilder useExternalizedPayloads(PayloadStore payloadStore, LargePayloadOptions options) { + if (payloadStore == null) { + throw new IllegalArgumentException("payloadStore must not be null"); + } + if (options == null) { + throw new IllegalArgumentException("options must not be null"); + } + this.payloadStore = payloadStore; + this.largePayloadOptions = options; + return this; + } + /** * Initializes a new {@link DurableTaskClient} object with the settings specified in the current builder object. * @return a new {@link DurableTaskClient} object diff --git a/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorker.java b/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorker.java index 284b7090..48b2f7d8 100644 --- a/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorker.java +++ b/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorker.java @@ -39,6 +39,8 @@ public final class DurableTaskGrpcWorker implements AutoCloseable { private final DataConverter dataConverter; private final Duration maximumTimerInterval; private final DurableTaskGrpcWorkerVersioningOptions versioningOptions; + private final PayloadHelper payloadHelper; + private final int chunkSizeBytes; private final TaskHubSidecarServiceBlockingStub sidecarClient; @@ -70,6 +72,10 @@ public final class DurableTaskGrpcWorker implements AutoCloseable { this.dataConverter = builder.dataConverter != null ? builder.dataConverter : new JacksonDataConverter(); this.maximumTimerInterval = builder.maximumTimerInterval != null ? builder.maximumTimerInterval : DEFAULT_MAXIMUM_TIMER_INTERVAL; this.versioningOptions = builder.versioningOptions; + this.payloadHelper = builder.payloadStore != null + ? new PayloadHelper(builder.payloadStore, builder.largePayloadOptions) + : null; + this.chunkSizeBytes = builder.chunkSizeBytes; } /** @@ -132,7 +138,11 @@ public void startAndBlock() { // TODO: How do we interrupt manually? while (true) { try { - GetWorkItemsRequest getWorkItemsRequest = GetWorkItemsRequest.newBuilder().build(); + GetWorkItemsRequest.Builder getWorkItemsRequestBuilder = GetWorkItemsRequest.newBuilder(); + if (this.payloadHelper != null) { + getWorkItemsRequestBuilder.addCapabilities(WorkerCapability.WORKER_CAPABILITY_LARGE_PAYLOADS); + } + GetWorkItemsRequest getWorkItemsRequest = getWorkItemsRequestBuilder.build(); Iterator workItemStream = this.sidecarClient.getWorkItems(getWorkItemsRequest); while (workItemStream.hasNext()) { WorkItem workItem = workItemStream.next(); @@ -140,6 +150,11 @@ public void startAndBlock() { if (requestType == RequestCase.ORCHESTRATORREQUEST) { OrchestratorRequest orchestratorRequest = workItem.getOrchestratorRequest(); + // Resolve externalized payload URI tokens in history events + if (this.payloadHelper != null) { + orchestratorRequest = resolveOrchestratorRequestPayloads(orchestratorRequest); + } + // If versioning is set, process it first to see if the orchestration should be executed. boolean versioningFailed = false; if (versioningOptions != null && versioningOptions.getVersion() != null) { @@ -289,7 +304,34 @@ public void startAndBlock() { .setCompletionToken(workItem.getCompletionToken()) .build(); - this.sidecarClient.completeOrchestratorTask(response); + // Externalize large payloads and send (with optional chunking). + // If externalization or chunking fails, report as orchestration failure. + try { + if (this.payloadHelper != null) { + response = externalizeOrchestratorResponsePayloads(response); + } + sendOrchestratorResponse(response); + } catch (PayloadTooLargeException e) { + logger.log(Level.WARNING, + "Failed to send orchestrator response for instance '" + + orchestratorRequest.getInstanceId() + "': " + e.getMessage(), e); + CompleteOrchestrationAction failAction = CompleteOrchestrationAction.newBuilder() + .setOrchestrationStatus(OrchestrationStatus.ORCHESTRATION_STATUS_FAILED) + .setFailureDetails(TaskFailureDetails.newBuilder() + .setErrorType(e.getClass().getName()) + .setErrorMessage(e.getMessage()) + .setStackTrace(StringValue.of(FailureDetails.getFullStackTrace(e))) + .build()) + .build(); + OrchestratorResponse failResponse = OrchestratorResponse.newBuilder() + .setInstanceId(orchestratorRequest.getInstanceId()) + .setCompletionToken(workItem.getCompletionToken()) + .addActions(OrchestratorAction.newBuilder() + .setCompleteOrchestration(failAction) + .build()) + .build(); + this.sidecarClient.completeOrchestratorTask(failResponse); + } } else { switch(versioningOptions.getFailureStrategy()) { case FAIL: @@ -311,7 +353,7 @@ public void startAndBlock() { .addActions(action) .build(); - this.sidecarClient.completeOrchestratorTask(response); + sendOrchestratorResponse(response); break; // Reject and default share the same behavior as it does not change the orchestration to a terminal state. case REJECT: @@ -323,6 +365,12 @@ public void startAndBlock() { } } else if (requestType == RequestCase.ACTIVITYREQUEST) { ActivityRequest activityRequest = workItem.getActivityRequest(); + + // Resolve externalized payload URI token in activity input + if (this.payloadHelper != null) { + activityRequest = resolveActivityRequestPayloads(activityRequest); + } + String activityInstanceId = activityRequest.getOrchestrationInstance().getInstanceId(); // Start a tracing span for this activity execution @@ -367,6 +415,10 @@ public void startAndBlock() { .setCompletionToken(workItem.getCompletionToken()); if (output != null) { + // Externalize activity output if it exceeds threshold + if (this.payloadHelper != null) { + output = this.payloadHelper.maybeExternalize(output); + } responseBuilder.setResult(StringValue.of(output)); } @@ -408,4 +460,101 @@ else if (requestType == RequestCase.HEALTHPING) public void stop() { this.close(); } + + /** + * Sends an orchestrator response, chunking it if it exceeds the configured chunk size. + */ + private void sendOrchestratorResponse(OrchestratorResponse response) { + int serializedSize = response.getSerializedSize(); + if (serializedSize <= this.chunkSizeBytes) { + this.sidecarClient.completeOrchestratorTask(response); + return; + } + + List allActions = response.getActionsList(); + if (allActions.isEmpty()) { + // No actions to chunk and the serialized response already exceeds the configured + // chunk size. Sending this response as-is would likely exceed the gRPC message + // size limit and fail at runtime, so fail fast with a clear error message. + throw new IllegalStateException( + "OrchestratorResponse without actions exceeds the configured chunk size (" + + this.chunkSizeBytes + " bytes). Enable large-payload externalization to Azure " + + "Blob Storage or reduce the size of non-action fields."); + } + + // Compute envelope overhead (response without actions, but with chunk metadata fields). + // Include isPartial and chunkIndex since those are added to every chunk. + OrchestratorResponse envelope = response.toBuilder() + .clearActions() + .setIsPartial(true) + .setChunkIndex(com.google.protobuf.Int32Value.of(0)) + .build(); + int envelopeSize = envelope.getSerializedSize(); + int maxActionsSize = this.chunkSizeBytes - envelopeSize; + + if (maxActionsSize <= 0) { + throw new IllegalStateException( + "OrchestratorResponse envelope exceeds gRPC message size limit. Cannot chunk."); + } + + // Build chunks + List> chunks = new ArrayList<>(); + List currentChunk = new ArrayList<>(); + int currentChunkSize = 0; + + for (OrchestratorAction action : allActions) { + int actionSize = action.getSerializedSize(); + // Account for protobuf framing overhead per repeated field entry: + // field tag (1 byte) + length varint + int framingOverhead = 1 + com.google.protobuf.CodedOutputStream.computeUInt32SizeNoTag(actionSize); + int actionWireSize = actionSize + framingOverhead; + if (actionWireSize > maxActionsSize) { + throw new IllegalStateException( + "A single orchestrator action exceeds the gRPC message size limit (" + + actionWireSize + " bytes). Enable large-payload externalization to Azure Blob Storage " + + "to handle payloads of this size."); + } + + if (currentChunkSize + actionWireSize > maxActionsSize && !currentChunk.isEmpty()) { + chunks.add(currentChunk); + currentChunk = new ArrayList<>(); + currentChunkSize = 0; + } + currentChunk.add(action); + currentChunkSize += actionWireSize; + } + if (!currentChunk.isEmpty()) { + chunks.add(currentChunk); + } + + // Send chunks + for (int i = 0; i < chunks.size(); i++) { + boolean isLast = (i == chunks.size() - 1); + OrchestratorResponse.Builder chunkBuilder = response.toBuilder() + .clearActions() + .addAllActions(chunks.get(i)) + .setIsPartial(!isLast) + .setChunkIndex(com.google.protobuf.Int32Value.of(i)); + + if (i > 0) { + // Only the first chunk carries numEventsProcessed; subsequent chunks + // leave it unset (matching .NET behavior) + chunkBuilder.clearNumEventsProcessed(); + } + + this.sidecarClient.completeOrchestratorTask(chunkBuilder.build()); + } + } + + private OrchestratorRequest resolveOrchestratorRequestPayloads(OrchestratorRequest request) { + return PayloadInterceptionHelper.resolveOrchestratorRequestPayloads(request, this.payloadHelper); + } + + private ActivityRequest resolveActivityRequestPayloads(ActivityRequest request) { + return PayloadInterceptionHelper.resolveActivityRequestPayloads(request, this.payloadHelper); + } + + private OrchestratorResponse externalizeOrchestratorResponsePayloads(OrchestratorResponse response) { + return PayloadInterceptionHelper.externalizeOrchestratorResponsePayloads(response, this.payloadHelper); + } } \ No newline at end of file diff --git a/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorkerBuilder.java b/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorkerBuilder.java index ec39fee2..50a3cc33 100644 --- a/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorkerBuilder.java +++ b/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorkerBuilder.java @@ -11,6 +11,25 @@ * Builder object for constructing customized {@link DurableTaskGrpcWorker} instances. */ public final class DurableTaskGrpcWorkerBuilder { + + /** + * Minimum allowed chunk size for orchestrator response messages (1 MiB). + */ + static final int MIN_CHUNK_SIZE_BYTES = 1_048_576; + + /** + * Maximum allowed chunk size for orchestrator response messages (~3.9 MiB). + * This is the largest payload that can fit within a 4 MiB gRPC message after accounting + * for protobuf overhead. + */ + static final int MAX_CHUNK_SIZE_BYTES = 4_089_446; + + /** + * Default chunk size for orchestrator response messages. + * Matches {@link #MAX_CHUNK_SIZE_BYTES}. + */ + static final int DEFAULT_CHUNK_SIZE_BYTES = MAX_CHUNK_SIZE_BYTES; + final HashMap orchestrationFactories = new HashMap<>(); final HashMap activityFactories = new HashMap<>(); int port; @@ -18,6 +37,9 @@ public final class DurableTaskGrpcWorkerBuilder { DataConverter dataConverter; Duration maximumTimerInterval; DurableTaskGrpcWorkerVersioningOptions versioningOptions; + PayloadStore payloadStore; + LargePayloadOptions largePayloadOptions; + int chunkSizeBytes = DEFAULT_CHUNK_SIZE_BYTES; /** * Adds an orchestration factory to be used by the constructed {@link DurableTaskGrpcWorker}. @@ -125,6 +147,73 @@ public DurableTaskGrpcWorkerBuilder useVersioning(DurableTaskGrpcWorkerVersionin return this; } + /** + * Enables large payload externalization with default options. + *

+ * When enabled, payloads exceeding the default threshold will be uploaded to the + * provided {@link PayloadStore} and replaced with opaque token references. The worker + * will also announce {@code WORKER_CAPABILITY_LARGE_PAYLOADS} to the sidecar. + * + * @param payloadStore the store to use for externalizing large payloads + * @return this builder object + */ + public DurableTaskGrpcWorkerBuilder useExternalizedPayloads(PayloadStore payloadStore) { + return this.useExternalizedPayloads(payloadStore, new LargePayloadOptions.Builder().build()); + } + + /** + * Enables large payload externalization with custom options. + *

+ * When enabled, payloads exceeding the configured threshold will be uploaded to the + * provided {@link PayloadStore} and replaced with opaque token references. The worker + * will also announce {@code WORKER_CAPABILITY_LARGE_PAYLOADS} to the sidecar. + * + * @param payloadStore the store to use for externalizing large payloads + * @param options the large payload configuration options + * @return this builder object + */ + public DurableTaskGrpcWorkerBuilder useExternalizedPayloads(PayloadStore payloadStore, LargePayloadOptions options) { + if (payloadStore == null) { + throw new IllegalArgumentException("payloadStore must not be null"); + } + if (options == null) { + throw new IllegalArgumentException("options must not be null"); + } + this.payloadStore = payloadStore; + this.largePayloadOptions = options; + return this; + } + + /** + * Sets the maximum size in bytes for a single orchestrator response chunk sent over gRPC. + * Responses larger than this will be automatically split into multiple chunks. + *

+ * The value must be between {@value #MIN_CHUNK_SIZE_BYTES} and {@value #MAX_CHUNK_SIZE_BYTES} bytes. + * Defaults to {@value #DEFAULT_CHUNK_SIZE_BYTES} bytes. + * + * @param chunkSizeBytes the maximum chunk size in bytes + * @return this builder object + * @throws IllegalArgumentException if the value is outside the allowed range + */ + public DurableTaskGrpcWorkerBuilder setCompleteOrchestratorResponseChunkSizeBytes(int chunkSizeBytes) { + if (chunkSizeBytes < MIN_CHUNK_SIZE_BYTES || chunkSizeBytes > MAX_CHUNK_SIZE_BYTES) { + throw new IllegalArgumentException(String.format( + "chunkSizeBytes must be between %d and %d, but was %d", + MIN_CHUNK_SIZE_BYTES, MAX_CHUNK_SIZE_BYTES, chunkSizeBytes)); + } + this.chunkSizeBytes = chunkSizeBytes; + return this; + } + + /** + * Gets the current chunk size setting. + * + * @return the chunk size in bytes + */ + int getChunkSizeBytes() { + return this.chunkSizeBytes; + } + /** * Initializes a new {@link DurableTaskGrpcWorker} object with the settings specified in the current builder object. * @return a new {@link DurableTaskGrpcWorker} object diff --git a/client/src/main/java/com/microsoft/durabletask/LargePayloadOptions.java b/client/src/main/java/com/microsoft/durabletask/LargePayloadOptions.java new file mode 100644 index 00000000..8bec1145 --- /dev/null +++ b/client/src/main/java/com/microsoft/durabletask/LargePayloadOptions.java @@ -0,0 +1,119 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. +package com.microsoft.durabletask; + +/** + * Configuration options for large payload externalization. + *

+ * This class defines the size thresholds that control when payloads are externalized + * to a {@link PayloadStore}. It is a pure configuration class and does not hold a + * reference to any {@link PayloadStore} implementation. + *

+ * Use the {@link Builder} to create instances: + *

{@code
+ * LargePayloadOptions options = new LargePayloadOptions.Builder()
+ *     .setThresholdBytes(900_000)
+ *     .setMaxExternalizedPayloadBytes(10 * 1024 * 1024)
+ *     .build();
+ * }
+ * + * @see PayloadStore + */ +public final class LargePayloadOptions { + + /** + * Default externalization threshold in bytes (900,000 bytes, matching .NET SDK). + */ + public static final int DEFAULT_THRESHOLD_BYTES = 900_000; + + /** + * Default maximum externalized payload size in bytes (10 MiB, matching .NET SDK). + */ + public static final int DEFAULT_MAX_EXTERNALIZED_PAYLOAD_BYTES = 10 * 1024 * 1024; + + private final int thresholdBytes; + private final int maxExternalizedPayloadBytes; + + private LargePayloadOptions(Builder builder) { + this.thresholdBytes = builder.thresholdBytes; + this.maxExternalizedPayloadBytes = builder.maxExternalizedPayloadBytes; + } + + /** + * Gets the size threshold in bytes at or above which payloads will be externalized. + * Payloads below this size are sent inline. The comparison uses UTF-8 byte length. + * + * @return the externalization threshold in bytes + */ + public int getThresholdBytes() { + return this.thresholdBytes; + } + + /** + * Gets the maximum payload size in bytes that can be externalized. + * Payloads exceeding this size will cause an error to be thrown. + * + * @return the maximum externalized payload size in bytes + */ + public int getMaxExternalizedPayloadBytes() { + return this.maxExternalizedPayloadBytes; + } + + /** + * Builder for constructing {@link LargePayloadOptions} instances. + */ + public static final class Builder { + private int thresholdBytes = DEFAULT_THRESHOLD_BYTES; + private int maxExternalizedPayloadBytes = DEFAULT_MAX_EXTERNALIZED_PAYLOAD_BYTES; + + /** + * Sets the size threshold in bytes above which payloads will be externalized. + * Must not exceed 1 MiB (1,048,576 bytes). + * + * @param thresholdBytes the externalization threshold in bytes + * @return this builder + * @throws IllegalArgumentException if thresholdBytes is negative or exceeds 1 MiB + */ + public Builder setThresholdBytes(int thresholdBytes) { + if (thresholdBytes < 0) { + throw new IllegalArgumentException("thresholdBytes must not be negative"); + } + if (thresholdBytes > 1_048_576) { + throw new IllegalArgumentException("thresholdBytes must not exceed 1 MiB (1,048,576 bytes)"); + } + this.thresholdBytes = thresholdBytes; + return this; + } + + /** + * Sets the maximum payload size in bytes that can be externalized. + * Payloads exceeding this size will cause an error. + * + * @param maxExternalizedPayloadBytes the maximum externalized payload size in bytes + * @return this builder + * @throws IllegalArgumentException if maxExternalizedPayloadBytes is not positive + */ + public Builder setMaxExternalizedPayloadBytes(int maxExternalizedPayloadBytes) { + if (maxExternalizedPayloadBytes <= 0) { + throw new IllegalArgumentException("maxExternalizedPayloadBytes must be positive"); + } + this.maxExternalizedPayloadBytes = maxExternalizedPayloadBytes; + return this; + } + + /** + * Builds a new {@link LargePayloadOptions} instance from the current builder settings. + * + * @return a new {@link LargePayloadOptions} instance + * @throws IllegalStateException if thresholdBytes is not less than maxExternalizedPayloadBytes + */ + public LargePayloadOptions build() { + if (this.thresholdBytes >= this.maxExternalizedPayloadBytes) { + throw new IllegalStateException( + "thresholdBytes (" + this.thresholdBytes + + ") must be less than maxExternalizedPayloadBytes (" + this.maxExternalizedPayloadBytes + ")"); + } + return new LargePayloadOptions(this); + } + } +} diff --git a/client/src/main/java/com/microsoft/durabletask/OrchestrationRunner.java b/client/src/main/java/com/microsoft/durabletask/OrchestrationRunner.java index e5819b23..36bf4ec8 100644 --- a/client/src/main/java/com/microsoft/durabletask/OrchestrationRunner.java +++ b/client/src/main/java/com/microsoft/durabletask/OrchestrationRunner.java @@ -44,9 +44,44 @@ private OrchestrationRunner() { public static String loadAndRun( String base64EncodedOrchestratorRequest, OrchestratorFunction orchestratorFunc) { - // Example string: CiBhOTMyYjdiYWM5MmI0MDM5YjRkMTYxMDIwNzlmYTM1YSIaCP///////////wESCwi254qRBhDk+rgocgAicgj///////////8BEgwIs+eKkQYQzMXjnQMaVwoLSGVsbG9DaXRpZXMSACJGCiBhOTMyYjdiYWM5MmI0MDM5YjRkMTYxMDIwNzlmYTM1YRIiCiA3ODEwOTA2N2Q4Y2Q0ODg1YWU4NjQ0OTNlMmRlMGQ3OA== + return loadAndRun(base64EncodedOrchestratorRequest, orchestratorFunc, null, null); + } + + /** + * Loads orchestration history from {@code base64EncodedOrchestratorRequest} and uses it to execute the + * orchestrator function with large payload externalization support. + * + * @param base64EncodedOrchestratorRequest the base64-encoded protobuf payload + * @param orchestratorFunc a function that implements the orchestrator logic + * @param store the payload store for externalizing/resolving large payloads + * @param the type of the orchestrator function output + * @return a base64-encoded protobuf payload of orchestrator actions + */ + public static String loadAndRun( + String base64EncodedOrchestratorRequest, + OrchestratorFunction orchestratorFunc, + PayloadStore store) { + return loadAndRun(base64EncodedOrchestratorRequest, orchestratorFunc, store, new LargePayloadOptions.Builder().build()); + } + + /** + * Loads orchestration history from {@code base64EncodedOrchestratorRequest} and uses it to execute the + * orchestrator function with large payload externalization support and custom options. + * + * @param base64EncodedOrchestratorRequest the base64-encoded protobuf payload + * @param orchestratorFunc a function that implements the orchestrator logic + * @param store the payload store for externalizing/resolving large payloads + * @param options the large payload configuration options + * @param the type of the orchestrator function output + * @return a base64-encoded protobuf payload of orchestrator actions + */ + public static String loadAndRun( + String base64EncodedOrchestratorRequest, + OrchestratorFunction orchestratorFunc, + PayloadStore store, + LargePayloadOptions options) { byte[] decodedBytes = Base64.getDecoder().decode(base64EncodedOrchestratorRequest); - byte[] resultBytes = loadAndRun(decodedBytes, orchestratorFunc); + byte[] resultBytes = loadAndRun(decodedBytes, orchestratorFunc, store, options); return Base64.getEncoder().encodeToString(resultBytes); } @@ -63,6 +98,42 @@ public static String loadAndRun( public static byte[] loadAndRun( byte[] orchestratorRequestBytes, OrchestratorFunction orchestratorFunc) { + return loadAndRun(orchestratorRequestBytes, orchestratorFunc, null, null); + } + + /** + * Loads orchestration history from {@code orchestratorRequestBytes} and uses it to execute the + * orchestrator function with large payload externalization support. + * + * @param orchestratorRequestBytes the protobuf payload + * @param orchestratorFunc a function that implements the orchestrator logic + * @param store the payload store for externalizing/resolving large payloads + * @param the type of the orchestrator function output + * @return a protobuf-encoded payload of orchestrator actions + */ + public static byte[] loadAndRun( + byte[] orchestratorRequestBytes, + OrchestratorFunction orchestratorFunc, + PayloadStore store) { + return loadAndRun(orchestratorRequestBytes, orchestratorFunc, store, new LargePayloadOptions.Builder().build()); + } + + /** + * Loads orchestration history from {@code orchestratorRequestBytes} and uses it to execute the + * orchestrator function with large payload externalization support and custom options. + * + * @param orchestratorRequestBytes the protobuf payload + * @param orchestratorFunc a function that implements the orchestrator logic + * @param store the payload store for externalizing/resolving large payloads + * @param options the large payload configuration options + * @param the type of the orchestrator function output + * @return a protobuf-encoded payload of orchestrator actions + */ + public static byte[] loadAndRun( + byte[] orchestratorRequestBytes, + OrchestratorFunction orchestratorFunc, + PayloadStore store, + LargePayloadOptions options) { if (orchestratorFunc == null) { throw new IllegalArgumentException("orchestratorFunc must not be null"); } @@ -73,7 +144,7 @@ public static byte[] loadAndRun( ctx.complete(output); }; - return loadAndRun(orchestratorRequestBytes, orchestration); + return loadAndRun(orchestratorRequestBytes, orchestration, store, options); } /** @@ -88,8 +159,41 @@ public static byte[] loadAndRun( public static String loadAndRun( String base64EncodedOrchestratorRequest, TaskOrchestration orchestration) { + return loadAndRun(base64EncodedOrchestratorRequest, orchestration, null, null); + } + + /** + * Loads orchestration history and executes the orchestration with large payload externalization support. + * + * @param base64EncodedOrchestratorRequest the base64-encoded protobuf payload + * @param orchestration the orchestration to execute + * @param store the payload store for externalizing/resolving large payloads + * @return a base64-encoded protobuf payload of orchestrator actions + */ + public static String loadAndRun( + String base64EncodedOrchestratorRequest, + TaskOrchestration orchestration, + PayloadStore store) { + return loadAndRun(base64EncodedOrchestratorRequest, orchestration, store, new LargePayloadOptions.Builder().build()); + } + + /** + * Loads orchestration history and executes the orchestration with large payload externalization + * support and custom options. + * + * @param base64EncodedOrchestratorRequest the base64-encoded protobuf payload + * @param orchestration the orchestration to execute + * @param store the payload store for externalizing/resolving large payloads + * @param options the large payload configuration options + * @return a base64-encoded protobuf payload of orchestrator actions + */ + public static String loadAndRun( + String base64EncodedOrchestratorRequest, + TaskOrchestration orchestration, + PayloadStore store, + LargePayloadOptions options) { byte[] decodedBytes = Base64.getDecoder().decode(base64EncodedOrchestratorRequest); - byte[] resultBytes = loadAndRun(decodedBytes, orchestration); + byte[] resultBytes = loadAndRun(decodedBytes, orchestration, store, options); return Base64.getEncoder().encodeToString(resultBytes); } @@ -103,6 +207,33 @@ public static String loadAndRun( * @throws IllegalArgumentException if either parameter is {@code null} or if {@code orchestratorRequestBytes} is not valid protobuf */ public static byte[] loadAndRun(byte[] orchestratorRequestBytes, TaskOrchestration orchestration) { + return loadAndRun(orchestratorRequestBytes, orchestration, null, null); + } + + /** + * Loads orchestration history and executes the orchestration with large payload externalization support. + * + * @param orchestratorRequestBytes the protobuf payload + * @param orchestration the orchestration to execute + * @param store the payload store for externalizing/resolving large payloads + * @return a protobuf-encoded payload of orchestrator actions + */ + public static byte[] loadAndRun(byte[] orchestratorRequestBytes, TaskOrchestration orchestration, PayloadStore store) { + return loadAndRun(orchestratorRequestBytes, orchestration, store, new LargePayloadOptions.Builder().build()); + } + + /** + * Loads orchestration history and executes the orchestration with large payload externalization + * support and custom options. + * + * @param orchestratorRequestBytes the protobuf payload + * @param orchestration the orchestration to execute + * @param store the payload store for externalizing/resolving large payloads + * @param options the large payload configuration options + * @return a protobuf-encoded payload of orchestrator actions + */ + public static byte[] loadAndRun(byte[] orchestratorRequestBytes, TaskOrchestration orchestration, + PayloadStore store, LargePayloadOptions options) { if (orchestratorRequestBytes == null || orchestratorRequestBytes.length == 0) { throw new IllegalArgumentException("triggerStateProtoBytes must not be null or empty"); } @@ -118,6 +249,15 @@ public static byte[] loadAndRun(byte[] orchestratorRequestBytes, TaskOrchestrati throw new IllegalArgumentException("triggerStateProtoBytes was not valid protobuf", e); } + // Resolve externalized payload URI tokens in incoming request + PayloadHelper payloadHelper = null; + if (store != null) { + LargePayloadOptions resolvedOptions = options != null ? options : new LargePayloadOptions.Builder().build(); + payloadHelper = new PayloadHelper(store, resolvedOptions); + orchestratorRequest = PayloadInterceptionHelper.resolveOrchestratorRequestPayloads( + orchestratorRequest, payloadHelper); + } + // Register the passed orchestration as the default ("*") orchestration HashMap orchestrationFactories = new HashMap<>(); orchestrationFactories.put("*", new TaskOrchestrationFactory() { @@ -196,6 +336,13 @@ public TaskOrchestration create() { .addAllActions(taskOrchestratorResult.getActions()) .setCustomStatus(StringValue.of(taskOrchestratorResult.getCustomStatus())) .build(); + + // Externalize large payloads in outgoing response + if (payloadHelper != null) { + response = PayloadInterceptionHelper.externalizeOrchestratorResponsePayloads( + response, payloadHelper); + } + return response.toByteArray(); } } diff --git a/client/src/main/java/com/microsoft/durabletask/PayloadHelper.java b/client/src/main/java/com/microsoft/durabletask/PayloadHelper.java new file mode 100644 index 00000000..adae0067 --- /dev/null +++ b/client/src/main/java/com/microsoft/durabletask/PayloadHelper.java @@ -0,0 +1,119 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. +package com.microsoft.durabletask; + +/** + * Internal utility for externalizing and resolving payloads using a {@link PayloadStore}. + *

+ * This class encapsulates the threshold-checking and store-delegation logic that is shared + * across the worker, client, and orchestration runner code paths. + */ +final class PayloadHelper { + + private final PayloadStore store; + private final LargePayloadOptions options; + + /** + * Creates a new PayloadHelper. + * + * @param store the payload store to use for upload/download operations + * @param options the large payload configuration options + */ + PayloadHelper(PayloadStore store, LargePayloadOptions options) { + if (store == null) { + throw new IllegalArgumentException("store must not be null"); + } + if (options == null) { + throw new IllegalArgumentException("options must not be null"); + } + this.store = store; + this.options = options; + } + + /** + * Externalizes the given value if it exceeds the configured threshold. + *

+ * The check order matches .NET: (1) null/empty guard, (2) below-threshold guard, + * (3) above-max-cap rejection, (4) upload. + * + * @param value the payload string to potentially externalize + * @return the original value if below threshold, or an opaque token if externalized + * @throws PayloadTooLargeException if the payload exceeds the maximum externalized payload size + */ + String maybeExternalize(String value) { + // (1) null/empty guard + if (value == null || value.isEmpty()) { + return value; + } + + // Fast path: each Java char contributes at least 1 UTF-8 byte, + // so length() is always <= UTF-8 byte length. + if (value.length() < this.options.getThresholdBytes()) { + return value; + } + + int byteSize = utf8ByteLength(value); + + // (2) below-threshold guard (strict less-than, matching .NET) + if (byteSize < this.options.getThresholdBytes()) { + return value; + } + + // (3) above-max-cap rejection + if (byteSize > this.options.getMaxExternalizedPayloadBytes()) { + throw new PayloadTooLargeException(String.format( + "Payload size %d KB exceeds maximum of %d KB. " + + "Reduce the payload size or increase maxExternalizedPayloadBytes.", + byteSize / 1024, + this.options.getMaxExternalizedPayloadBytes() / 1024)); + } + + // (4) upload + return this.store.upload(value); + } + + /** + * Resolves the given value if it is a known payload token. + * + * @param value the string to potentially resolve + * @return the resolved payload data if the value was a token, or the original value otherwise + */ + String maybeResolve(String value) { + if (value == null || value.isEmpty()) { + return value; + } + + if (!this.store.isKnownPayloadToken(value)) { + return value; + } + + String resolved = this.store.download(value); + if (resolved == null) { + throw new IllegalStateException( + "PayloadStore.download() returned null for token: " + value); + } + return resolved; + } + + /** + * Counts the number of UTF-8 bytes needed to encode the given string, + * without allocating a byte array (unlike {@code String.getBytes(UTF_8).length}). + */ + private static int utf8ByteLength(String s) { + int count = 0; + for (int i = 0; i < s.length(); i++) { + char c = s.charAt(i); + if (c <= 0x7F) { + count++; + } else if (c <= 0x7FF) { + count += 2; + } else if (Character.isHighSurrogate(c)) { + count += 4; + i++; // skip low surrogate + } else { + count += 3; + } + } + return count; + } +} diff --git a/client/src/main/java/com/microsoft/durabletask/PayloadInterceptionHelper.java b/client/src/main/java/com/microsoft/durabletask/PayloadInterceptionHelper.java new file mode 100644 index 00000000..591dba51 --- /dev/null +++ b/client/src/main/java/com/microsoft/durabletask/PayloadInterceptionHelper.java @@ -0,0 +1,326 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. +package com.microsoft.durabletask; + +import com.google.protobuf.StringValue; +import com.microsoft.durabletask.implementation.protobuf.OrchestratorService.*; + +import java.util.ArrayList; +import java.util.List; + +/** + * Internal helper for intercepting protobuf messages to resolve/externalize payload fields. + * Shared between {@link DurableTaskGrpcWorker} and {@link OrchestrationRunner}. + */ +final class PayloadInterceptionHelper { + + private PayloadInterceptionHelper() { + } + + /** + * Resolves externalized payload URI tokens in an OrchestratorRequest's history events. + */ + static OrchestratorRequest resolveOrchestratorRequestPayloads( + OrchestratorRequest request, PayloadHelper payloadHelper) { + List resolvedPastEvents = resolveHistoryEvents(request.getPastEventsList(), payloadHelper); + List resolvedNewEvents = resolveHistoryEvents(request.getNewEventsList(), payloadHelper); + + boolean pastChanged = resolvedPastEvents != request.getPastEventsList(); + boolean newChanged = resolvedNewEvents != request.getNewEventsList(); + + if (!pastChanged && !newChanged) { + return request; + } + + OrchestratorRequest.Builder builder = request.toBuilder(); + if (pastChanged) { + builder.clearPastEvents().addAllPastEvents(resolvedPastEvents); + } + if (newChanged) { + builder.clearNewEvents().addAllNewEvents(resolvedNewEvents); + } + return builder.build(); + } + + /** + * Resolves externalized payload URI token in an ActivityRequest's input. + */ + static ActivityRequest resolveActivityRequestPayloads( + ActivityRequest request, PayloadHelper payloadHelper) { + if (!request.hasInput() || request.getInput().getValue().isEmpty()) { + return request; + } + String resolved = payloadHelper.maybeResolve(request.getInput().getValue()); + if (resolved.equals(request.getInput().getValue())) { + return request; + } + return request.toBuilder().setInput(StringValue.of(resolved)).build(); + } + + /** + * Externalizes large payloads in an OrchestratorResponse's actions and custom status. + */ + static OrchestratorResponse externalizeOrchestratorResponsePayloads( + OrchestratorResponse response, PayloadHelper payloadHelper) { + boolean changed = false; + OrchestratorResponse.Builder responseBuilder = response.toBuilder(); + + // Externalize customStatus + if (response.hasCustomStatus() && !response.getCustomStatus().getValue().isEmpty()) { + String externalized = payloadHelper.maybeExternalize(response.getCustomStatus().getValue()); + if (!externalized.equals(response.getCustomStatus().getValue())) { + responseBuilder.setCustomStatus(StringValue.of(externalized)); + changed = true; + } + } + + // Externalize action payloads + List actions = response.getActionsList(); + List newActions = null; + for (int i = 0; i < actions.size(); i++) { + OrchestratorAction action = actions.get(i); + OrchestratorAction externalizedAction = externalizeAction(action, payloadHelper); + if (externalizedAction != action) { + if (newActions == null) { + newActions = new ArrayList<>(actions); + } + newActions.set(i, externalizedAction); + } + } + + if (newActions != null) { + responseBuilder.clearActions().addAllActions(newActions); + changed = true; + } + + return changed ? responseBuilder.build() : response; + } + + // --- Private helpers --- + + private static List resolveHistoryEvents( + List events, PayloadHelper payloadHelper) { + List resolved = null; + for (int i = 0; i < events.size(); i++) { + HistoryEvent event = events.get(i); + HistoryEvent resolvedEvent = resolveHistoryEvent(event, payloadHelper); + if (resolvedEvent != event) { + if (resolved == null) { + resolved = new ArrayList<>(events); + } + resolved.set(i, resolvedEvent); + } + } + return resolved != null ? resolved : events; + } + + private static HistoryEvent resolveHistoryEvent(HistoryEvent event, PayloadHelper payloadHelper) { + switch (event.getEventTypeCase()) { + case EXECUTIONSTARTED: + return resolveStringValueField(event, event.getExecutionStarted().getInput(), payloadHelper, + (e, v) -> e.toBuilder().setExecutionStarted( + e.getExecutionStarted().toBuilder().setInput(v)).build()); + case EXECUTIONCOMPLETED: + return resolveStringValueField(event, event.getExecutionCompleted().getResult(), payloadHelper, + (e, v) -> e.toBuilder().setExecutionCompleted( + e.getExecutionCompleted().toBuilder().setResult(v)).build()); + case TASKCOMPLETED: + return resolveStringValueField(event, event.getTaskCompleted().getResult(), payloadHelper, + (e, v) -> e.toBuilder().setTaskCompleted( + e.getTaskCompleted().toBuilder().setResult(v)).build()); + case TASKSCHEDULED: + return resolveStringValueField(event, event.getTaskScheduled().getInput(), payloadHelper, + (e, v) -> e.toBuilder().setTaskScheduled( + e.getTaskScheduled().toBuilder().setInput(v)).build()); + case SUBORCHESTRATIONINSTANCECREATED: + return resolveStringValueField(event, event.getSubOrchestrationInstanceCreated().getInput(), payloadHelper, + (e, v) -> e.toBuilder().setSubOrchestrationInstanceCreated( + e.getSubOrchestrationInstanceCreated().toBuilder().setInput(v)).build()); + case SUBORCHESTRATIONINSTANCECOMPLETED: + return resolveStringValueField(event, event.getSubOrchestrationInstanceCompleted().getResult(), payloadHelper, + (e, v) -> e.toBuilder().setSubOrchestrationInstanceCompleted( + e.getSubOrchestrationInstanceCompleted().toBuilder().setResult(v)).build()); + case EVENTRAISED: + return resolveStringValueField(event, event.getEventRaised().getInput(), payloadHelper, + (e, v) -> e.toBuilder().setEventRaised( + e.getEventRaised().toBuilder().setInput(v)).build()); + case EVENTSENT: + return resolveStringValueField(event, event.getEventSent().getInput(), payloadHelper, + (e, v) -> e.toBuilder().setEventSent( + e.getEventSent().toBuilder().setInput(v)).build()); + case GENERICEVENT: + return resolveStringValueField(event, event.getGenericEvent().getData(), payloadHelper, + (e, v) -> e.toBuilder().setGenericEvent( + e.getGenericEvent().toBuilder().setData(v)).build()); + case CONTINUEASNEW: + return resolveStringValueField(event, event.getContinueAsNew().getInput(), payloadHelper, + (e, v) -> e.toBuilder().setContinueAsNew( + e.getContinueAsNew().toBuilder().setInput(v)).build()); + case EXECUTIONTERMINATED: + return resolveStringValueField(event, event.getExecutionTerminated().getInput(), payloadHelper, + (e, v) -> e.toBuilder().setExecutionTerminated( + e.getExecutionTerminated().toBuilder().setInput(v)).build()); + case EXECUTIONSUSPENDED: + return resolveStringValueField(event, event.getExecutionSuspended().getInput(), payloadHelper, + (e, v) -> e.toBuilder().setExecutionSuspended( + e.getExecutionSuspended().toBuilder().setInput(v)).build()); + case EXECUTIONRESUMED: + return resolveStringValueField(event, event.getExecutionResumed().getInput(), payloadHelper, + (e, v) -> e.toBuilder().setExecutionResumed( + e.getExecutionResumed().toBuilder().setInput(v)).build()); + case EXECUTIONREWOUND: + return resolveStringValueField(event, event.getExecutionRewound().getInput(), payloadHelper, + (e, v) -> e.toBuilder().setExecutionRewound( + e.getExecutionRewound().toBuilder().setInput(v)).build()); + default: + return event; + } + } + + @FunctionalInterface + interface HistoryEventUpdater { + HistoryEvent apply(HistoryEvent event, StringValue newValue); + } + + private static HistoryEvent resolveStringValueField(HistoryEvent event, StringValue field, + PayloadHelper payloadHelper, + HistoryEventUpdater updater) { + if (field.getValue().isEmpty()) { + return event; + } + String resolved = payloadHelper.maybeResolve(field.getValue()); + if (resolved.equals(field.getValue())) { + return event; + } + return updater.apply(event, StringValue.of(resolved)); + } + + private static OrchestratorAction externalizeAction(OrchestratorAction action, PayloadHelper payloadHelper) { + switch (action.getOrchestratorActionTypeCase()) { + case SCHEDULETASK: { + ScheduleTaskAction inner = action.getScheduleTask(); + if (inner.hasInput()) { + String ext = payloadHelper.maybeExternalize(inner.getInput().getValue()); + if (!ext.equals(inner.getInput().getValue())) { + return action.toBuilder().setScheduleTask( + inner.toBuilder().setInput(StringValue.of(ext))).build(); + } + } + return action; + } + case CREATESUBORCHESTRATION: { + CreateSubOrchestrationAction inner = action.getCreateSubOrchestration(); + if (inner.hasInput()) { + String ext = payloadHelper.maybeExternalize(inner.getInput().getValue()); + if (!ext.equals(inner.getInput().getValue())) { + return action.toBuilder().setCreateSubOrchestration( + inner.toBuilder().setInput(StringValue.of(ext))).build(); + } + } + return action; + } + case COMPLETEORCHESTRATION: { + CompleteOrchestrationAction inner = action.getCompleteOrchestration(); + CompleteOrchestrationAction.Builder innerBuilder = null; + if (inner.hasResult()) { + String ext = payloadHelper.maybeExternalize(inner.getResult().getValue()); + if (!ext.equals(inner.getResult().getValue())) { + innerBuilder = inner.toBuilder().setResult(StringValue.of(ext)); + } + } + if (inner.hasDetails()) { + String ext = payloadHelper.maybeExternalize(inner.getDetails().getValue()); + if (!ext.equals(inner.getDetails().getValue())) { + if (innerBuilder == null) innerBuilder = inner.toBuilder(); + innerBuilder.setDetails(StringValue.of(ext)); + } + } + // Externalize carryover events + List carryoverEvents = inner.getCarryoverEventsList(); + if (!carryoverEvents.isEmpty()) { + List externalizedEvents = externalizeHistoryEvents(carryoverEvents, payloadHelper); + if (externalizedEvents != carryoverEvents) { + if (innerBuilder == null) innerBuilder = inner.toBuilder(); + innerBuilder.clearCarryoverEvents().addAllCarryoverEvents(externalizedEvents); + } + } + if (innerBuilder != null) { + return action.toBuilder().setCompleteOrchestration(innerBuilder).build(); + } + return action; + } + case TERMINATEORCHESTRATION: { + TerminateOrchestrationAction inner = action.getTerminateOrchestration(); + if (inner.hasReason()) { + String ext = payloadHelper.maybeExternalize(inner.getReason().getValue()); + if (!ext.equals(inner.getReason().getValue())) { + return action.toBuilder().setTerminateOrchestration( + inner.toBuilder().setReason(StringValue.of(ext))).build(); + } + } + return action; + } + case SENDEVENT: { + SendEventAction inner = action.getSendEvent(); + if (inner.hasData()) { + String ext = payloadHelper.maybeExternalize(inner.getData().getValue()); + if (!ext.equals(inner.getData().getValue())) { + return action.toBuilder().setSendEvent( + inner.toBuilder().setData(StringValue.of(ext))).build(); + } + } + return action; + } + default: + return action; + } + } + + private static List externalizeHistoryEvents( + List events, PayloadHelper payloadHelper) { + List result = null; + for (int i = 0; i < events.size(); i++) { + HistoryEvent event = events.get(i); + HistoryEvent externalized = externalizeHistoryEvent(event, payloadHelper); + if (externalized != event) { + if (result == null) { + result = new ArrayList<>(events); + } + result.set(i, externalized); + } + } + return result != null ? result : events; + } + + private static HistoryEvent externalizeHistoryEvent(HistoryEvent event, PayloadHelper payloadHelper) { + switch (event.getEventTypeCase()) { + case EVENTRAISED: + return externalizeStringValueField(event, event.getEventRaised().getInput(), payloadHelper, + (e, v) -> e.toBuilder().setEventRaised( + e.getEventRaised().toBuilder().setInput(v)).build()); + case EVENTSENT: + return externalizeStringValueField(event, event.getEventSent().getInput(), payloadHelper, + (e, v) -> e.toBuilder().setEventSent( + e.getEventSent().toBuilder().setInput(v)).build()); + case GENERICEVENT: + return externalizeStringValueField(event, event.getGenericEvent().getData(), payloadHelper, + (e, v) -> e.toBuilder().setGenericEvent( + e.getGenericEvent().toBuilder().setData(v)).build()); + default: + return event; + } + } + + private static HistoryEvent externalizeStringValueField(HistoryEvent event, StringValue field, + PayloadHelper payloadHelper, + HistoryEventUpdater updater) { + if (field.getValue().isEmpty()) { + return event; + } + String externalized = payloadHelper.maybeExternalize(field.getValue()); + if (externalized.equals(field.getValue())) { + return event; + } + return updater.apply(event, StringValue.of(externalized)); + } +} diff --git a/client/src/main/java/com/microsoft/durabletask/PayloadStore.java b/client/src/main/java/com/microsoft/durabletask/PayloadStore.java new file mode 100644 index 00000000..148bdf91 --- /dev/null +++ b/client/src/main/java/com/microsoft/durabletask/PayloadStore.java @@ -0,0 +1,58 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. +package com.microsoft.durabletask; + +/** + * Interface for externalizing and resolving large payloads to/from an external store. + *

+ * Implementations of this interface handle uploading large payloads to external storage + * (e.g., Azure Blob Storage) and returning opaque token references that can be resolved + * back to the original data. This enables the Durable Task framework to handle payloads + * that exceed gRPC message size limits. + *

+ * The store implementation is solely responsible for generating blob names/keys and + * defining the token format. The core framework treats tokens as opaque strings and + * delegates token recognition to {@link #isKnownPayloadToken(String)}. + *

+ * Payload retention: This interface does not define a deletion mechanism. + * Externalized payloads persist until removed by external means. When using Azure + * Blob Storage, configure + * + * lifecycle management policies to automatically expire old payloads. + *

+ * Performance note: All methods on this interface are synchronous. Implementations + * that perform network I/O (e.g., Azure Blob Storage) should ensure that latency is + * acceptable on the calling thread. The framework calls these methods on the worker's + * processing thread. + * + * @see LargePayloadOptions + */ +public interface PayloadStore { + /** + * Uploads a payload string to external storage and returns an opaque token reference. + * + * @param payload the payload data to upload + * @return an opaque token string that can be used to retrieve the payload via {@link #download(String)} + */ + String upload(String payload); + + /** + * Downloads a payload from external storage using the given token reference. + * + * @param token the opaque token returned by a previous {@link #upload(String)} call + * @return the original payload string + */ + String download(String token); + + /** + * Determines whether the given value is a token that was produced by this store. + *

+ * This method is used by the framework to distinguish between regular payload data + * and externalized payload references. Only values recognized as tokens will be + * resolved via {@link #download(String)}. + * + * @param value the string value to check + * @return {@code true} if the value is a known payload token from this store; {@code false} otherwise + */ + boolean isKnownPayloadToken(String value); +} diff --git a/client/src/main/java/com/microsoft/durabletask/PayloadStoreProvider.java b/client/src/main/java/com/microsoft/durabletask/PayloadStoreProvider.java new file mode 100644 index 00000000..99e358a2 --- /dev/null +++ b/client/src/main/java/com/microsoft/durabletask/PayloadStoreProvider.java @@ -0,0 +1,30 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. +package com.microsoft.durabletask; + +/** + * Service provider interface for discovering {@link PayloadStore} implementations at runtime. + *

+ * Implementations are discovered via {@link java.util.ServiceLoader}. To register a provider, + * create a file {@code META-INF/services/com.microsoft.durabletask.PayloadStoreProvider} + * containing the fully qualified class name of the implementation. + *

+ * The provider is responsible for reading its own configuration (e.g., environment variables) + * and determining whether it can create a functional {@link PayloadStore}. + * + * @see PayloadStore + */ +public interface PayloadStoreProvider { + + /** + * Attempts to create a {@link PayloadStore} based on available configuration. + *

+ * Implementations should inspect environment variables or other configuration sources + * to determine if they can provide a store. If the required configuration is not present, + * this method should return {@code null} rather than throwing an exception. + * + * @return a configured {@link PayloadStore}, or {@code null} if the required configuration + * is not available + */ + PayloadStore create(); +} diff --git a/client/src/main/java/com/microsoft/durabletask/PayloadTooLargeException.java b/client/src/main/java/com/microsoft/durabletask/PayloadTooLargeException.java new file mode 100644 index 00000000..ba385121 --- /dev/null +++ b/client/src/main/java/com/microsoft/durabletask/PayloadTooLargeException.java @@ -0,0 +1,20 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. +package com.microsoft.durabletask; + +/** + * Thrown when a payload exceeds the maximum allowed size for externalization. + * + * @see LargePayloadOptions#getMaxExternalizedPayloadBytes() + */ +public class PayloadTooLargeException extends RuntimeException { + + /** + * Creates a new PayloadTooLargeException with the specified message. + * + * @param message the detail message + */ + public PayloadTooLargeException(String message) { + super(message); + } +} diff --git a/client/src/test/java/com/microsoft/durabletask/DurableTaskGrpcWorkerBuilderTest.java b/client/src/test/java/com/microsoft/durabletask/DurableTaskGrpcWorkerBuilderTest.java new file mode 100644 index 00000000..36da8f7f --- /dev/null +++ b/client/src/test/java/com/microsoft/durabletask/DurableTaskGrpcWorkerBuilderTest.java @@ -0,0 +1,162 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.microsoft.durabletask; + +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.*; + +/** + * Unit tests for DurableTaskGrpcWorkerBuilder — chunk size validation + * and worker capability announcement. + */ +public class DurableTaskGrpcWorkerBuilderTest { + + // ---- Chunk size validation tests (matches .NET GrpcDurableTaskWorkerOptionsTests) ---- + + @Test + void defaultChunkSize_isWithinRange() { + DurableTaskGrpcWorkerBuilder builder = new DurableTaskGrpcWorkerBuilder(); + int chunkSize = builder.getChunkSizeBytes(); + assertTrue(chunkSize >= DurableTaskGrpcWorkerBuilder.MIN_CHUNK_SIZE_BYTES, + "Default chunk size should be >= MIN"); + assertTrue(chunkSize <= DurableTaskGrpcWorkerBuilder.MAX_CHUNK_SIZE_BYTES, + "Default chunk size should be <= MAX"); + } + + @Test + void chunkSize_belowMin_throws() { + DurableTaskGrpcWorkerBuilder builder = new DurableTaskGrpcWorkerBuilder(); + assertThrows(IllegalArgumentException.class, + () -> builder.setCompleteOrchestratorResponseChunkSizeBytes( + DurableTaskGrpcWorkerBuilder.MIN_CHUNK_SIZE_BYTES - 1)); + } + + @Test + void chunkSize_aboveMax_throws() { + DurableTaskGrpcWorkerBuilder builder = new DurableTaskGrpcWorkerBuilder(); + assertThrows(IllegalArgumentException.class, + () -> builder.setCompleteOrchestratorResponseChunkSizeBytes( + DurableTaskGrpcWorkerBuilder.MAX_CHUNK_SIZE_BYTES + 1)); + } + + @Test + void chunkSize_atMinBoundary_succeeds() { + DurableTaskGrpcWorkerBuilder builder = new DurableTaskGrpcWorkerBuilder(); + builder.setCompleteOrchestratorResponseChunkSizeBytes( + DurableTaskGrpcWorkerBuilder.MIN_CHUNK_SIZE_BYTES); + assertEquals(DurableTaskGrpcWorkerBuilder.MIN_CHUNK_SIZE_BYTES, builder.getChunkSizeBytes()); + } + + @Test + void chunkSize_atMaxBoundary_succeeds() { + DurableTaskGrpcWorkerBuilder builder = new DurableTaskGrpcWorkerBuilder(); + builder.setCompleteOrchestratorResponseChunkSizeBytes( + DurableTaskGrpcWorkerBuilder.MAX_CHUNK_SIZE_BYTES); + assertEquals(DurableTaskGrpcWorkerBuilder.MAX_CHUNK_SIZE_BYTES, builder.getChunkSizeBytes()); + } + + @Test + void chunkSize_withinRange_succeeds() { + DurableTaskGrpcWorkerBuilder builder = new DurableTaskGrpcWorkerBuilder(); + int midpoint = 2_000_000; + builder.setCompleteOrchestratorResponseChunkSizeBytes(midpoint); + assertEquals(midpoint, builder.getChunkSizeBytes()); + } + + // ---- Worker capability tests (matches .NET WorkerCapabilitiesTests) ---- + + @Test + void useExternalizedPayloads_setsPayloadStore() { + DurableTaskGrpcWorkerBuilder builder = new DurableTaskGrpcWorkerBuilder(); + PayloadStore store = new InMemoryPayloadStore(); + builder.useExternalizedPayloads(store); + assertNotNull(builder.payloadStore, "payloadStore should be set after useExternalizedPayloads"); + assertNotNull(builder.largePayloadOptions, "largePayloadOptions should be set with defaults"); + } + + @Test + void useExternalizedPayloads_withOptions_setsPayloadStoreAndOptions() { + DurableTaskGrpcWorkerBuilder builder = new DurableTaskGrpcWorkerBuilder(); + PayloadStore store = new InMemoryPayloadStore(); + LargePayloadOptions options = new LargePayloadOptions.Builder() + .setThresholdBytes(500) + .setMaxExternalizedPayloadBytes(5000) + .build(); + builder.useExternalizedPayloads(store, options); + assertSame(store, builder.payloadStore); + assertSame(options, builder.largePayloadOptions); + } + + @Test + void useExternalizedPayloads_nullStore_throws() { + DurableTaskGrpcWorkerBuilder builder = new DurableTaskGrpcWorkerBuilder(); + assertThrows(IllegalArgumentException.class, + () -> builder.useExternalizedPayloads(null)); + } + + @Test + void useExternalizedPayloads_nullOptions_throws() { + DurableTaskGrpcWorkerBuilder builder = new DurableTaskGrpcWorkerBuilder(); + PayloadStore store = new InMemoryPayloadStore(); + assertThrows(IllegalArgumentException.class, + () -> builder.useExternalizedPayloads(store, null)); + } + + @Test + void withoutExternalizedPayloads_payloadStoreIsNull() { + DurableTaskGrpcWorkerBuilder builder = new DurableTaskGrpcWorkerBuilder(); + assertNull(builder.payloadStore, "payloadStore should be null by default"); + } + + @Test + void useExternalizedPayloads_preservesOtherSettings() { + DurableTaskGrpcWorkerBuilder builder = new DurableTaskGrpcWorkerBuilder(); + PayloadStore store = new InMemoryPayloadStore(); + + builder.setCompleteOrchestratorResponseChunkSizeBytes(2_000_000); + builder.useExternalizedPayloads(store); + + assertEquals(2_000_000, builder.getChunkSizeBytes(), + "Chunk size should be preserved after useExternalizedPayloads"); + assertNotNull(builder.payloadStore); + } + + @Test + void settingsAreIndependentPerBuilder() { + DurableTaskGrpcWorkerBuilder builder1 = new DurableTaskGrpcWorkerBuilder(); + DurableTaskGrpcWorkerBuilder builder2 = new DurableTaskGrpcWorkerBuilder(); + + PayloadStore store = new InMemoryPayloadStore(); + builder1.useExternalizedPayloads(store); + builder2.setCompleteOrchestratorResponseChunkSizeBytes(2_000_000); + + assertNotNull(builder1.payloadStore, "Builder1 should have payloadStore"); + assertNull(builder2.payloadStore, "Builder2 should NOT have payloadStore"); + assertEquals(DurableTaskGrpcWorkerBuilder.DEFAULT_CHUNK_SIZE_BYTES, builder1.getChunkSizeBytes(), + "Builder1 should have default chunk size"); + assertEquals(2_000_000, builder2.getChunkSizeBytes(), + "Builder2 should have custom chunk size"); + } + + /** + * Simple in-memory PayloadStore for builder-level tests. + */ + private static class InMemoryPayloadStore implements PayloadStore { + @Override + public String upload(String payload) { + return "test://token"; + } + + @Override + public String download(String token) { + return "payload"; + } + + @Override + public boolean isKnownPayloadToken(String value) { + return value != null && value.startsWith("test://"); + } + } +} diff --git a/client/src/test/java/com/microsoft/durabletask/LargePayloadIntegrationTests.java b/client/src/test/java/com/microsoft/durabletask/LargePayloadIntegrationTests.java new file mode 100644 index 00000000..f6a03eaf --- /dev/null +++ b/client/src/test/java/com/microsoft/durabletask/LargePayloadIntegrationTests.java @@ -0,0 +1,853 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. +package com.microsoft.durabletask; + +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; + +import java.time.Duration; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.junit.jupiter.api.Assertions.*; + +/** + * Integration tests for the large payload externalization feature. + *

+ * These tests require a DTS emulator to be running on localhost:4001. + * They verify end-to-end round-trip of large payloads through externalization. + */ +@Tag("integration") +public class LargePayloadIntegrationTests extends IntegrationTestBase { + + static final Duration defaultTimeout = Duration.ofSeconds(100); + + /** + * In-memory implementation of {@link PayloadStore} for integration testing. + * Stores payloads in a thread-safe map with token format {@code test:}. + */ + static class InMemoryPayloadStore implements PayloadStore { + private static final String TOKEN_PREFIX = "test:"; + private final ConcurrentHashMap payloads = new ConcurrentHashMap<>(); + private final AtomicInteger uploadCount = new AtomicInteger(); + private final AtomicInteger downloadCount = new AtomicInteger(); + + @Override + public String upload(String payload) { + String key = TOKEN_PREFIX + uploadCount.incrementAndGet(); + payloads.put(key, payload); + return key; + } + + @Override + public String download(String token) { + downloadCount.incrementAndGet(); + String payload = payloads.get(token); + if (payload == null) { + throw new IllegalArgumentException("Unknown token: " + token); + } + return payload; + } + + @Override + public boolean isKnownPayloadToken(String value) { + return value != null && value.startsWith(TOKEN_PREFIX); + } + + int getUploadCount() { + return uploadCount.get(); + } + + int getDownloadCount() { + return downloadCount.get(); + } + } + + private static String generateLargeString(int sizeBytes) { + StringBuilder sb = new StringBuilder(sizeBytes + 2); + sb.append('"'); + for (int i = 0; i < sizeBytes - 2; i++) { + sb.append('A'); + } + sb.append('"'); + return sb.toString(); + } + + @Test + void largeOrchestrationInput_isExternalized() throws TimeoutException { + final String orchestratorName = "LargeInputOrch"; + // Create a payload larger than the default 900KB threshold + final String largeInput = generateLargeString(1_000_000); + + InMemoryPayloadStore store = new InMemoryPayloadStore(); + LargePayloadOptions options = new LargePayloadOptions.Builder().build(); + + TestDurableTaskWorkerBuilder workerBuilder = this.createWorkerBuilder(); + workerBuilder.innerBuilder.useExternalizedPayloads(store, options); + workerBuilder.addOrchestrator(orchestratorName, ctx -> { + String input = ctx.getInput(String.class); + ctx.complete(input.length()); + }); + DurableTaskGrpcWorker worker = workerBuilder.buildAndStart(); + + DurableTaskGrpcClientBuilder clientBuilder = this.createClientBuilder(); + clientBuilder.useExternalizedPayloads(store, options); + DurableTaskClient client = clientBuilder.build(); + + try (worker; client) { + String instanceId = client.scheduleNewOrchestrationInstance(orchestratorName, largeInput); + OrchestrationMetadata instance = client.waitForInstanceCompletion( + instanceId, defaultTimeout, true); + + assertNotNull(instance); + assertEquals(OrchestrationRuntimeStatus.COMPLETED, instance.getRuntimeStatus()); + assertEquals(largeInput.length(), instance.readOutputAs(Integer.class)); + assertTrue(store.getUploadCount() > 0, "Should have externalized at least one payload"); + } + } + + @Test + void largeActivityOutput_isExternalized() throws TimeoutException { + final String orchestratorName = "LargeActivityOutputOrch"; + final String activityName = "GenerateLargeOutput"; + final int payloadSize = 1_000_000; + + InMemoryPayloadStore store = new InMemoryPayloadStore(); + LargePayloadOptions options = new LargePayloadOptions.Builder().build(); + + TestDurableTaskWorkerBuilder workerBuilder = this.createWorkerBuilder(); + workerBuilder.innerBuilder.useExternalizedPayloads(store, options); + workerBuilder.addOrchestrator(orchestratorName, ctx -> { + String result = ctx.callActivity(activityName, null, String.class).await(); + ctx.complete(result.length()); + }); + workerBuilder.addActivity(activityName, ctx -> { + StringBuilder sb = new StringBuilder(payloadSize); + for (int i = 0; i < payloadSize; i++) { + sb.append('B'); + } + return sb.toString(); + }); + DurableTaskGrpcWorker worker = workerBuilder.buildAndStart(); + + DurableTaskGrpcClientBuilder clientBuilder = this.createClientBuilder(); + clientBuilder.useExternalizedPayloads(store, options); + DurableTaskClient client = clientBuilder.build(); + + try (worker; client) { + String instanceId = client.scheduleNewOrchestrationInstance(orchestratorName); + OrchestrationMetadata instance = client.waitForInstanceCompletion( + instanceId, defaultTimeout, true); + + assertNotNull(instance); + assertEquals(OrchestrationRuntimeStatus.COMPLETED, instance.getRuntimeStatus()); + assertEquals(payloadSize, instance.readOutputAs(Integer.class)); + assertTrue(store.getUploadCount() > 0, "Should have externalized at least one payload"); + } + } + + @Test + void smallPayload_isNotExternalized() throws TimeoutException { + final String orchestratorName = "SmallPayloadOrch"; + final String smallInput = "Hello, World!"; + + InMemoryPayloadStore store = new InMemoryPayloadStore(); + LargePayloadOptions options = new LargePayloadOptions.Builder().build(); + + TestDurableTaskWorkerBuilder workerBuilder = this.createWorkerBuilder(); + workerBuilder.innerBuilder.useExternalizedPayloads(store, options); + workerBuilder.addOrchestrator(orchestratorName, ctx -> { + ctx.complete(ctx.getInput(String.class)); + }); + DurableTaskGrpcWorker worker = workerBuilder.buildAndStart(); + + DurableTaskGrpcClientBuilder clientBuilder = this.createClientBuilder(); + clientBuilder.useExternalizedPayloads(store, options); + DurableTaskClient client = clientBuilder.build(); + + try (worker; client) { + String instanceId = client.scheduleNewOrchestrationInstance(orchestratorName, smallInput); + OrchestrationMetadata instance = client.waitForInstanceCompletion( + instanceId, defaultTimeout, true); + + assertNotNull(instance); + assertEquals(OrchestrationRuntimeStatus.COMPLETED, instance.getRuntimeStatus()); + assertEquals(smallInput, instance.readOutputAs(String.class)); + assertEquals(0, store.getUploadCount(), "Small payloads should not be externalized"); + } + } + + @Test + void largeOrchestrationOutput_isExternalized() throws TimeoutException { + final String orchestratorName = "LargeOutputOrch"; + final int payloadSize = 1_000_000; + + InMemoryPayloadStore store = new InMemoryPayloadStore(); + LargePayloadOptions options = new LargePayloadOptions.Builder().build(); + + TestDurableTaskWorkerBuilder workerBuilder = this.createWorkerBuilder(); + workerBuilder.innerBuilder.useExternalizedPayloads(store, options); + workerBuilder.addOrchestrator(orchestratorName, ctx -> { + StringBuilder sb = new StringBuilder(payloadSize); + for (int i = 0; i < payloadSize; i++) { + sb.append('C'); + } + ctx.complete(sb.toString()); + }); + DurableTaskGrpcWorker worker = workerBuilder.buildAndStart(); + + DurableTaskGrpcClientBuilder clientBuilder = this.createClientBuilder(); + clientBuilder.useExternalizedPayloads(store, options); + DurableTaskClient client = clientBuilder.build(); + + try (worker; client) { + String instanceId = client.scheduleNewOrchestrationInstance(orchestratorName); + OrchestrationMetadata instance = client.waitForInstanceCompletion( + instanceId, defaultTimeout, true); + + assertNotNull(instance); + assertEquals(OrchestrationRuntimeStatus.COMPLETED, instance.getRuntimeStatus()); + String output = instance.readOutputAs(String.class); + assertEquals(payloadSize, output.length()); + assertTrue(store.getUploadCount() > 0, "Large output should be externalized"); + } + } + + @Test + void largeTerminateReason_isExternalized() throws TimeoutException { + final String orchestratorName = "LargeTerminateOrch"; + // Create a reason larger than the threshold + final String largeReason = generateLargeString(1_000_000); + + InMemoryPayloadStore store = new InMemoryPayloadStore(); + LargePayloadOptions options = new LargePayloadOptions.Builder().build(); + + TestDurableTaskWorkerBuilder workerBuilder = this.createWorkerBuilder(); + workerBuilder.innerBuilder.useExternalizedPayloads(store, options); + workerBuilder.addOrchestrator(orchestratorName, ctx -> { + // Wait indefinitely — this will be terminated + ctx.createTimer(Duration.ofHours(1)).await(); + }); + DurableTaskGrpcWorker worker = workerBuilder.buildAndStart(); + + DurableTaskGrpcClientBuilder clientBuilder = this.createClientBuilder(); + clientBuilder.useExternalizedPayloads(store, options); + DurableTaskClient client = clientBuilder.build(); + + try (worker; client) { + String instanceId = client.scheduleNewOrchestrationInstance(orchestratorName); + client.waitForInstanceStart(instanceId, defaultTimeout); + + client.terminate(instanceId, largeReason); + + OrchestrationMetadata instance = client.waitForInstanceCompletion( + instanceId, defaultTimeout, true); + + assertNotNull(instance); + assertEquals(OrchestrationRuntimeStatus.TERMINATED, instance.getRuntimeStatus()); + assertTrue(store.getUploadCount() > 0, "Large terminate reason should be externalized"); + } + } + + @Test + void largeExternalEvent_isExternalized() throws TimeoutException { + final String orchestratorName = "LargeEventOrch"; + final String eventName = "MyEvent"; + final String largeEventData = generateLargeString(1_000_000); + + InMemoryPayloadStore store = new InMemoryPayloadStore(); + LargePayloadOptions options = new LargePayloadOptions.Builder().build(); + + TestDurableTaskWorkerBuilder workerBuilder = this.createWorkerBuilder(); + workerBuilder.innerBuilder.useExternalizedPayloads(store, options); + workerBuilder.addOrchestrator(orchestratorName, ctx -> { + String eventData = ctx.waitForExternalEvent(eventName, String.class).await(); + ctx.complete(eventData.length()); + }); + DurableTaskGrpcWorker worker = workerBuilder.buildAndStart(); + + DurableTaskGrpcClientBuilder clientBuilder = this.createClientBuilder(); + clientBuilder.useExternalizedPayloads(store, options); + DurableTaskClient client = clientBuilder.build(); + + try (worker; client) { + String instanceId = client.scheduleNewOrchestrationInstance(orchestratorName); + client.waitForInstanceStart(instanceId, defaultTimeout); + + client.raiseEvent(instanceId, eventName, largeEventData); + + OrchestrationMetadata instance = client.waitForInstanceCompletion( + instanceId, defaultTimeout, true); + + assertNotNull(instance); + assertEquals(OrchestrationRuntimeStatus.COMPLETED, instance.getRuntimeStatus()); + assertEquals(largeEventData.length(), instance.readOutputAs(Integer.class)); + assertTrue(store.getUploadCount() > 0, "Large event data should be externalized"); + } + } + + @Test + void largeSubOrchestrationInput_isExternalized() throws TimeoutException { + final String parentOrchName = "ParentOrch"; + final String childOrchName = "ChildOrch"; + final int payloadSize = 1_000_000; + + InMemoryPayloadStore store = new InMemoryPayloadStore(); + LargePayloadOptions options = new LargePayloadOptions.Builder().build(); + + TestDurableTaskWorkerBuilder workerBuilder = this.createWorkerBuilder(); + workerBuilder.innerBuilder.useExternalizedPayloads(store, options); + workerBuilder.addOrchestrator(parentOrchName, ctx -> { + StringBuilder sb = new StringBuilder(payloadSize); + for (int i = 0; i < payloadSize; i++) { + sb.append('D'); + } + String result = ctx.callSubOrchestrator(childOrchName, sb.toString(), String.class).await(); + ctx.complete(result); + }); + workerBuilder.addOrchestrator(childOrchName, ctx -> { + String input = ctx.getInput(String.class); + ctx.complete("length=" + input.length()); + }); + DurableTaskGrpcWorker worker = workerBuilder.buildAndStart(); + + DurableTaskGrpcClientBuilder clientBuilder = this.createClientBuilder(); + clientBuilder.useExternalizedPayloads(store, options); + DurableTaskClient client = clientBuilder.build(); + + try (worker; client) { + String instanceId = client.scheduleNewOrchestrationInstance(parentOrchName); + OrchestrationMetadata instance = client.waitForInstanceCompletion( + instanceId, defaultTimeout, true); + + assertNotNull(instance); + assertEquals(OrchestrationRuntimeStatus.COMPLETED, instance.getRuntimeStatus()); + assertEquals("length=" + payloadSize, instance.readOutputAs(String.class)); + assertTrue(store.getUploadCount() > 0, "Large sub-orchestration payload should be externalized"); + } + } + + @Test + void largeActivityInput_isExternalized() throws TimeoutException { + final String orchestratorName = "LargeActivityInputOrch"; + final String activityName = "ProcessLargeInput"; + final int payloadSize = 1_000_000; + + InMemoryPayloadStore store = new InMemoryPayloadStore(); + LargePayloadOptions options = new LargePayloadOptions.Builder().build(); + + TestDurableTaskWorkerBuilder workerBuilder = this.createWorkerBuilder(); + workerBuilder.innerBuilder.useExternalizedPayloads(store, options); + workerBuilder.addOrchestrator(orchestratorName, ctx -> { + StringBuilder sb = new StringBuilder(payloadSize); + for (int i = 0; i < payloadSize; i++) { + sb.append('E'); + } + Integer result = ctx.callActivity(activityName, sb.toString(), Integer.class).await(); + ctx.complete(result); + }); + workerBuilder.addActivity(activityName, ctx -> { + String input = ctx.getInput(String.class); + return input.length(); + }); + DurableTaskGrpcWorker worker = workerBuilder.buildAndStart(); + + DurableTaskGrpcClientBuilder clientBuilder = this.createClientBuilder(); + clientBuilder.useExternalizedPayloads(store, options); + DurableTaskClient client = clientBuilder.build(); + + try (worker; client) { + String instanceId = client.scheduleNewOrchestrationInstance(orchestratorName); + OrchestrationMetadata instance = client.waitForInstanceCompletion( + instanceId, defaultTimeout, true); + + assertNotNull(instance); + assertEquals(OrchestrationRuntimeStatus.COMPLETED, instance.getRuntimeStatus()); + assertEquals(payloadSize, instance.readOutputAs(Integer.class)); + assertTrue(store.getUploadCount() > 0, "Large activity input should be externalized"); + } + } + + @Test + void continueAsNew_withLargeInput_isExternalized() throws TimeoutException { + final String orchestratorName = "ContinueAsNewOrch"; + final int maxIterations = 3; + + InMemoryPayloadStore store = new InMemoryPayloadStore(); + LargePayloadOptions options = new LargePayloadOptions.Builder().build(); + + TestDurableTaskWorkerBuilder workerBuilder = this.createWorkerBuilder(); + workerBuilder.innerBuilder.useExternalizedPayloads(store, options); + workerBuilder.addOrchestrator(orchestratorName, ctx -> { + int iteration = ctx.getInput(Integer.class); + if (iteration >= maxIterations) { + ctx.complete("done-" + iteration); + return; + } + ctx.continueAsNew(iteration + 1); + }); + DurableTaskGrpcWorker worker = workerBuilder.buildAndStart(); + + DurableTaskGrpcClientBuilder clientBuilder = this.createClientBuilder(); + clientBuilder.useExternalizedPayloads(store, options); + DurableTaskClient client = clientBuilder.build(); + + try (worker; client) { + String instanceId = client.scheduleNewOrchestrationInstance(orchestratorName, 1); + OrchestrationMetadata instance = client.waitForInstanceCompletion( + instanceId, defaultTimeout, true); + + assertNotNull(instance); + assertEquals(OrchestrationRuntimeStatus.COMPLETED, instance.getRuntimeStatus()); + assertEquals("done-" + maxIterations, instance.readOutputAs(String.class)); + } + } + + @Test + void queryInstances_resolvesLargePayloads() throws TimeoutException { + final String orchestratorName = "QueryLargePayloadOrch"; + final int payloadSize = 1_000_000; + + InMemoryPayloadStore store = new InMemoryPayloadStore(); + LargePayloadOptions options = new LargePayloadOptions.Builder().build(); + + TestDurableTaskWorkerBuilder workerBuilder = this.createWorkerBuilder(); + workerBuilder.innerBuilder.useExternalizedPayloads(store, options); + workerBuilder.addOrchestrator(orchestratorName, ctx -> { + StringBuilder sb = new StringBuilder(payloadSize); + for (int i = 0; i < payloadSize; i++) { + sb.append('F'); + } + ctx.complete(sb.toString()); + }); + DurableTaskGrpcWorker worker = workerBuilder.buildAndStart(); + + DurableTaskGrpcClientBuilder clientBuilder = this.createClientBuilder(); + clientBuilder.useExternalizedPayloads(store, options); + DurableTaskClient client = clientBuilder.build(); + + try (worker; client) { + String instanceId = client.scheduleNewOrchestrationInstance(orchestratorName); + OrchestrationMetadata completedInstance = client.waitForInstanceCompletion( + instanceId, defaultTimeout, true); + assertNotNull(completedInstance); + assertEquals(OrchestrationRuntimeStatus.COMPLETED, completedInstance.getRuntimeStatus()); + + // Query the instance to verify payload resolution + OrchestrationMetadata queried = client.getInstanceMetadata(instanceId, true); + assertNotNull(queried); + String output = queried.readOutputAs(String.class); + assertEquals(payloadSize, output.length()); + } + } + + @Test + void suspendAndResume_withLargeReason_works() throws TimeoutException { + final String orchestratorName = "SuspendResumeOrch"; + final String largeReason = generateLargeString(1_000_000); + + InMemoryPayloadStore store = new InMemoryPayloadStore(); + LargePayloadOptions options = new LargePayloadOptions.Builder().build(); + + TestDurableTaskWorkerBuilder workerBuilder = this.createWorkerBuilder(); + workerBuilder.innerBuilder.useExternalizedPayloads(store, options); + workerBuilder.addOrchestrator(orchestratorName, ctx -> { + ctx.waitForExternalEvent("continue").await(); + ctx.complete("resumed"); + }); + DurableTaskGrpcWorker worker = workerBuilder.buildAndStart(); + + DurableTaskGrpcClientBuilder clientBuilder = this.createClientBuilder(); + clientBuilder.useExternalizedPayloads(store, options); + DurableTaskClient client = clientBuilder.build(); + + try (worker; client) { + String instanceId = client.scheduleNewOrchestrationInstance(orchestratorName); + client.waitForInstanceStart(instanceId, defaultTimeout); + + // Suspend with large reason + client.suspendInstance(instanceId, largeReason); + Thread.sleep(2000); // allow time for suspend to take effect + + OrchestrationMetadata suspended = client.getInstanceMetadata(instanceId, false); + assertNotNull(suspended); + assertEquals(OrchestrationRuntimeStatus.SUSPENDED, suspended.getRuntimeStatus()); + + // Resume with large reason + client.resumeInstance(instanceId, largeReason); + Thread.sleep(2000); + + // Send event to complete + client.raiseEvent(instanceId, "continue", null); + + OrchestrationMetadata instance = client.waitForInstanceCompletion( + instanceId, defaultTimeout, true); + assertNotNull(instance); + assertEquals(OrchestrationRuntimeStatus.COMPLETED, instance.getRuntimeStatus()); + assertTrue(store.getUploadCount() > 0, "Large suspend/resume reasons should be externalized"); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + fail("Test interrupted"); + } + } + + // ---- Autochunk tests (matches .NET AutochunkTests) ---- + + @Test + void autochunk_multipleChunks_completesSuccessfully() throws TimeoutException { + final String orchestratorName = "AutochunkMultipleOrch"; + final String activityName = "GeneratePayload"; + // Use 36 activities, each returning ~30KB. At 1MB chunk size this forces multiple chunks. + final int activityCount = 36; + final int payloadSizePerActivity = 30_000; + + InMemoryPayloadStore store = new InMemoryPayloadStore(); + LargePayloadOptions options = new LargePayloadOptions.Builder().build(); + + TestDurableTaskWorkerBuilder workerBuilder = this.createWorkerBuilder(); + workerBuilder.innerBuilder.useExternalizedPayloads(store, options); + // Set a small chunk size to force chunking + workerBuilder.innerBuilder.setCompleteOrchestratorResponseChunkSizeBytes( + DurableTaskGrpcWorkerBuilder.MIN_CHUNK_SIZE_BYTES); // 1 MiB + workerBuilder.addOrchestrator(orchestratorName, ctx -> { + List> tasks = IntStream.range(0, activityCount) + .mapToObj(i -> ctx.callActivity(activityName, i, String.class)) + .collect(Collectors.toList()); + ctx.allOf(tasks).await(); + ctx.complete(activityCount); + }); + workerBuilder.addActivity(activityName, ctx -> { + StringBuilder sb = new StringBuilder(payloadSizePerActivity); + for (int i = 0; i < payloadSizePerActivity; i++) { + sb.append('X'); + } + return sb.toString(); + }); + DurableTaskGrpcWorker worker = workerBuilder.buildAndStart(); + + DurableTaskGrpcClientBuilder clientBuilder = this.createClientBuilder(); + clientBuilder.useExternalizedPayloads(store, options); + DurableTaskClient client = clientBuilder.build(); + + try (worker; client) { + String instanceId = client.scheduleNewOrchestrationInstance(orchestratorName); + OrchestrationMetadata instance = client.waitForInstanceCompletion( + instanceId, defaultTimeout, true); + + assertNotNull(instance); + assertEquals(OrchestrationRuntimeStatus.COMPLETED, instance.getRuntimeStatus()); + assertEquals(activityCount, instance.readOutputAs(Integer.class)); + } + } + + @Test + void autochunk_mixedActions_completesSuccessfully() throws TimeoutException { + final String orchestratorName = "AutochunkMixedOrch"; + final String activityName = "MixedActivity"; + final String childOrchName = "MixedChildOrch"; + // 30 activities + many timers + sub-orchestrations + final int activityCount = 30; + final int timerCount = 20; + final int subOrchCount = 10; + + InMemoryPayloadStore store = new InMemoryPayloadStore(); + LargePayloadOptions options = new LargePayloadOptions.Builder().build(); + + TestDurableTaskWorkerBuilder workerBuilder = this.createWorkerBuilder(); + workerBuilder.innerBuilder.useExternalizedPayloads(store, options); + workerBuilder.innerBuilder.setCompleteOrchestratorResponseChunkSizeBytes( + DurableTaskGrpcWorkerBuilder.MIN_CHUNK_SIZE_BYTES); + workerBuilder.addOrchestrator(orchestratorName, ctx -> { + int total = 0; + + // Schedule activities + List> activityTasks = IntStream.range(0, activityCount) + .mapToObj(i -> ctx.callActivity(activityName, i, Integer.class)) + .collect(Collectors.toList()); + + // Schedule timers + for (int i = 0; i < timerCount; i++) { + ctx.createTimer(Duration.ofMillis(1)).await(); + } + + // Schedule sub-orchestrations + List> subOrchTasks = IntStream.range(0, subOrchCount) + .mapToObj(i -> ctx.callSubOrchestrator(childOrchName, i, Integer.class)) + .collect(Collectors.toList()); + + // Await all activities + ctx.allOf(activityTasks).await(); + for (Task t : activityTasks) { + total += t.await(); + } + + // Await all sub-orchestrations + ctx.allOf(subOrchTasks).await(); + for (Task t : subOrchTasks) { + total += t.await(); + } + + ctx.complete(total); + }); + workerBuilder.addOrchestrator(childOrchName, ctx -> { + int input = ctx.getInput(Integer.class); + ctx.complete(input); + }); + workerBuilder.addActivity(activityName, ctx -> { + return ctx.getInput(Integer.class); + }); + DurableTaskGrpcWorker worker = workerBuilder.buildAndStart(); + + DurableTaskGrpcClientBuilder clientBuilder = this.createClientBuilder(); + clientBuilder.useExternalizedPayloads(store, options); + DurableTaskClient client = clientBuilder.build(); + + try (worker; client) { + String instanceId = client.scheduleNewOrchestrationInstance(orchestratorName); + OrchestrationMetadata instance = client.waitForInstanceCompletion( + instanceId, defaultTimeout, true); + + assertNotNull(instance); + assertEquals(OrchestrationRuntimeStatus.COMPLETED, instance.getRuntimeStatus()); + + int expectedTotal = 0; + for (int i = 0; i < activityCount; i++) expectedTotal += i; + for (int i = 0; i < subOrchCount; i++) expectedTotal += i; + assertEquals(expectedTotal, instance.readOutputAs(Integer.class)); + } + } + + @Test + void autochunk_singleActionExceedsChunkSize_failsWithClearError() throws TimeoutException { + final String orchestratorName = "AutochunkOversizedOrch"; + // Create an orchestrator that completes with a payload larger than 1MB chunk size. + // Externalization is NOT configured so the large payload stays inline in the + // CompleteOrchestration action, which exceeds the chunk size. + final int payloadSize = 1_200_000; + + TestDurableTaskWorkerBuilder workerBuilder = this.createWorkerBuilder(); + workerBuilder.innerBuilder.setCompleteOrchestratorResponseChunkSizeBytes( + DurableTaskGrpcWorkerBuilder.MIN_CHUNK_SIZE_BYTES); + workerBuilder.addOrchestrator(orchestratorName, ctx -> { + StringBuilder sb = new StringBuilder(payloadSize); + for (int i = 0; i < payloadSize; i++) { + sb.append('Z'); + } + ctx.complete(sb.toString()); + }); + DurableTaskGrpcWorker worker = workerBuilder.buildAndStart(); + + DurableTaskGrpcClientBuilder clientBuilder = this.createClientBuilder(); + DurableTaskClient client = clientBuilder.build(); + + try (worker; client) { + String instanceId = client.scheduleNewOrchestrationInstance(orchestratorName); + OrchestrationMetadata instance = client.waitForInstanceCompletion( + instanceId, defaultTimeout, true); + + assertNotNull(instance); + // The orchestration should fail because a single action exceeds the chunk size + assertEquals(OrchestrationRuntimeStatus.FAILED, instance.getRuntimeStatus()); + } + } + + // ---- Combined scenario tests (matches .NET combined tests) ---- + + @Test + void largeInputOutputAndCustomStatus_allExternalized() throws TimeoutException { + final String orchestratorName = "LargeAllFieldsOrch"; + final int payloadSize = 1_000_000; + + InMemoryPayloadStore store = new InMemoryPayloadStore(); + LargePayloadOptions options = new LargePayloadOptions.Builder().build(); + + TestDurableTaskWorkerBuilder workerBuilder = this.createWorkerBuilder(); + workerBuilder.innerBuilder.useExternalizedPayloads(store, options); + workerBuilder.addOrchestrator(orchestratorName, ctx -> { + String input = ctx.getInput(String.class); + + // Set a large custom status + StringBuilder customStatus = new StringBuilder(payloadSize); + for (int i = 0; i < payloadSize; i++) { + customStatus.append('S'); + } + ctx.setCustomStatus(customStatus.toString()); + + // Return large output + ctx.complete(input + input); + }); + DurableTaskGrpcWorker worker = workerBuilder.buildAndStart(); + + DurableTaskGrpcClientBuilder clientBuilder = this.createClientBuilder(); + clientBuilder.useExternalizedPayloads(store, options); + DurableTaskClient client = clientBuilder.build(); + + try (worker; client) { + String largeInput = generateLargeString(payloadSize); + String instanceId = client.scheduleNewOrchestrationInstance(orchestratorName, largeInput); + OrchestrationMetadata instance = client.waitForInstanceCompletion( + instanceId, defaultTimeout, true); + + assertNotNull(instance); + assertEquals(OrchestrationRuntimeStatus.COMPLETED, instance.getRuntimeStatus()); + String output = instance.readOutputAs(String.class); + assertEquals(largeInput + largeInput, output); + assertTrue(store.getUploadCount() >= 1, "Should have externalized input, output, and/or custom status"); + assertTrue(store.getDownloadCount() >= 1, "Should have resolved at least one payload"); + } + } + + @Test + void continueAsNew_withLargeCustomStatusAndFinalOutput() throws TimeoutException { + final String orchestratorName = "ContinueAsNewAllOrch"; + final int payloadSize = 1_000_000; + final int iterations = 3; + + InMemoryPayloadStore store = new InMemoryPayloadStore(); + LargePayloadOptions options = new LargePayloadOptions.Builder().build(); + + TestDurableTaskWorkerBuilder workerBuilder = this.createWorkerBuilder(); + workerBuilder.innerBuilder.useExternalizedPayloads(store, options); + workerBuilder.addOrchestrator(orchestratorName, ctx -> { + int iteration = ctx.getInput(Integer.class); + + // Set large custom status on every iteration + StringBuilder status = new StringBuilder(payloadSize); + for (int i = 0; i < payloadSize; i++) { + status.append((char) ('A' + (iteration % 26))); + } + ctx.setCustomStatus(status.toString()); + + if (iteration >= iterations) { + // Large final output + StringBuilder finalOutput = new StringBuilder(payloadSize); + for (int i = 0; i < payloadSize; i++) { + finalOutput.append('F'); + } + ctx.complete(finalOutput.toString()); + return; + } + ctx.continueAsNew(iteration + 1); + }); + DurableTaskGrpcWorker worker = workerBuilder.buildAndStart(); + + DurableTaskGrpcClientBuilder clientBuilder = this.createClientBuilder(); + clientBuilder.useExternalizedPayloads(store, options); + DurableTaskClient client = clientBuilder.build(); + + try (worker; client) { + String instanceId = client.scheduleNewOrchestrationInstance(orchestratorName, 1); + OrchestrationMetadata instance = client.waitForInstanceCompletion( + instanceId, defaultTimeout, true); + + assertNotNull(instance); + assertEquals(OrchestrationRuntimeStatus.COMPLETED, instance.getRuntimeStatus()); + String output = instance.readOutputAs(String.class); + assertEquals(payloadSize, output.length()); + assertTrue(store.getUploadCount() > 0, "Should have externalized custom status and/or final output"); + } + } + + @Test + void largeSubOrchestrationAndActivityOutput_combined() throws TimeoutException { + final String parentOrchName = "CombinedParentOrch"; + final String childOrchName = "CombinedChildOrch"; + final String activityName = "CombinedActivity"; + final int subOrchPayloadSize = 1_000_000; + final int activityPayloadSize = 1_000_000; + + InMemoryPayloadStore store = new InMemoryPayloadStore(); + LargePayloadOptions options = new LargePayloadOptions.Builder().build(); + + TestDurableTaskWorkerBuilder workerBuilder = this.createWorkerBuilder(); + workerBuilder.innerBuilder.useExternalizedPayloads(store, options); + workerBuilder.addOrchestrator(parentOrchName, ctx -> { + // Large sub-orchestration input + StringBuilder subInput = new StringBuilder(subOrchPayloadSize); + for (int i = 0; i < subOrchPayloadSize; i++) { + subInput.append('G'); + } + String subResult = ctx.callSubOrchestrator(childOrchName, subInput.toString(), String.class).await(); + + // Activity that returns large output + String actResult = ctx.callActivity(activityName, null, String.class).await(); + + ctx.complete(subResult.length() + actResult.length()); + }); + workerBuilder.addOrchestrator(childOrchName, ctx -> { + String input = ctx.getInput(String.class); + ctx.complete("child-" + input.length()); + }); + workerBuilder.addActivity(activityName, ctx -> { + StringBuilder sb = new StringBuilder(activityPayloadSize); + for (int i = 0; i < activityPayloadSize; i++) { + sb.append('H'); + } + return sb.toString(); + }); + DurableTaskGrpcWorker worker = workerBuilder.buildAndStart(); + + DurableTaskGrpcClientBuilder clientBuilder = this.createClientBuilder(); + clientBuilder.useExternalizedPayloads(store, options); + DurableTaskClient client = clientBuilder.build(); + + try (worker; client) { + String instanceId = client.scheduleNewOrchestrationInstance(parentOrchName); + OrchestrationMetadata instance = client.waitForInstanceCompletion( + instanceId, defaultTimeout, true); + + assertNotNull(instance); + assertEquals(OrchestrationRuntimeStatus.COMPLETED, instance.getRuntimeStatus()); + int expectedLength = ("child-" + subOrchPayloadSize).length() + activityPayloadSize; + assertEquals(expectedLength, instance.readOutputAs(Integer.class)); + assertTrue(store.getUploadCount() >= 1, "Should have externalized large payloads"); + assertTrue(store.getDownloadCount() >= 1, "Should have resolved large payloads"); + } + } + + // ---- Max payload rejection integration test ---- + + @Test + void exceedingMaxPayload_isRejected() throws TimeoutException { + final String orchestratorName = "MaxPayloadRejectionOrch"; + // Use a very small max so we can trigger rejection without massive strings + final int threshold = 100; + final int maxPayload = 200; + + InMemoryPayloadStore store = new InMemoryPayloadStore(); + LargePayloadOptions options = new LargePayloadOptions.Builder() + .setThresholdBytes(threshold) + .setMaxExternalizedPayloadBytes(maxPayload) + .build(); + + TestDurableTaskWorkerBuilder workerBuilder = this.createWorkerBuilder(); + workerBuilder.innerBuilder.useExternalizedPayloads(store, options); + workerBuilder.addOrchestrator(orchestratorName, ctx -> { + // Generate output that exceeds max externalized payload size + StringBuilder sb = new StringBuilder(500); + for (int i = 0; i < 500; i++) { + sb.append('R'); + } + ctx.complete(sb.toString()); + }); + DurableTaskGrpcWorker worker = workerBuilder.buildAndStart(); + + DurableTaskGrpcClientBuilder clientBuilder = this.createClientBuilder(); + clientBuilder.useExternalizedPayloads(store, options); + DurableTaskClient client = clientBuilder.build(); + + try (worker; client) { + String instanceId = client.scheduleNewOrchestrationInstance(orchestratorName); + OrchestrationMetadata instance = client.waitForInstanceCompletion( + instanceId, defaultTimeout, true); + + assertNotNull(instance); + // The orchestration should fail because the payload exceeds max + assertEquals(OrchestrationRuntimeStatus.FAILED, instance.getRuntimeStatus()); + } + } +} diff --git a/client/src/test/java/com/microsoft/durabletask/LargePayloadOptionsTest.java b/client/src/test/java/com/microsoft/durabletask/LargePayloadOptionsTest.java new file mode 100644 index 00000000..ca9be95b --- /dev/null +++ b/client/src/test/java/com/microsoft/durabletask/LargePayloadOptionsTest.java @@ -0,0 +1,67 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.microsoft.durabletask; + +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.*; + +/** + * Unit tests for LargePayloadOptions. + */ +public class LargePayloadOptionsTest { + + @Test + void defaults_matchExpectedValues() { + LargePayloadOptions options = new LargePayloadOptions.Builder().build(); + assertEquals(900_000, options.getThresholdBytes()); + assertEquals(10 * 1024 * 1024, options.getMaxExternalizedPayloadBytes()); + } + + @Test + void customValues_areRespected() { + LargePayloadOptions options = new LargePayloadOptions.Builder() + .setThresholdBytes(500) + .setMaxExternalizedPayloadBytes(5000) + .build(); + assertEquals(500, options.getThresholdBytes()); + assertEquals(5000, options.getMaxExternalizedPayloadBytes()); + } + + @Test + void negativeThreshold_throws() { + assertThrows(IllegalArgumentException.class, + () -> new LargePayloadOptions.Builder().setThresholdBytes(-1)); + } + + @Test + void thresholdExceeds1MiB_throws() { + assertThrows(IllegalArgumentException.class, + () -> new LargePayloadOptions.Builder().setThresholdBytes(1_048_577)); + } + + @Test + void nonPositiveMax_throws() { + assertThrows(IllegalArgumentException.class, + () -> new LargePayloadOptions.Builder().setMaxExternalizedPayloadBytes(0)); + } + + @Test + void thresholdEqualToMax_throws() { + assertThrows(IllegalStateException.class, + () -> new LargePayloadOptions.Builder() + .setThresholdBytes(100) + .setMaxExternalizedPayloadBytes(100) + .build()); + } + + @Test + void thresholdGreaterThanMax_throws() { + assertThrows(IllegalStateException.class, + () -> new LargePayloadOptions.Builder() + .setThresholdBytes(200) + .setMaxExternalizedPayloadBytes(100) + .build()); + } +} diff --git a/client/src/test/java/com/microsoft/durabletask/OrchestrationRunnerPayloadTest.java b/client/src/test/java/com/microsoft/durabletask/OrchestrationRunnerPayloadTest.java new file mode 100644 index 00000000..29daf4ad --- /dev/null +++ b/client/src/test/java/com/microsoft/durabletask/OrchestrationRunnerPayloadTest.java @@ -0,0 +1,175 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.microsoft.durabletask; + +import com.google.protobuf.StringValue; +import com.google.protobuf.Timestamp; +import com.microsoft.durabletask.implementation.protobuf.OrchestratorService.*; +import org.junit.jupiter.api.Test; + +import java.util.Base64; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; + +import static org.junit.jupiter.api.Assertions.*; + +/** + * Tests for OrchestrationRunner with large payload externalization. + */ +public class OrchestrationRunnerPayloadTest { + + private static final String TOKEN_PREFIX = "test://payload/"; + + private static class TestPayloadStore implements PayloadStore { + private final Map blobs = new HashMap<>(); + + @Override + public String upload(String payload) { + String token = TOKEN_PREFIX + UUID.randomUUID().toString(); + blobs.put(token, payload); + return token; + } + + @Override + public String download(String token) { + String value = blobs.get(token); + if (value == null) { + throw new IllegalArgumentException("Unknown token: " + token); + } + return value; + } + + @Override + public boolean isKnownPayloadToken(String value) { + return value != null && value.startsWith(TOKEN_PREFIX); + } + + void seed(String token, String payload) { + blobs.put(token, payload); + } + } + + @Test + void loadAndRun_withoutStore_worksNormally() { + // Build a minimal orchestration request + byte[] requestBytes = buildSimpleOrchestrationRequest("test-input"); + + byte[] resultBytes = OrchestrationRunner.loadAndRun(requestBytes, ctx -> { + String input = ctx.getInput(String.class); + ctx.complete("output: " + input); + }); + + assertNotNull(resultBytes); + assertTrue(resultBytes.length > 0); + } + + @Test + void loadAndRun_withStore_resolvesInputAndExternalizes() { + TestPayloadStore store = new TestPayloadStore(); + LargePayloadOptions options = new LargePayloadOptions.Builder() + .setThresholdBytes(10) + .setMaxExternalizedPayloadBytes(100_000) + .build(); + + // Seed a token for the input so it gets resolved + String largeInput = "\"large-input-data-exceeding-threshold-for-test\""; + String inputToken = TOKEN_PREFIX + "input-token"; + store.seed(inputToken, largeInput); + + // Build request with token as the input + byte[] requestBytes = buildSimpleOrchestrationRequest(inputToken); + + byte[] resultBytes = OrchestrationRunner.loadAndRun(requestBytes, ctx -> { + // The token should have been resolved before the orchestration runs + String input = ctx.getInput(String.class); + assertEquals("large-input-data-exceeding-threshold-for-test", input); + ctx.complete("done"); + }, store, options); + + assertNotNull(resultBytes); + assertTrue(resultBytes.length > 0); + + // Parse the response to verify it was externalized + OrchestratorResponse response; + try { + response = OrchestratorResponse.parseFrom(resultBytes); + } catch (Exception e) { + throw new RuntimeException(e); + } + + // The output "done" (6 bytes including quotes) is below our 10 byte threshold, + // so it should NOT be externalized. This verifies the threshold logic works. + OrchestratorAction completeAction = response.getActionsList().stream() + .filter(OrchestratorAction::hasCompleteOrchestration) + .findFirst() + .orElseThrow(() -> new AssertionError("Expected CompleteOrchestration action")); + + String resultValue = completeAction.getCompleteOrchestration().getResult().getValue(); + assertFalse(store.isKnownPayloadToken(resultValue), + "Small result should not be externalized"); + } + + @Test + void loadAndRun_base64_withStore_works() { + TestPayloadStore store = new TestPayloadStore(); + + byte[] requestBytes = buildSimpleOrchestrationRequest("\"hello\""); + String base64 = Base64.getEncoder().encodeToString(requestBytes); + + String resultBase64 = OrchestrationRunner.loadAndRun(base64, ctx -> { + ctx.complete("world"); + }, store); + + assertNotNull(resultBase64); + assertFalse(resultBase64.isEmpty()); + } + + @Test + void loadAndRun_nullStore_treatedAsNoExternalization() { + byte[] requestBytes = buildSimpleOrchestrationRequest("\"test\""); + + byte[] resultBytes = OrchestrationRunner.loadAndRun(requestBytes, ctx -> { + ctx.complete("result"); + }, null, null); + + assertNotNull(resultBytes); + assertTrue(resultBytes.length > 0); + } + + private byte[] buildSimpleOrchestrationRequest(String input) { + HistoryEvent orchestratorStarted = HistoryEvent.newBuilder() + .setEventId(-1) + .setTimestamp(Timestamp.getDefaultInstance()) + .setOrchestratorStarted(OrchestratorStartedEvent.getDefaultInstance()) + .build(); + + HistoryEvent executionStarted = HistoryEvent.newBuilder() + .setEventId(-1) + .setTimestamp(Timestamp.getDefaultInstance()) + .setExecutionStarted(ExecutionStartedEvent.newBuilder() + .setName("TestOrchestration") + .setVersion(StringValue.of("")) + .setInput(StringValue.of(input)) + .setOrchestrationInstance(OrchestrationInstance.newBuilder() + .setInstanceId("test-" + UUID.randomUUID()) + .build()) + .build()) + .build(); + + HistoryEvent orchestratorCompleted = HistoryEvent.newBuilder() + .setEventId(-1) + .setTimestamp(Timestamp.getDefaultInstance()) + .setOrchestratorCompleted(OrchestratorCompletedEvent.getDefaultInstance()) + .build(); + + return OrchestratorRequest.newBuilder() + .setInstanceId("test-" + UUID.randomUUID()) + .addNewEvents(orchestratorStarted) + .addNewEvents(executionStarted) + .addNewEvents(orchestratorCompleted) + .build() + .toByteArray(); + } +} diff --git a/client/src/test/java/com/microsoft/durabletask/PayloadHelperTest.java b/client/src/test/java/com/microsoft/durabletask/PayloadHelperTest.java new file mode 100644 index 00000000..117825cf --- /dev/null +++ b/client/src/test/java/com/microsoft/durabletask/PayloadHelperTest.java @@ -0,0 +1,180 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.microsoft.durabletask; + +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.*; + +/** + * Unit tests for PayloadHelper. + */ +public class PayloadHelperTest { + + /** + * A simple in-memory PayloadStore for testing. + */ + private static class InMemoryPayloadStore implements PayloadStore { + private static final String TOKEN_PREFIX = "inmemory://"; + private final java.util.Map blobs = new java.util.HashMap<>(); + private int uploadCount = 0; + private int downloadCount = 0; + + @Override + public String upload(String payload) { + uploadCount++; + String token = TOKEN_PREFIX + java.util.UUID.randomUUID().toString(); + blobs.put(token, payload); + return token; + } + + @Override + public String download(String token) { + downloadCount++; + String value = blobs.get(token); + if (value == null) { + throw new IllegalArgumentException("Unknown token: " + token); + } + return value; + } + + @Override + public boolean isKnownPayloadToken(String value) { + return value != null && value.startsWith(TOKEN_PREFIX); + } + + int getUploadCount() { return uploadCount; } + int getDownloadCount() { return downloadCount; } + } + + @Test + void maybeExternalize_nullValue_returnsNull() { + InMemoryPayloadStore store = new InMemoryPayloadStore(); + PayloadHelper helper = new PayloadHelper(store, defaultOptions()); + assertNull(helper.maybeExternalize(null)); + assertEquals(0, store.getUploadCount()); + } + + @Test + void maybeExternalize_emptyValue_returnsEmpty() { + InMemoryPayloadStore store = new InMemoryPayloadStore(); + PayloadHelper helper = new PayloadHelper(store, defaultOptions()); + assertEquals("", helper.maybeExternalize("")); + assertEquals(0, store.getUploadCount()); + } + + @Test + void maybeExternalize_belowThreshold_returnsOriginal() { + InMemoryPayloadStore store = new InMemoryPayloadStore(); + LargePayloadOptions options = new LargePayloadOptions.Builder() + .setThresholdBytes(100) + .setMaxExternalizedPayloadBytes(1000) + .build(); + PayloadHelper helper = new PayloadHelper(store, options); + + String smallPayload = "hello"; + assertEquals(smallPayload, helper.maybeExternalize(smallPayload)); + assertEquals(0, store.getUploadCount()); + } + + @Test + void maybeExternalize_aboveThreshold_uploadsAndReturnsToken() { + InMemoryPayloadStore store = new InMemoryPayloadStore(); + LargePayloadOptions options = new LargePayloadOptions.Builder() + .setThresholdBytes(10) + .setMaxExternalizedPayloadBytes(10_000) + .build(); + PayloadHelper helper = new PayloadHelper(store, options); + + String largePayload = "a]".repeat(100); // 200 chars + String result = helper.maybeExternalize(largePayload); + + assertNotEquals(largePayload, result); + assertTrue(store.isKnownPayloadToken(result)); + assertEquals(1, store.getUploadCount()); + } + + @Test + void maybeExternalize_exceedsMaxCap_throwsException() { + InMemoryPayloadStore store = new InMemoryPayloadStore(); + LargePayloadOptions options = new LargePayloadOptions.Builder() + .setThresholdBytes(10) + .setMaxExternalizedPayloadBytes(50) + .build(); + PayloadHelper helper = new PayloadHelper(store, options); + + // Create a payload larger than 50 bytes + String hugePayload = "x".repeat(100); + assertThrows(PayloadTooLargeException.class, () -> helper.maybeExternalize(hugePayload)); + assertEquals(0, store.getUploadCount()); + } + + @Test + void maybeResolve_nullValue_returnsNull() { + InMemoryPayloadStore store = new InMemoryPayloadStore(); + PayloadHelper helper = new PayloadHelper(store, defaultOptions()); + assertNull(helper.maybeResolve(null)); + assertEquals(0, store.getDownloadCount()); + } + + @Test + void maybeResolve_regularValue_returnsOriginal() { + InMemoryPayloadStore store = new InMemoryPayloadStore(); + PayloadHelper helper = new PayloadHelper(store, defaultOptions()); + String regularValue = "just some data"; + assertEquals(regularValue, helper.maybeResolve(regularValue)); + assertEquals(0, store.getDownloadCount()); + } + + @Test + void maybeResolve_knownToken_downloadsAndReturnsPayload() { + InMemoryPayloadStore store = new InMemoryPayloadStore(); + LargePayloadOptions options = new LargePayloadOptions.Builder() + .setThresholdBytes(10) + .setMaxExternalizedPayloadBytes(10_000) + .build(); + PayloadHelper helper = new PayloadHelper(store, options); + + // First upload, then resolve + String originalPayload = "x".repeat(100); + String token = helper.maybeExternalize(originalPayload); + String resolved = helper.maybeResolve(token); + + assertEquals(originalPayload, resolved); + assertEquals(1, store.getDownloadCount()); + } + + @Test + void roundTrip_externalizeAndResolve() { + InMemoryPayloadStore store = new InMemoryPayloadStore(); + LargePayloadOptions options = new LargePayloadOptions.Builder() + .setThresholdBytes(5) + .setMaxExternalizedPayloadBytes(10_000) + .build(); + PayloadHelper helper = new PayloadHelper(store, options); + + String payload = "This is a test payload that is long enough"; + String token = helper.maybeExternalize(payload); + assertNotEquals(payload, token); + + String resolved = helper.maybeResolve(token); + assertEquals(payload, resolved); + } + + @Test + void constructor_nullStore_throws() { + assertThrows(IllegalArgumentException.class, + () -> new PayloadHelper(null, defaultOptions())); + } + + @Test + void constructor_nullOptions_throws() { + assertThrows(IllegalArgumentException.class, + () -> new PayloadHelper(new InMemoryPayloadStore(), null)); + } + + private static LargePayloadOptions defaultOptions() { + return new LargePayloadOptions.Builder().build(); + } +} diff --git a/client/src/test/java/com/microsoft/durabletask/PayloadInterceptionHelperTest.java b/client/src/test/java/com/microsoft/durabletask/PayloadInterceptionHelperTest.java new file mode 100644 index 00000000..bb2b0ce3 --- /dev/null +++ b/client/src/test/java/com/microsoft/durabletask/PayloadInterceptionHelperTest.java @@ -0,0 +1,328 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.microsoft.durabletask; + +import com.google.protobuf.StringValue; +import com.microsoft.durabletask.implementation.protobuf.OrchestratorService.*; +import org.junit.jupiter.api.Test; + +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; + +import static org.junit.jupiter.api.Assertions.*; + +/** + * Unit tests for PayloadInterceptionHelper. + */ +public class PayloadInterceptionHelperTest { + + private static final String TOKEN_PREFIX = "test://token/"; + + /** + * A simple in-memory PayloadStore for testing. + */ + private static class TestPayloadStore implements PayloadStore { + private final Map blobs = new HashMap<>(); + + @Override + public String upload(String payload) { + String token = TOKEN_PREFIX + UUID.randomUUID().toString(); + blobs.put(token, payload); + return token; + } + + @Override + public String download(String token) { + String value = blobs.get(token); + if (value == null) { + throw new IllegalArgumentException("Unknown token: " + token); + } + return value; + } + + @Override + public boolean isKnownPayloadToken(String value) { + return value != null && value.startsWith(TOKEN_PREFIX); + } + + /** Pre-load a token->payload mapping for resolution tests. */ + void seed(String token, String payload) { + blobs.put(token, payload); + } + } + + private PayloadHelper createHelper(TestPayloadStore store) { + LargePayloadOptions options = new LargePayloadOptions.Builder() + .setThresholdBytes(10) + .setMaxExternalizedPayloadBytes(100_000) + .build(); + return new PayloadHelper(store, options); + } + + // ---- Resolve tests ---- + + @Test + void resolveOrchestratorRequest_noTokens_returnsOriginal() { + TestPayloadStore store = new TestPayloadStore(); + PayloadHelper helper = createHelper(store); + + HistoryEvent event = buildExecutionStartedEvent("regular-input"); + OrchestratorRequest request = OrchestratorRequest.newBuilder() + .setInstanceId("test-1") + .addNewEvents(event) + .build(); + + OrchestratorRequest result = PayloadInterceptionHelper.resolveOrchestratorRequestPayloads(request, helper); + assertSame(request, result, "Should return same object when no tokens found"); + } + + @Test + void resolveOrchestratorRequest_withToken_resolvesInput() { + TestPayloadStore store = new TestPayloadStore(); + String token = TOKEN_PREFIX + "abc"; + String originalPayload = "large-payload-data"; + store.seed(token, originalPayload); + PayloadHelper helper = createHelper(store); + + HistoryEvent event = buildExecutionStartedEvent(token); + OrchestratorRequest request = OrchestratorRequest.newBuilder() + .setInstanceId("test-2") + .addNewEvents(event) + .build(); + + OrchestratorRequest result = PayloadInterceptionHelper.resolveOrchestratorRequestPayloads(request, helper); + assertNotSame(request, result); + assertEquals(originalPayload, result.getNewEvents(0).getExecutionStarted().getInput().getValue()); + } + + @Test + void resolveOrchestratorRequest_taskCompleted_resolvesResult() { + TestPayloadStore store = new TestPayloadStore(); + String token = TOKEN_PREFIX + "task-result"; + String originalPayload = "{\"key\":\"value\"}"; + store.seed(token, originalPayload); + PayloadHelper helper = createHelper(store); + + HistoryEvent event = HistoryEvent.newBuilder() + .setEventId(1) + .setTaskCompleted(TaskCompletedEvent.newBuilder() + .setTaskScheduledId(1) + .setResult(StringValue.of(token)) + .build()) + .build(); + + OrchestratorRequest request = OrchestratorRequest.newBuilder() + .setInstanceId("test-3") + .addPastEvents(event) + .build(); + + OrchestratorRequest result = PayloadInterceptionHelper.resolveOrchestratorRequestPayloads(request, helper); + assertNotSame(request, result); + assertEquals(originalPayload, result.getPastEvents(0).getTaskCompleted().getResult().getValue()); + } + + @Test + void resolveOrchestratorRequest_eventRaised_resolvesInput() { + TestPayloadStore store = new TestPayloadStore(); + String token = TOKEN_PREFIX + "event-data"; + String originalPayload = "event payload content"; + store.seed(token, originalPayload); + PayloadHelper helper = createHelper(store); + + HistoryEvent event = HistoryEvent.newBuilder() + .setEventId(2) + .setEventRaised(EventRaisedEvent.newBuilder() + .setName("TestEvent") + .setInput(StringValue.of(token)) + .build()) + .build(); + + OrchestratorRequest request = OrchestratorRequest.newBuilder() + .setInstanceId("test-4") + .addNewEvents(event) + .build(); + + OrchestratorRequest result = PayloadInterceptionHelper.resolveOrchestratorRequestPayloads(request, helper); + assertEquals(originalPayload, result.getNewEvents(0).getEventRaised().getInput().getValue()); + } + + @Test + void resolveActivityRequest_withToken_resolvesInput() { + TestPayloadStore store = new TestPayloadStore(); + String token = TOKEN_PREFIX + "activity-input"; + String originalPayload = "activity input data"; + store.seed(token, originalPayload); + PayloadHelper helper = createHelper(store); + + ActivityRequest request = ActivityRequest.newBuilder() + .setName("TestActivity") + .setInput(StringValue.of(token)) + .build(); + + ActivityRequest result = PayloadInterceptionHelper.resolveActivityRequestPayloads(request, helper); + assertNotSame(request, result); + assertEquals(originalPayload, result.getInput().getValue()); + } + + @Test + void resolveActivityRequest_noToken_returnsOriginal() { + TestPayloadStore store = new TestPayloadStore(); + PayloadHelper helper = createHelper(store); + + ActivityRequest request = ActivityRequest.newBuilder() + .setName("TestActivity") + .setInput(StringValue.of("regular data")) + .build(); + + ActivityRequest result = PayloadInterceptionHelper.resolveActivityRequestPayloads(request, helper); + assertSame(request, result, "Should return same object when no token found"); + } + + // ---- Externalize tests ---- + + @Test + void externalizeOrchestratorResponse_smallPayloads_returnsOriginal() { + TestPayloadStore store = new TestPayloadStore(); + PayloadHelper helper = createHelper(store); + + OrchestratorResponse response = OrchestratorResponse.newBuilder() + .setInstanceId("test-5") + .setCustomStatus(StringValue.of("ok")) + .addActions(OrchestratorAction.newBuilder() + .setId(1) + .setScheduleTask(ScheduleTaskAction.newBuilder() + .setName("SmallTask") + .setInput(StringValue.of("tiny")) + .build()) + .build()) + .build(); + + OrchestratorResponse result = PayloadInterceptionHelper.externalizeOrchestratorResponsePayloads(response, helper); + assertSame(response, result, "Small payloads should not be externalized"); + } + + @Test + void externalizeOrchestratorResponse_largeScheduleTaskInput_externalizes() { + TestPayloadStore store = new TestPayloadStore(); + PayloadHelper helper = createHelper(store); + + String largeInput = "x".repeat(100); // > 10 byte threshold + OrchestratorResponse response = OrchestratorResponse.newBuilder() + .setInstanceId("test-6") + .addActions(OrchestratorAction.newBuilder() + .setId(1) + .setScheduleTask(ScheduleTaskAction.newBuilder() + .setName("LargeTask") + .setInput(StringValue.of(largeInput)) + .build()) + .build()) + .build(); + + OrchestratorResponse result = PayloadInterceptionHelper.externalizeOrchestratorResponsePayloads(response, helper); + assertNotSame(response, result); + + String externalizedInput = result.getActions(0).getScheduleTask().getInput().getValue(); + assertTrue(store.isKnownPayloadToken(externalizedInput), "Input should be externalized to a token"); + } + + @Test + void externalizeOrchestratorResponse_largeCompleteResult_externalizes() { + TestPayloadStore store = new TestPayloadStore(); + PayloadHelper helper = createHelper(store); + + String largeResult = "y".repeat(200); + OrchestratorResponse response = OrchestratorResponse.newBuilder() + .setInstanceId("test-7") + .addActions(OrchestratorAction.newBuilder() + .setId(1) + .setCompleteOrchestration(CompleteOrchestrationAction.newBuilder() + .setOrchestrationStatus(OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED) + .setResult(StringValue.of(largeResult)) + .build()) + .build()) + .build(); + + OrchestratorResponse result = PayloadInterceptionHelper.externalizeOrchestratorResponsePayloads(response, helper); + assertNotSame(response, result); + + String externalizedResult = result.getActions(0).getCompleteOrchestration().getResult().getValue(); + assertTrue(store.isKnownPayloadToken(externalizedResult)); + } + + @Test + void externalizeOrchestratorResponse_largeCustomStatus_externalizes() { + TestPayloadStore store = new TestPayloadStore(); + PayloadHelper helper = createHelper(store); + + String largeStatus = "s".repeat(100); + OrchestratorResponse response = OrchestratorResponse.newBuilder() + .setInstanceId("test-8") + .setCustomStatus(StringValue.of(largeStatus)) + .build(); + + OrchestratorResponse result = PayloadInterceptionHelper.externalizeOrchestratorResponsePayloads(response, helper); + assertNotSame(response, result); + + String externalizedStatus = result.getCustomStatus().getValue(); + assertTrue(store.isKnownPayloadToken(externalizedStatus)); + } + + @Test + void roundTrip_externalizeAndResolve_scheduleTaskInput() { + TestPayloadStore store = new TestPayloadStore(); + PayloadHelper helper = createHelper(store); + + String originalInput = "large-task-input-data-that-exceeds-threshold"; + OrchestratorResponse response = OrchestratorResponse.newBuilder() + .setInstanceId("test-9") + .addActions(OrchestratorAction.newBuilder() + .setId(1) + .setScheduleTask(ScheduleTaskAction.newBuilder() + .setName("Task") + .setInput(StringValue.of(originalInput)) + .build()) + .build()) + .build(); + + // Externalize + OrchestratorResponse externalized = PayloadInterceptionHelper.externalizeOrchestratorResponsePayloads(response, helper); + String token = externalized.getActions(0).getScheduleTask().getInput().getValue(); + assertTrue(store.isKnownPayloadToken(token)); + + // Simulate the payload arriving as a TaskScheduled history event + HistoryEvent taskScheduled = HistoryEvent.newBuilder() + .setEventId(1) + .setTaskScheduled(TaskScheduledEvent.newBuilder() + .setName("Task") + .setInput(StringValue.of(token)) + .build()) + .build(); + + OrchestratorRequest request = OrchestratorRequest.newBuilder() + .setInstanceId("test-9") + .addPastEvents(taskScheduled) + .build(); + + // Resolve + OrchestratorRequest resolved = PayloadInterceptionHelper.resolveOrchestratorRequestPayloads(request, helper); + assertEquals(originalInput, resolved.getPastEvents(0).getTaskScheduled().getInput().getValue()); + } + + // ---- Helper methods ---- + + private static HistoryEvent buildExecutionStartedEvent(String input) { + return HistoryEvent.newBuilder() + .setEventId(-1) + .setExecutionStarted(ExecutionStartedEvent.newBuilder() + .setName("TestOrchestration") + .setVersion(StringValue.of("")) + .setInput(StringValue.of(input)) + .setOrchestrationInstance(OrchestrationInstance.newBuilder() + .setInstanceId("test-instance") + .build()) + .build()) + .build(); + } +} diff --git a/settings.gradle b/settings.gradle index 3813e3ae..afe0838e 100644 --- a/settings.gradle +++ b/settings.gradle @@ -3,6 +3,7 @@ rootProject.name = 'durabletask-java' include ":client" include ":azurefunctions" include ":azuremanaged" +include ":azure-blob-payloads" include ":samples" include ":samples-azure-functions" include ":endtoendtests"