diff --git a/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs b/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs index 9ad814c43..40b2ec015 100644 --- a/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs +++ b/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs @@ -1,4 +1,4 @@ -// ---------------------------------------------------------------------------------- +// ---------------------------------------------------------------------------------- // Copyright Microsoft Corporation // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -1886,9 +1886,9 @@ public async Task> GetOrchestrationStateAsync(string i /// /// Gets the state of all orchestration instances that match the specified parameters. /// - /// CreatedTime of orchestrations. Fetch status grater than this value. + /// CreatedTime of orchestrations. Fetch status greater than this value. /// CreatedTime of orchestrations. Fetch status less than this value. - /// RuntimeStatus of orchestrations. You can specify several status. + /// RuntimeStatus of orchestrations. You can specify several statuses. /// Cancellation Token /// List of public async Task> GetOrchestrationStateAsync(DateTime createdTimeFrom, DateTime? createdTimeTo, IEnumerable runtimeStatus, CancellationToken cancellationToken = default(CancellationToken)) @@ -1900,9 +1900,9 @@ public async Task> GetOrchestrationStateAsync(string i /// /// Gets the state of all orchestration instances that match the specified parameters. /// - /// CreatedTime of orchestrations. Fetch status grater than this value. + /// CreatedTime of orchestrations. Fetch status greater than this value. /// CreatedTime of orchestrations. Fetch status less than this value. - /// RuntimeStatus of orchestrations. You can specify several status. + /// RuntimeStatus of orchestrations. You can specify several statuses. /// Top is number of records per one request. /// ContinuationToken of the pager. /// Cancellation Token @@ -2021,15 +2021,28 @@ public Task PurgeInstanceHistoryAsync(string instanceId) /// /// Purge history for orchestrations that match the specified parameters. /// - /// CreatedTime of orchestrations. Purges history grater than this value. + /// CreatedTime of orchestrations. Purges history greater than this value. /// CreatedTime of orchestrations. Purges history less than this value. - /// RuntimeStatus of orchestrations. You can specify several status. + /// RuntimeStatus of orchestrations. You can specify several statuses. /// Class containing number of storage requests sent, along with instances and rows deleted/purged public Task PurgeInstanceHistoryAsync(DateTime createdTimeFrom, DateTime? createdTimeTo, IEnumerable runtimeStatus) { return this.trackingStore.PurgeInstanceHistoryAsync(createdTimeFrom, createdTimeTo, runtimeStatus); } + /// + /// Purge history for orchestrations that match the specified parameters, with an optional timeout. + /// + /// CreatedTime of orchestrations. Purges history greater than this value. + /// CreatedTime of orchestrations. Purges history less than this value. + /// RuntimeStatus of orchestrations. You can specify several statuses. + /// Optional timeout. When specified, the purge stops after this duration and returns partial results. + /// Class containing number of storage requests sent, along with instances and rows deleted/purged + internal Task PurgeInstanceHistoryAsync(DateTime createdTimeFrom, DateTime? createdTimeTo, IEnumerable runtimeStatus, TimeSpan? timeout) + { + return this.trackingStore.PurgeInstanceHistoryAsync(createdTimeFrom, createdTimeTo, runtimeStatus, timeout); + } + /// async Task IOrchestrationServicePurgeClient.PurgeInstanceStateAsync(string instanceId) { @@ -2040,10 +2053,11 @@ async Task IOrchestrationServicePurgeClient.PurgeInstanceStateAsync /// async Task IOrchestrationServicePurgeClient.PurgeInstanceStateAsync(PurgeInstanceFilter purgeInstanceFilter) { - PurgeHistoryResult storagePurgeHistoryResult = await this.PurgeInstanceHistoryAsync( - purgeInstanceFilter.CreatedTimeFrom, - purgeInstanceFilter.CreatedTimeTo, - purgeInstanceFilter.RuntimeStatus); + PurgeHistoryResult storagePurgeHistoryResult = await this.trackingStore.PurgeInstanceHistoryAsync( + purgeInstanceFilter.CreatedTimeFrom, + purgeInstanceFilter.CreatedTimeTo, + purgeInstanceFilter.RuntimeStatus, + purgeInstanceFilter.Timeout); return storagePurgeHistoryResult.ToCorePurgeHistoryResult(); } #nullable enable diff --git a/src/DurableTask.AzureStorage/MessageManager.cs b/src/DurableTask.AzureStorage/MessageManager.cs index ff48d507f..48f46ffac 100644 --- a/src/DurableTask.AzureStorage/MessageManager.cs +++ b/src/DurableTask.AzureStorage/MessageManager.cs @@ -318,8 +318,9 @@ public string GetNewLargeMessageBlobName(MessageData message) public async Task DeleteLargeMessageBlobs(string sanitizedInstanceId, CancellationToken cancellationToken = default) { - int storageOperationCount = 1; - if (await this.blobContainer.ExistsAsync(cancellationToken)) + int storageOperationCount = 0; + + try { await foreach (Page page in this.blobContainer.ListBlobsAsync(sanitizedInstanceId, cancellationToken).AsPages()) { @@ -329,6 +330,22 @@ public async Task DeleteLargeMessageBlobs(string sanitizedInstanceId, Cance storageOperationCount += page.Values.Count; } + + // Count the list operation even if no blobs found (the initial list request still happened) + if (storageOperationCount == 0) + { + storageOperationCount = 1; + } + } + catch (DurableTaskStorageException ex) when (ex.HttpStatusCode == 404) + { + // Container does not exist; nothing to delete. + storageOperationCount = 1; + } + catch (Azure.RequestFailedException ex) when (ex.Status == 404) + { + // Container does not exist; nothing to delete. + storageOperationCount = 1; } return storageOperationCount; diff --git a/src/DurableTask.AzureStorage/PurgeHistoryResult.cs b/src/DurableTask.AzureStorage/PurgeHistoryResult.cs index d42ce9d45..3ed7b3e18 100644 --- a/src/DurableTask.AzureStorage/PurgeHistoryResult.cs +++ b/src/DurableTask.AzureStorage/PurgeHistoryResult.cs @@ -33,6 +33,19 @@ public PurgeHistoryResult(int storageRequests, int instancesDeleted, int rowsDel this.RowsDeleted = rowsDeleted; } + /// + /// Constructor for purge history statistics with completion status. + /// + /// Requests sent to storage + /// Number of instances deleted + /// Number of rows deleted + /// Whether the purge operation completed all matching instances. + public PurgeHistoryResult(int storageRequests, int instancesDeleted, int rowsDeleted, bool? isComplete) + : this(storageRequests, instancesDeleted, rowsDeleted) + { + this.IsComplete = isComplete; + } + /// /// Number of requests sent to Storage during this execution of purge history /// @@ -48,12 +61,20 @@ public PurgeHistoryResult(int storageRequests, int instancesDeleted, int rowsDel /// public int RowsDeleted { get; } + /// + /// Gets a value indicating whether the purge operation is complete. + /// true if all matching instances were purged; + /// false if more instances remain and purge should be called again; + /// null if completion status is unknown. + /// + public bool? IsComplete { get; } + /// /// Converts from AzureStorage.PurgeHistoryResult to Core.PurgeResult type /// public PurgeResult ToCorePurgeHistoryResult() { - return new PurgeResult(this.InstancesDeleted); + return new PurgeResult(this.InstancesDeleted, this.IsComplete); } } } diff --git a/src/DurableTask.AzureStorage/Storage/Table.cs b/src/DurableTask.AzureStorage/Storage/Table.cs index 7d726e1d7..40511bb29 100644 --- a/src/DurableTask.AzureStorage/Storage/Table.cs +++ b/src/DurableTask.AzureStorage/Storage/Table.cs @@ -117,6 +117,117 @@ public async Task DeleteBatchAsync(IEnumerable en return await this.ExecuteBatchAsync(entityBatch, item => new TableTransactionAction(TableTransactionActionType.Delete, item), cancellationToken: cancellationToken); } + /// + /// Deletes entities in parallel batches of up to 100. Each batch is an atomic transaction, + /// but multiple batches are submitted concurrently for improved throughput. + /// Concurrency is controlled by the global . + /// If a batch fails because an entity was already deleted (404/EntityNotFound), + /// it falls back to individual deletes for that batch, skipping already-deleted entities. + /// + public async Task DeleteBatchParallelAsync( + IReadOnlyList entityBatch, + CancellationToken cancellationToken = default) where T : ITableEntity + { + if (entityBatch.Count == 0) + { + return new TableTransactionResults(Array.Empty(), TimeSpan.Zero, 0); + } + + const int batchSize = 100; + int chunkCount = (entityBatch.Count + batchSize - 1) / batchSize; + var chunks = new List>(chunkCount); + + var currentChunk = new List(batchSize); + foreach (T entity in entityBatch) + { + currentChunk.Add(new TableTransactionAction(TableTransactionActionType.Delete, entity)); + if (currentChunk.Count == batchSize) + { + chunks.Add(currentChunk); + currentChunk = new List(batchSize); + } + } + + if (currentChunk.Count > 0) + { + chunks.Add(currentChunk); + } + + var resultsBuilder = new TableTransactionResultsBuilder(); + + var stopwatch = Stopwatch.StartNew(); + TableTransactionResults[] allResults = await Task.WhenAll( + chunks.Select(chunk => this.ExecuteBatchWithFallbackAsync(chunk, cancellationToken))); + stopwatch.Stop(); + + foreach (TableTransactionResults result in allResults) + { + resultsBuilder.Add(result); + } + + TableTransactionResults aggregatedResults = resultsBuilder.ToResults(); + return new TableTransactionResults(aggregatedResults.Responses, stopwatch.Elapsed, aggregatedResults.RequestCount); + } + + /// + /// Executes a batch transaction. If it fails due to an entity not found (404), + /// falls back to individual delete operations, skipping entities that are already gone. + /// + async Task ExecuteBatchWithFallbackAsync( + List batch, + CancellationToken cancellationToken) + { + try + { + return await this.ExecuteBatchAsync(batch, cancellationToken); + } + catch (DurableTaskStorageException ex) when (ex.HttpStatusCode == 404) + { + // One or more entities in the batch were already deleted. + // Fall back to individual deletes, skipping 404s. + return await this.DeleteEntitiesIndividuallyAsync(batch, cancellationToken); + } + catch (RequestFailedException ex) when (ex.Status == 404) + { + return await this.DeleteEntitiesIndividuallyAsync(batch, cancellationToken); + } + } + + async Task DeleteEntitiesIndividuallyAsync( + List batch, + CancellationToken cancellationToken) + { + var responses = new List(); + var stopwatch = Stopwatch.StartNew(); + int requestCount = 0; + + foreach (TableTransactionAction action in batch) + { + requestCount++; + try + { + Response response = await this.tableClient.DeleteEntityAsync( + action.Entity.PartitionKey, + action.Entity.RowKey, + ETag.All, + cancellationToken).DecorateFailure(); + responses.Add(response); + this.stats.TableEntitiesWritten.Increment(); + } + catch (DurableTaskStorageException ex) when (ex.HttpStatusCode == 404) + { + // Entity already deleted; skip. + } + catch (RequestFailedException ex) when (ex.Status == 404) + { + // Entity already deleted; skip. + } + } + + stopwatch.Stop(); + return new TableTransactionResults(responses, stopwatch.Elapsed, requestCount); + } + public async Task InsertOrMergeBatchAsync(IEnumerable entityBatch, CancellationToken cancellationToken = default) where T : ITableEntity { TableTransactionResults results = await this.ExecuteBatchAsync(entityBatch, item => new TableTransactionAction(TableTransactionActionType.UpsertMerge, item), cancellationToken: cancellationToken); diff --git a/src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs b/src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs index f1e67a0bf..f431f4939 100644 --- a/src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs +++ b/src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs @@ -1,4 +1,4 @@ -// ---------------------------------------------------------------------------------- +// ---------------------------------------------------------------------------------- // Copyright Microsoft Corporation // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -557,6 +557,7 @@ async Task DeleteHistoryAsync( DateTime createdTimeFrom, DateTime? createdTimeTo, IEnumerable runtimeStatus, + TimeSpan? timeout, CancellationToken cancellationToken) { var condition = OrchestrationInstanceStatusQueryCondition.Parse( @@ -568,26 +569,83 @@ async Task DeleteHistoryAsync( ODataCondition odata = condition.ToOData(); - // Limit to batches of 100 to avoid excessive memory usage and table storage scanning int storageRequests = 0; int instancesDeleted = 0; int rowsDeleted = 0; - var options = new ParallelOptions { MaxDegreeOfParallelism = this.settings.MaxStorageOperationConcurrency }; - AsyncPageable entitiesPageable = this.InstancesTable.ExecuteQueryAsync(odata.Filter, select: odata.Select, cancellationToken: cancellationToken); - await foreach (Page page in entitiesPageable.AsPages(pageSizeHint: 100)) + // Create a timeout CTS if a timeout was specified. When no timeout is specified, + // the purge runs unbounded (original behavior). + using CancellationTokenSource timeoutCts = timeout.HasValue + ? new CancellationTokenSource(timeout.Value) + : null; + using CancellationTokenSource linkedCts = timeout.HasValue + ? CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, timeoutCts.Token) + : null; + CancellationToken effectiveToken = linkedCts?.Token ?? cancellationToken; + + // Limit concurrent instance purges to avoid overwhelming storage with too many parallel operations. + // Each instance purge internally spawns multiple parallel storage operations, so this should be + // kept moderate. Using 100 to match the original implicit concurrency from pageSizeHint. + const int MaxPurgeInstanceConcurrency = 100; + using var throttle = new SemaphoreSlim(MaxPurgeInstanceConcurrency); + var pendingTasks = new List(); + + bool timedOut = false; + + try { - // The underlying client throttles - await Task.WhenAll(page.Values.Select(async instance => + AsyncPageable entitiesPageable = this.InstancesTable.ExecuteQueryAsync(odata.Filter, select: odata.Select, cancellationToken: effectiveToken); + await foreach (Page page in entitiesPageable.AsPages(pageSizeHint: MaxPurgeInstanceConcurrency)) { - PurgeHistoryResult statisticsFromDeletion = await this.DeleteAllDataForOrchestrationInstance(instance, cancellationToken); - Interlocked.Add(ref instancesDeleted, statisticsFromDeletion.InstancesDeleted); - Interlocked.Add(ref storageRequests, statisticsFromDeletion.RowsDeleted); - Interlocked.Add(ref rowsDeleted, statisticsFromDeletion.RowsDeleted); - })); + foreach (OrchestrationInstanceStatus instance in page.Values) + { + effectiveToken.ThrowIfCancellationRequested(); + + await throttle.WaitAsync(effectiveToken); + + async Task DeleteInstanceAsync(OrchestrationInstanceStatus inst) + { + try + { + PurgeHistoryResult statisticsFromDeletion = await this.DeleteAllDataForOrchestrationInstance(inst, effectiveToken); + Interlocked.Add(ref instancesDeleted, statisticsFromDeletion.InstancesDeleted); + Interlocked.Add(ref storageRequests, statisticsFromDeletion.StorageRequests); + Interlocked.Add(ref rowsDeleted, statisticsFromDeletion.RowsDeleted); + } + finally + { + throttle.Release(); + } + } + + pendingTasks.Add(DeleteInstanceAsync(instance)); + } + } + } + catch (OperationCanceledException) when (timeoutCts != null && timeoutCts.IsCancellationRequested) + { + // Timeout reached — stop accepting new instances. + timedOut = true; + } + + // Wait for all remaining dispatched deletions to finish or be cancelled by the timeout. + try + { + await Task.WhenAll(pendingTasks); } + catch (OperationCanceledException) when (timeoutCts != null && timeoutCts.IsCancellationRequested) + { + // In-flight deletes were cancelled by the timeout — expected. + timedOut = true; + } + + // Determine completion status: + // - If a timeout was specified and fired, more instances may remain (isComplete = false). + // - If a timeout was specified and didn't fire, all instances were purged (isComplete = true). + // - If no timeout was specified, we purge everything (isComplete = null for backward compat). + bool? isComplete = timeout.HasValue ? !timedOut : (bool?)null; - return new PurgeHistoryResult(storageRequests, instancesDeleted, rowsDeleted); + return new PurgeHistoryResult(storageRequests, instancesDeleted, rowsDeleted, isComplete); } async Task DeleteAllDataForOrchestrationInstance(OrchestrationInstanceStatus orchestrationInstanceStatus, CancellationToken cancellationToken) @@ -618,7 +676,7 @@ async Task DeleteAllDataForOrchestrationInstance(Orchestrati }), Task.Run(async () => { - var deletedEntitiesResponseInfo = await this.HistoryTable.DeleteBatchAsync(historyEntities, cancellationToken); + var deletedEntitiesResponseInfo = await this.HistoryTable.DeleteBatchParallelAsync(historyEntities, cancellationToken); Interlocked.Add(ref rowsDeleted, deletedEntitiesResponseInfo.Responses.Count); Interlocked.Add(ref storageRequests, deletedEntitiesResponseInfo.RequestCount); }), @@ -677,6 +735,7 @@ public override async Task PurgeInstanceHistoryAsync( DateTime createdTimeFrom, DateTime? createdTimeTo, IEnumerable runtimeStatus, + TimeSpan? timeout = null, CancellationToken cancellationToken = default) { Stopwatch stopwatch = Stopwatch.StartNew(); @@ -686,7 +745,7 @@ public override async Task PurgeInstanceHistoryAsync( status == OrchestrationStatus.Canceled || status == OrchestrationStatus.Failed).ToList(); - PurgeHistoryResult result = await this.DeleteHistoryAsync(createdTimeFrom, createdTimeTo, runtimeStatusList, cancellationToken); + PurgeHistoryResult result = await this.DeleteHistoryAsync(createdTimeFrom, createdTimeTo, runtimeStatusList, timeout, cancellationToken); this.settings.Logger.PurgeInstanceHistory( this.storageAccountName, diff --git a/src/DurableTask.AzureStorage/Tracking/ITrackingStore.cs b/src/DurableTask.AzureStorage/Tracking/ITrackingStore.cs index a1fc52e9f..a53729233 100644 --- a/src/DurableTask.AzureStorage/Tracking/ITrackingStore.cs +++ b/src/DurableTask.AzureStorage/Tracking/ITrackingStore.cs @@ -193,9 +193,10 @@ interface ITrackingStore /// /// Start creation time for querying instances for purging /// End creation time for querying instances for purging - /// List of runtime status for querying instances for purging. Only Completed, Terminated, or Failed will be processed + /// List of runtime status for querying instances for purging. Only Completed, Terminated, Canceled, or Failed will be processed + /// Maximum time to spend purging. If null, all matching instances are purged with no time limit. /// The token to monitor for cancellation requests. The default value is . /// Class containing number of storage requests sent, along with instances and rows deleted/purged - Task PurgeInstanceHistoryAsync(DateTime createdTimeFrom, DateTime? createdTimeTo, IEnumerable runtimeStatus, CancellationToken cancellationToken = default); + Task PurgeInstanceHistoryAsync(DateTime createdTimeFrom, DateTime? createdTimeTo, IEnumerable runtimeStatus, TimeSpan? timeout = null, CancellationToken cancellationToken = default); } } diff --git a/src/DurableTask.AzureStorage/Tracking/TrackingStoreBase.cs b/src/DurableTask.AzureStorage/Tracking/TrackingStoreBase.cs index d02a729c0..6eb14aeed 100644 --- a/src/DurableTask.AzureStorage/Tracking/TrackingStoreBase.cs +++ b/src/DurableTask.AzureStorage/Tracking/TrackingStoreBase.cs @@ -86,7 +86,7 @@ public virtual Task PurgeInstanceHistoryAsync(string instanc } /// - public virtual Task PurgeInstanceHistoryAsync(DateTime createdTimeFrom, DateTime? createdTimeTo, IEnumerable runtimeStatus, CancellationToken cancellationToken = default) + public virtual Task PurgeInstanceHistoryAsync(DateTime createdTimeFrom, DateTime? createdTimeTo, IEnumerable runtimeStatus, TimeSpan? timeout = null, CancellationToken cancellationToken = default) { throw new NotSupportedException(); } diff --git a/src/DurableTask.Core/PurgeInstanceFilter.cs b/src/DurableTask.Core/PurgeInstanceFilter.cs index 30f68a23f..0468d4ecd 100644 --- a/src/DurableTask.Core/PurgeInstanceFilter.cs +++ b/src/DurableTask.Core/PurgeInstanceFilter.cs @@ -1,4 +1,4 @@ -// ---------------------------------------------------------------------------------- +// ---------------------------------------------------------------------------------- // Copyright Microsoft Corporation // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -48,5 +48,13 @@ public PurgeInstanceFilter(DateTime createdTimeFrom, DateTime? createdTimeTo, IE /// The runtime status of the orchestrations to purge. /// public IEnumerable? RuntimeStatus { get; } + + /// + /// The maximum amount of time to spend purging instances in a single call. + /// If null (default), all matching instances are purged with no time limit. + /// When set, the purge stops dispatching new instance deletions after this duration elapses + /// and cancels any in-flight deletions, then returns with set to false. + /// + public TimeSpan? Timeout { get; set; } } } \ No newline at end of file diff --git a/test/DurableTask.AzureStorage.Tests/AzureStorageScenarioTests.cs b/test/DurableTask.AzureStorage.Tests/AzureStorageScenarioTests.cs index d7a8fccaa..a43c683b3 100644 --- a/test/DurableTask.AzureStorage.Tests/AzureStorageScenarioTests.cs +++ b/test/DurableTask.AzureStorage.Tests/AzureStorageScenarioTests.cs @@ -1,4 +1,4 @@ -// ---------------------------------------------------------------------------------- +// ---------------------------------------------------------------------------------- // Copyright Microsoft Corporation // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -448,7 +448,7 @@ public async Task PurgeInstanceHistoryForTimePeriodDeleteAll() using (TestOrchestrationHost host = TestHelpers.GetTestOrchestrationHost(enableExtendedSessions: false)) { await host.StartAsync(); - DateTime startDateTime = DateTime.Now; + DateTime startDateTime = DateTime.UtcNow; string firstInstanceId = "instance1"; TestOrchestrationClient client = await host.StartOrchestrationAsync(typeof(Orchestrations.FanOutFanIn), 50, firstInstanceId); await client.WaitForCompletionAsync(TimeSpan.FromSeconds(30)); @@ -561,6 +561,205 @@ private async Task GetBlobCount(string containerName, string directoryName) } + [TestMethod] + public async Task PurgeMultipleInstancesHistoryByTimePeriod_ScalabilityValidation() + { + // This test validates scale improvements: parallel batch delete and pipelined page processing. + // Runs multiple concurrent orchestrations, then purges all of them by time period. + using (TestOrchestrationHost host = TestHelpers.GetTestOrchestrationHost(enableExtendedSessions: false)) + { + await host.StartAsync(); + DateTime startDateTime = DateTime.UtcNow; + + // Create multiple orchestration instances concurrently + const int instanceCount = 5; + var clients = new List(); + var instanceIds = new List(); + + for (int i = 0; i < instanceCount; i++) + { + string instanceId = $"purge-scale-{Guid.NewGuid():N}"; + instanceIds.Add(instanceId); + TestOrchestrationClient client = await host.StartOrchestrationAsync( + typeof(Orchestrations.Factorial), 10, instanceId); + clients.Add(client); + } + + // Wait for all orchestrations to complete + foreach (var client in clients) + { + var status = await client.WaitForCompletionAsync(TimeSpan.FromSeconds(60)); + Assert.AreEqual(OrchestrationStatus.Completed, status?.OrchestrationStatus); + } + + // Verify all instances have history + foreach (string instanceId in instanceIds) + { + List historyEvents = await clients[0].GetOrchestrationHistoryAsync(instanceId); + Assert.IsTrue(historyEvents.Count > 0, $"Instance {instanceId} should have history events"); + } + + // Purge all instances by time period + await clients[0].PurgeInstanceHistoryByTimePeriod( + startDateTime, + DateTime.UtcNow, + new List { OrchestrationStatus.Completed }); + + // Verify all history is purged + foreach (string instanceId in instanceIds) + { + List historyEvents = await clients[0].GetOrchestrationHistoryAsync(instanceId); + Assert.AreEqual(0, historyEvents.Count, $"Instance {instanceId} should have no history after purge"); + + IList stateList = await clients[0].GetStateAsync(instanceId); + Assert.AreEqual(1, stateList.Count); + Assert.IsNull(stateList[0], $"Instance {instanceId} state should be null after purge"); + } + + await host.StopAsync(); + } + } + + [TestMethod] + public async Task PurgeSingleInstanceWithIdempotency() + { + // This test validates that purging the same instance twice doesn't cause errors + // (testing the idempotent batch delete fallback). + using (TestOrchestrationHost host = TestHelpers.GetTestOrchestrationHost(enableExtendedSessions: false)) + { + string instanceId = Guid.NewGuid().ToString(); + await host.StartAsync(); + TestOrchestrationClient client = await host.StartOrchestrationAsync( + typeof(Orchestrations.Factorial), 110, instanceId); + await client.WaitForCompletionAsync(TimeSpan.FromSeconds(60)); + + // First purge should succeed + await client.PurgeInstanceHistory(); + + List historyEvents = await client.GetOrchestrationHistoryAsync(instanceId); + Assert.AreEqual(0, historyEvents.Count); + + // Second purge of the same instance should not throw + // (the instance row is already gone, so PurgeInstanceHistoryAsync returns 0) + await client.PurgeInstanceHistory(); + + await host.StopAsync(); + } + } + + [TestMethod] + public async Task PurgeSingleInstance_WithLargeBlobs_CleansUpBlobs() + { + using (TestOrchestrationHost host = TestHelpers.GetTestOrchestrationHost(enableExtendedSessions: false)) + { + await host.StartAsync(); + + string instanceId = Guid.NewGuid().ToString(); + // Generate a payload large enough to be stored as a blob (>60KB threshold) + string largeMessage = new string('x', 70 * 1024); + + TestOrchestrationClient client = await host.StartOrchestrationAsync( + typeof(Orchestrations.Echo), largeMessage, instanceId); + OrchestrationState status = await client.WaitForCompletionAsync(TimeSpan.FromMinutes(2)); + Assert.AreEqual(OrchestrationStatus.Completed, status?.OrchestrationStatus); + + // Verify blobs exist before purge + int blobCount = await this.GetBlobCount("test-largemessages", instanceId); + Assert.IsTrue(blobCount > 0, "Should have large message blobs before purge"); + + // Purge + await client.PurgeInstanceHistory(); + + // Verify blobs are cleaned up + blobCount = await this.GetBlobCount("test-largemessages", instanceId); + Assert.AreEqual(0, blobCount, "All large message blobs should be deleted after purge"); + + // Verify history is gone + List historyEvents = await client.GetOrchestrationHistoryAsync(instanceId); + Assert.AreEqual(0, historyEvents.Count); + + await host.StopAsync(); + } + } + + [TestMethod] + public async Task PurgeInstance_WithManyHistoryRows_DeletesAll() + { + using (TestOrchestrationHost host = TestHelpers.GetTestOrchestrationHost(enableExtendedSessions: false)) + { + await host.StartAsync(); + + string instanceId = Guid.NewGuid().ToString(); + // FanOutFanIn with 50 parallel activities creates 100+ history rows + TestOrchestrationClient client = await host.StartOrchestrationAsync( + typeof(Orchestrations.FanOutFanIn), 50, instanceId); + OrchestrationState status = await client.WaitForCompletionAsync(TimeSpan.FromSeconds(60)); + Assert.AreEqual(OrchestrationStatus.Completed, status?.OrchestrationStatus); + + // Verify lots of history exists + List historyEvents = await client.GetOrchestrationHistoryAsync(instanceId); + Assert.IsTrue(historyEvents.Count > 50, $"Expected many history events, got {historyEvents.Count}"); + + // Purge + await client.PurgeInstanceHistory(); + + // Verify clean + historyEvents = await client.GetOrchestrationHistoryAsync(instanceId); + Assert.AreEqual(0, historyEvents.Count); + + IList stateList = await client.GetStateAsync(instanceId); + Assert.AreEqual(1, stateList.Count); + Assert.IsNull(stateList[0]); + + await host.StopAsync(); + } + } + + [TestMethod] + public async Task PurgeAllInstances_HistoryIsCleared() + { + // Validates that purging multiple instances clears all history + // when purging by time period. + using (TestOrchestrationHost host = TestHelpers.GetTestOrchestrationHost(enableExtendedSessions: false)) + { + await host.StartAsync(); + DateTime startDateTime = DateTime.UtcNow; + + const int totalInstances = 5; + var clients = new List(); + for (int i = 0; i < totalInstances; i++) + { + string instanceId = $"purge-complete-{Guid.NewGuid():N}"; + TestOrchestrationClient client = await host.StartOrchestrationAsync( + typeof(Orchestrations.Factorial), 10, instanceId); + clients.Add(client); + } + + foreach (var client in clients) + { + var status = await client.WaitForCompletionAsync(TimeSpan.FromSeconds(60)); + Assert.AreEqual(OrchestrationStatus.Completed, status?.OrchestrationStatus); + } + + DateTime endDateTime = DateTime.UtcNow; + var statuses = new List { OrchestrationStatus.Completed }; + + // Purge should complete within the 30s built-in timeout for a small number of instances + await clients[0].PurgeInstanceHistoryByTimePeriod( + startDateTime, endDateTime, statuses); + + // Verify all history is purged + foreach (var client in clients) + { + List historyEvents = await client.GetOrchestrationHistoryAsync( + client.InstanceId); + Assert.AreEqual(0, historyEvents.Count, "History should be purged"); + } + + await host.StopAsync(); + } + } + [TestMethod] public async Task PurgeInstanceHistoryForTimePeriodDeletePartially() { @@ -568,11 +767,11 @@ public async Task PurgeInstanceHistoryForTimePeriodDeletePartially() { // Execute the orchestrator twice. Orchestrator will be replied. However instances might be two. await host.StartAsync(); - DateTime startDateTime = DateTime.Now; + DateTime startDateTime = DateTime.UtcNow; string firstInstanceId = Guid.NewGuid().ToString(); TestOrchestrationClient client = await host.StartOrchestrationAsync(typeof(Orchestrations.FanOutFanIn), 50, firstInstanceId); await client.WaitForCompletionAsync(TimeSpan.FromSeconds(30)); - DateTime endDateTime = DateTime.Now; + DateTime endDateTime = DateTime.UtcNow; await Task.Delay(5000); string secondInstanceId = Guid.NewGuid().ToString(); client = await host.StartOrchestrationAsync(typeof(Orchestrations.FanOutFanIn), 50, secondInstanceId); @@ -636,6 +835,97 @@ public async Task PurgeInstanceHistoryForTimePeriodDeletePartially() } } + [TestMethod] + public async Task PurgeInstanceHistoryWithTimeoutCompletesAll() + { + using (TestOrchestrationHost host = TestHelpers.GetTestOrchestrationHost(enableExtendedSessions: false)) + { + await host.StartAsync(); + DateTime startDateTime = DateTime.UtcNow; + + // Start 2 simple orchestrations that complete quickly + TestOrchestrationClient client = await host.StartOrchestrationAsync(typeof(Orchestrations.SayHelloInline), "World", Guid.NewGuid().ToString()); + await client.WaitForCompletionAsync(TimeSpan.FromSeconds(60)); + client = await host.StartOrchestrationAsync(typeof(Orchestrations.SayHelloInline), "World", Guid.NewGuid().ToString()); + await client.WaitForCompletionAsync(TimeSpan.FromSeconds(60)); + + IList results = await host.GetAllOrchestrationInstancesAsync(); + Assert.AreEqual(2, results.Count); + + // Purge with a generous timeout — should complete all + PurgeHistoryResult purgeResult = await client.PurgeInstanceHistoryByTimePeriodWithTimeout( + startDateTime, + DateTime.UtcNow, + new List { OrchestrationStatus.Completed }, + timeout: TimeSpan.FromMinutes(5)); + + Assert.AreEqual(2, purgeResult.InstancesDeleted); + Assert.IsTrue(purgeResult.IsComplete.HasValue, "IsComplete should have a value when timeout is specified"); + Assert.IsTrue(purgeResult.IsComplete.Value, "IsComplete should be true when all instances were purged"); + + results = await host.GetAllOrchestrationInstancesAsync(); + Assert.AreEqual(0, results.Count); + + await host.StopAsync(); + } + } + + [TestMethod] + public async Task PurgeInstanceHistoryWithTimeoutExpiresReturnsIsCompleteFalse() + { + using (TestOrchestrationHost host = TestHelpers.GetTestOrchestrationHost(enableExtendedSessions: false)) + { + await host.StartAsync(); + DateTime startDateTime = DateTime.UtcNow; + + // Start several orchestrations so that purge cannot finish in 1 ms + const int instanceCount = 10; + TestOrchestrationClient client = null; + for (int i = 0; i < instanceCount; i++) + { + client = await host.StartOrchestrationAsync(typeof(Orchestrations.SayHelloInline), "World", Guid.NewGuid().ToString()); + await client.WaitForCompletionAsync(TimeSpan.FromSeconds(60)); + } + + // Purge with a tiny timeout — should expire before all instances are deleted + PurgeHistoryResult purgeResult = await client.PurgeInstanceHistoryByTimePeriodWithTimeout( + startDateTime, + DateTime.UtcNow, + new List { OrchestrationStatus.Completed }, + timeout: TimeSpan.FromMilliseconds(1)); + + Assert.IsTrue(purgeResult.IsComplete.HasValue, "IsComplete should have a value when timeout is specified"); + Assert.IsFalse(purgeResult.IsComplete.Value, "IsComplete should be false when the timeout expired before all instances were purged"); + + await host.StopAsync(); + } + } + + [TestMethod] + public async Task PurgeInstanceHistoryWithoutTimeoutReturnsNullIsComplete() + { + using (TestOrchestrationHost host = TestHelpers.GetTestOrchestrationHost(enableExtendedSessions: false)) + { + await host.StartAsync(); + DateTime startDateTime = DateTime.UtcNow; + + TestOrchestrationClient client = await host.StartOrchestrationAsync(typeof(Orchestrations.SayHelloInline), "World", Guid.NewGuid().ToString()); + await client.WaitForCompletionAsync(TimeSpan.FromSeconds(60)); + + // Purge without timeout (backward compat) + PurgeHistoryResult purgeResult = await client.PurgeInstanceHistoryByTimePeriodWithTimeout( + startDateTime, + DateTime.UtcNow, + new List { OrchestrationStatus.Completed }, + timeout: null); + + Assert.AreEqual(1, purgeResult.InstancesDeleted); + Assert.IsFalse(purgeResult.IsComplete.HasValue, "IsComplete should be null when no timeout is specified (backward compat)"); + + await host.StopAsync(); + } + } + /// /// End-to-end test which validates parallel function execution by enumerating all files in the current directory /// in parallel and getting the sum total of all file sizes. diff --git a/test/DurableTask.AzureStorage.Tests/PurgeHistoryResultTests.cs b/test/DurableTask.AzureStorage.Tests/PurgeHistoryResultTests.cs new file mode 100644 index 000000000..0a6f0e3c7 --- /dev/null +++ b/test/DurableTask.AzureStorage.Tests/PurgeHistoryResultTests.cs @@ -0,0 +1,129 @@ +// ---------------------------------------------------------------------------------- +// Copyright Microsoft Corporation +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ---------------------------------------------------------------------------------- + +namespace DurableTask.AzureStorage.Tests +{ + using System; + using DurableTask.Core; + using Microsoft.VisualStudio.TestTools.UnitTesting; + + [TestClass] + public class PurgeHistoryResultTests + { + [TestMethod] + public void Constructor_WithoutIsComplete_IsCompleteIsNull() + { + // Arrange & Act + var result = new PurgeHistoryResult(storageRequests: 5, instancesDeleted: 3, rowsDeleted: 10); + + // Assert + Assert.AreEqual(5, result.StorageRequests); + Assert.AreEqual(3, result.InstancesDeleted); + Assert.AreEqual(10, result.RowsDeleted); + Assert.IsNull(result.IsComplete); + } + + [TestMethod] + public void Constructor_WithIsCompleteTrue_SetsProperty() + { + // Arrange & Act + var result = new PurgeHistoryResult(storageRequests: 10, instancesDeleted: 5, rowsDeleted: 20, isComplete: true); + + // Assert + Assert.AreEqual(10, result.StorageRequests); + Assert.AreEqual(5, result.InstancesDeleted); + Assert.AreEqual(20, result.RowsDeleted); + Assert.IsTrue(result.IsComplete.HasValue); + Assert.IsTrue(result.IsComplete.Value); + } + + [TestMethod] + public void Constructor_WithIsCompleteFalse_SetsProperty() + { + // Arrange & Act + var result = new PurgeHistoryResult(storageRequests: 8, instancesDeleted: 2, rowsDeleted: 15, isComplete: false); + + // Assert + Assert.IsTrue(result.IsComplete.HasValue); + Assert.IsFalse(result.IsComplete.Value); + } + + [TestMethod] + public void Constructor_WithIsCompleteNull_SetsProperty() + { + // Arrange & Act + var result = new PurgeHistoryResult(storageRequests: 3, instancesDeleted: 1, rowsDeleted: 5, isComplete: null); + + // Assert + Assert.IsNull(result.IsComplete); + } + + [TestMethod] + public void ToCorePurgeHistoryResult_IsCompleteTrue_PropagatedToPurgeResult() + { + // Arrange + var result = new PurgeHistoryResult(storageRequests: 10, instancesDeleted: 5, rowsDeleted: 20, isComplete: true); + + // Act + PurgeResult coreResult = result.ToCorePurgeHistoryResult(); + + // Assert + Assert.AreEqual(5, coreResult.DeletedInstanceCount); + Assert.IsTrue(coreResult.IsComplete.HasValue); + Assert.IsTrue(coreResult.IsComplete.Value); + } + + [TestMethod] + public void ToCorePurgeHistoryResult_IsCompleteFalse_PropagatedToPurgeResult() + { + // Arrange + var result = new PurgeHistoryResult(storageRequests: 8, instancesDeleted: 2, rowsDeleted: 15, isComplete: false); + + // Act + PurgeResult coreResult = result.ToCorePurgeHistoryResult(); + + // Assert + Assert.AreEqual(2, coreResult.DeletedInstanceCount); + Assert.IsTrue(coreResult.IsComplete.HasValue); + Assert.IsFalse(coreResult.IsComplete.Value); + } + + [TestMethod] + public void ToCorePurgeHistoryResult_IsCompleteNull_PropagatedToPurgeResult() + { + // Arrange + var result = new PurgeHistoryResult(storageRequests: 3, instancesDeleted: 1, rowsDeleted: 5, isComplete: null); + + // Act + PurgeResult coreResult = result.ToCorePurgeHistoryResult(); + + // Assert + Assert.AreEqual(1, coreResult.DeletedInstanceCount); + Assert.IsNull(coreResult.IsComplete); + } + + [TestMethod] + public void ToCorePurgeHistoryResult_OldConstructor_IsCompleteNull() + { + // Arrange - using the old constructor without IsComplete (backward compat) + var result = new PurgeHistoryResult(storageRequests: 5, instancesDeleted: 3, rowsDeleted: 10); + + // Act + PurgeResult coreResult = result.ToCorePurgeHistoryResult(); + + // Assert + Assert.AreEqual(3, coreResult.DeletedInstanceCount); + Assert.IsNull(coreResult.IsComplete); + } + } +} \ No newline at end of file diff --git a/test/DurableTask.AzureStorage.Tests/Storage/TableDeleteBatchParallelTests.cs b/test/DurableTask.AzureStorage.Tests/Storage/TableDeleteBatchParallelTests.cs new file mode 100644 index 000000000..56a00a80a --- /dev/null +++ b/test/DurableTask.AzureStorage.Tests/Storage/TableDeleteBatchParallelTests.cs @@ -0,0 +1,302 @@ +// ---------------------------------------------------------------------------------- +// Copyright Microsoft Corporation +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ---------------------------------------------------------------------------------- +#nullable enable +namespace DurableTask.AzureStorage.Tests.Storage +{ + using System; + using System.Collections.Generic; + using System.Linq; + using System.Threading; + using System.Threading.Tasks; + using Azure; + using Azure.Data.Tables; + using DurableTask.AzureStorage.Storage; + using Microsoft.VisualStudio.TestTools.UnitTesting; + using Moq; + + [TestClass] + public class TableDeleteBatchParallelTests + { + const string ConnectionString = "UseDevelopmentStorage=true"; + const string TableName = "TestTable"; + + [TestMethod] + public async Task DeleteBatchParallelAsync_EmptyBatch_ReturnsEmptyResults() + { + Table table = CreateTableWithMockedClient(out _, out _); + var entities = new List(); + + TableTransactionResults results = await table.DeleteBatchParallelAsync(entities); + + Assert.AreEqual(0, results.Responses.Count); + Assert.AreEqual(0, results.RequestCount); + } + + [TestMethod] + public async Task DeleteBatchParallelAsync_SingleBatch_SubmitsOneTransaction() + { + var entities = CreateTestEntities("pk", count: 50); + Table table = CreateTableWithMockedClient(out _, out Mock tableClient); + + tableClient + .Setup(t => t.SubmitTransactionAsync( + It.Is>(a => a.Count() == 50), + It.IsAny())) + .ReturnsAsync(CreateMockBatchResponse(50)); + + TableTransactionResults results = await table.DeleteBatchParallelAsync(entities); + + Assert.AreEqual(50, results.Responses.Count); + tableClient.Verify( + t => t.SubmitTransactionAsync(It.IsAny>(), It.IsAny()), + Times.Once); + } + + [TestMethod] + public async Task DeleteBatchParallelAsync_MultipleBatches_SplitsIntoChunksOf100() + { + var entities = CreateTestEntities("pk", count: 250); + Table table = CreateTableWithMockedClient(out _, out Mock tableClient); + + tableClient + .Setup(t => t.SubmitTransactionAsync( + It.IsAny>(), + It.IsAny())) + .ReturnsAsync((IEnumerable batch, CancellationToken _) => + CreateMockBatchResponse(batch.Count())); + + TableTransactionResults results = await table.DeleteBatchParallelAsync(entities); + + Assert.AreEqual(250, results.Responses.Count); + tableClient.Verify( + t => t.SubmitTransactionAsync(It.IsAny>(), It.IsAny()), + Times.Exactly(3)); + } + + [TestMethod] + public async Task DeleteBatchParallelAsync_SubmitsBatchesConcurrently() + { + var entities = CreateTestEntities("pk", count: 500); // 5 batches of 100 + int concurrentCount = 0; + int maxConcurrent = 0; + + Table table = CreateTableWithMockedClient(out _, out Mock tableClient); + + tableClient + .Setup(t => t.SubmitTransactionAsync( + It.IsAny>(), + It.IsAny())) + .Returns(async (IEnumerable batch, CancellationToken _) => + { + int current = Interlocked.Increment(ref concurrentCount); + int snapshot; + do + { + snapshot = Volatile.Read(ref maxConcurrent); + } + while (current > snapshot && Interlocked.CompareExchange(ref maxConcurrent, current, snapshot) != snapshot); + + await Task.Delay(50); + Interlocked.Decrement(ref concurrentCount); + + return CreateMockBatchResponse(batch.Count()); + }); + + await table.DeleteBatchParallelAsync(entities); + + // All 5 batches should run concurrently since there's no internal semaphore + Assert.IsTrue( + maxConcurrent > 1, + $"Expected concurrent execution, but max concurrent was {maxConcurrent}"); + } + + [TestMethod] + public async Task DeleteBatchParallelAsync_BatchFails404_FallsBackToIndividualDeletes() + { + var entities = CreateTestEntities("pk", count: 3); + Table table = CreateTableWithMockedClient(out _, out Mock tableClient); + + tableClient + .Setup(t => t.SubmitTransactionAsync( + It.IsAny>(), + It.IsAny())) + .ThrowsAsync(new RequestFailedException(404, "Entity not found")); + + var mockResponse = new Mock(); + tableClient + .Setup(t => t.DeleteEntityAsync( + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny())) + .ReturnsAsync(mockResponse.Object); + + TableTransactionResults results = await table.DeleteBatchParallelAsync(entities); + + Assert.AreEqual(3, results.Responses.Count); + tableClient.Verify( + t => t.DeleteEntityAsync(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny()), + Times.Exactly(3)); + } + + [TestMethod] + public async Task DeleteBatchParallelAsync_IndividualDeleteSkips404() + { + var entities = CreateTestEntities("pk", count: 3); + Table table = CreateTableWithMockedClient(out _, out Mock tableClient); + + tableClient + .Setup(t => t.SubmitTransactionAsync( + It.IsAny>(), + It.IsAny())) + .ThrowsAsync(new RequestFailedException(404, "Entity not found")); + + int callCount = 0; + var mockResponse = new Mock(); + tableClient + .Setup(t => t.DeleteEntityAsync( + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny())) + .Returns((string pk, string rk, ETag ifMatch, CancellationToken ct) => + { + int call = Interlocked.Increment(ref callCount); + if (call == 2) + { + throw new RequestFailedException(404, "Entity already deleted"); + } + return Task.FromResult(mockResponse.Object); + }); + + TableTransactionResults results = await table.DeleteBatchParallelAsync(entities); + + Assert.AreEqual(2, results.Responses.Count); + Assert.AreEqual(3, results.RequestCount); + } + + [TestMethod] + public async Task DeleteBatchParallelAsync_ExactlyOneBatch_NoBoundaryIssues() + { + var entities = CreateTestEntities("pk", count: 100); + Table table = CreateTableWithMockedClient(out _, out Mock tableClient); + + tableClient + .Setup(t => t.SubmitTransactionAsync( + It.Is>(a => a.Count() == 100), + It.IsAny())) + .ReturnsAsync(CreateMockBatchResponse(100)); + + TableTransactionResults results = await table.DeleteBatchParallelAsync(entities); + + Assert.AreEqual(100, results.Responses.Count); + tableClient.Verify( + t => t.SubmitTransactionAsync(It.IsAny>(), It.IsAny()), + Times.Once); + } + + [TestMethod] + public async Task DeleteBatchParallelAsync_101Entities_CreatesTwoBatches() + { + var entities = CreateTestEntities("pk", count: 101); + Table table = CreateTableWithMockedClient(out _, out Mock tableClient); + + tableClient + .Setup(t => t.SubmitTransactionAsync( + It.IsAny>(), + It.IsAny())) + .ReturnsAsync((IEnumerable batch, CancellationToken _) => + CreateMockBatchResponse(batch.Count())); + + TableTransactionResults results = await table.DeleteBatchParallelAsync(entities); + + Assert.AreEqual(101, results.Responses.Count); + tableClient.Verify( + t => t.SubmitTransactionAsync(It.IsAny>(), It.IsAny()), + Times.Exactly(2)); + } + + [TestMethod] + public async Task DeleteBatchParallelAsync_CancellationToken_IsPropagated() + { + var entities = CreateTestEntities("pk", count: 200); + using var cts = new CancellationTokenSource(); + Table table = CreateTableWithMockedClient(out _, out Mock tableClient); + + int batchesSubmitted = 0; + tableClient + .Setup(t => t.SubmitTransactionAsync( + It.IsAny>(), + It.IsAny())) + .Returns(async (IEnumerable batch, CancellationToken ct) => + { + int count = Interlocked.Increment(ref batchesSubmitted); + if (count == 1) + { + cts.Cancel(); + } + ct.ThrowIfCancellationRequested(); + return CreateMockBatchResponse(batch.Count()); + }); + + await Assert.ThrowsExceptionAsync( + () => table.DeleteBatchParallelAsync(entities, cts.Token)); + } + + #region Helper Methods + + static Table CreateTableWithMockedClient( + out Mock tableServiceClient, + out Mock tableClient) + { + var settings = new AzureStorageOrchestrationServiceSettings + { + StorageAccountClientProvider = new StorageAccountClientProvider(ConnectionString), + }; + + var azureStorageClient = new AzureStorageClient(settings); + + tableServiceClient = new Mock(MockBehavior.Strict, ConnectionString); + tableClient = new Mock(MockBehavior.Loose, ConnectionString, TableName); + tableClient.Setup(t => t.Name).Returns(TableName); + tableServiceClient.Setup(t => t.GetTableClient(TableName)).Returns(tableClient.Object); + + return new Table(azureStorageClient, tableServiceClient.Object, TableName); + } + + static List CreateTestEntities(string partitionKey, int count) + { + var entities = new List(count); + for (int i = 0; i < count; i++) + { + entities.Add(new TableEntity(partitionKey, $"rk_{i:D5}") + { + ETag = ETag.All, + }); + } + return entities; + } + + static Response> CreateMockBatchResponse(int count) + { + var responses = new List(); + for (int i = 0; i < count; i++) + { + responses.Add(new Mock().Object); + } + return Response.FromValue>(responses, new Mock().Object); + } + + #endregion + } +} diff --git a/test/DurableTask.AzureStorage.Tests/TestOrchestrationClient.cs b/test/DurableTask.AzureStorage.Tests/TestOrchestrationClient.cs index 92d018eed..a8a2907f3 100644 --- a/test/DurableTask.AzureStorage.Tests/TestOrchestrationClient.cs +++ b/test/DurableTask.AzureStorage.Tests/TestOrchestrationClient.cs @@ -200,6 +200,15 @@ public Task PurgeInstanceHistoryByTimePeriod(DateTime createdTimeFrom, DateTime? return service.PurgeInstanceHistoryAsync(createdTimeFrom, createdTimeTo, runtimeStatus); } + public Task PurgeInstanceHistoryByTimePeriodWithTimeout(DateTime createdTimeFrom, DateTime? createdTimeTo, IEnumerable runtimeStatus, TimeSpan? timeout) + { + Trace.TraceInformation($"Purging history from {createdTimeFrom} to {createdTimeTo} with timeout {timeout}"); + + // The Purge Instance History API only exists in the service object + AzureStorageOrchestrationService service = (AzureStorageOrchestrationService)this.client.ServiceClient; + return service.PurgeInstanceHistoryAsync(createdTimeFrom, createdTimeTo, runtimeStatus, timeout); + } + public async Task> GetOrchestrationHistoryAsync(string instanceId) { Trace.TraceInformation($"Getting history for instance with id - {this.instanceId}"); diff --git a/test/DurableTask.Core.Tests/PurgeInstanceFilterTests.cs b/test/DurableTask.Core.Tests/PurgeInstanceFilterTests.cs new file mode 100644 index 000000000..c4f6e5d4f --- /dev/null +++ b/test/DurableTask.Core.Tests/PurgeInstanceFilterTests.cs @@ -0,0 +1,95 @@ +// ---------------------------------------------------------------------------------- +// Copyright Microsoft Corporation +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ---------------------------------------------------------------------------------- + +namespace DurableTask.Core.Tests +{ + using System; + using System.Collections.Generic; + using Microsoft.VisualStudio.TestTools.UnitTesting; + + [TestClass] + public class PurgeInstanceFilterTests + { + [TestMethod] + public void Timeout_DefaultIsNull() + { + var filter = new PurgeInstanceFilter(DateTime.UtcNow.AddDays(-1), null, null); + Assert.IsNull(filter.Timeout); + } + + [TestMethod] + public void Timeout_CanBeSet() + { + var filter = new PurgeInstanceFilter(DateTime.UtcNow.AddDays(-1), null, null); + filter.Timeout = TimeSpan.FromSeconds(30); + Assert.IsNotNull(filter.Timeout); + Assert.AreEqual(TimeSpan.FromSeconds(30), filter.Timeout.Value); + } + + [TestMethod] + public void Timeout_CanBeSetToNull() + { + var filter = new PurgeInstanceFilter(DateTime.UtcNow.AddDays(-1), null, null); + filter.Timeout = TimeSpan.FromSeconds(30); + filter.Timeout = null; + Assert.IsNull(filter.Timeout); + } + + [TestMethod] + public void Constructor_PreservesAllProperties() + { + var from = DateTime.UtcNow.AddDays(-7); + var to = DateTime.UtcNow; + var statuses = new List { OrchestrationStatus.Completed, OrchestrationStatus.Failed }; + var filter = new PurgeInstanceFilter(from, to, statuses); + Assert.AreEqual(from, filter.CreatedTimeFrom); + Assert.AreEqual(to, filter.CreatedTimeTo); + Assert.IsNotNull(filter.RuntimeStatus); + Assert.IsNull(filter.Timeout); + } + + [TestMethod] + public void PurgeResult_IsComplete_True() + { + var result = new PurgeResult(10, isComplete: true); + Assert.AreEqual(10, result.DeletedInstanceCount); + Assert.IsTrue(result.IsComplete.HasValue); + Assert.IsTrue(result.IsComplete.Value); + } + + [TestMethod] + public void PurgeResult_IsComplete_False() + { + var result = new PurgeResult(5, isComplete: false); + Assert.AreEqual(5, result.DeletedInstanceCount); + Assert.IsTrue(result.IsComplete.HasValue); + Assert.IsFalse(result.IsComplete.Value); + } + + [TestMethod] + public void PurgeResult_IsComplete_Null() + { + var result = new PurgeResult(3, isComplete: null); + Assert.AreEqual(3, result.DeletedInstanceCount); + Assert.IsNull(result.IsComplete); + } + + [TestMethod] + public void PurgeResult_OldConstructor_IsCompleteNull() + { + var result = new PurgeResult(7); + Assert.AreEqual(7, result.DeletedInstanceCount); + Assert.IsNull(result.IsComplete); + } + } +} \ No newline at end of file