diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogDiscovery.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogDiscovery.java index 56f40866a76..175de94f871 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogDiscovery.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogDiscovery.java @@ -18,7 +18,9 @@ package org.apache.phoenix.replication; import java.io.IOException; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -83,6 +85,25 @@ public abstract class ReplicationLogDiscovery { */ protected static final double DEFAULT_WAITING_BUFFER_PERCENTAGE = 15.0; + /** + * Configuration key for maximum number of retries per in-progress file within a single processing + * round. Files that fail this many times are skipped for the rest of the round. + */ + public static final String REPLICATION_IN_PROGRESS_FILE_MAX_RETRIES_KEY = + "phoenix.replication.in.progress.file.max.retries"; + + public static final int DEFAULT_IN_PROGRESS_FILE_MAX_RETRIES = 1; + + /** + * Configuration key for the minimum age (in seconds) of an in-progress file's rename timestamp + * before it becomes eligible for processing. This prevents a file recently marked in-progress by + * one region server from being immediately picked up by another. 
+ */ + public static final String REPLICATION_IN_PROGRESS_FILE_MIN_AGE_SECONDS_KEY = + "phoenix.replication.in.progress.file.min.age.seconds"; + + public static final int DEFAULT_IN_PROGRESS_FILE_MIN_AGE_SECONDS = 60; + protected final Configuration conf; protected final String haGroupName; protected final ReplicationLogTracker replicationLogTracker; @@ -283,8 +304,9 @@ protected void processNewFilesForRound(ReplicationRound replicationRound) throws } /** - * Processes all files (older than 1 round time) in the in-progress directory. Continuously - * processes files until no in-progress files remain. + * Processes all files in the in-progress directory whose rename timestamp is older than the + * configured minimum age. Continuously processes files until no eligible in-progress files + * remain. * @throws IOException if there's an error during file processing */ protected void processInProgressDirectory() throws IOException { @@ -293,16 +315,31 @@ protected void processInProgressDirectory() throws IOException { // Increase the count for number of times in progress directory is processed getMetrics().incrementNumInProgressDirectoryProcessed(); long startTime = EnvironmentEdgeManager.currentTime(); - long oldestTimestampToProcess = - replicationLogTracker.getReplicationShardDirectoryManager().getNearestRoundStartTimestamp( - EnvironmentEdgeManager.currentTime()) - getReplayIntervalSeconds() * 1000L; - List files = replicationLogTracker.getOlderInProgressFiles(oldestTimestampToProcess); - LOG.info("Number of {} files with oldestTimestampToProcess {} is {} for haGroup: {}", - replicationLogTracker.getInProgressLogSubDirectoryName(), oldestTimestampToProcess, + long renameTimestampThreshold = + EnvironmentEdgeManager.currentTime() - getInProgressFileMinAgeSeconds() * 1000L; + int maxRetries = getInProgressFileMaxRetries(); + Map failureCount = new HashMap<>(); + List files = replicationLogTracker.getOlderInProgressFiles(renameTimestampThreshold); + LOG.info("Number of 
{} files with renameTimestampThreshold {} is {} for haGroup: {}", + replicationLogTracker.getInProgressLogSubDirectoryName(), renameTimestampThreshold, files.size(), haGroupName); while (!files.isEmpty()) { - processOneRandomFile(files); - files = replicationLogTracker.getOlderInProgressFiles(oldestTimestampToProcess); + Optional failedFile = processOneRandomFile(files); + if (failedFile.isPresent()) { + String prefix = replicationLogTracker.getFilePrefix(failedFile.get()); + int count = failureCount.merge(prefix, 1, Integer::sum); + if (count >= maxRetries) { + LOG.warn( + "File {} (prefix: {}) has failed {} time(s), reached max retries ({}). " + + "Skipping for the rest of this round for haGroup: {}", + failedFile.get(), prefix, count, maxRetries, haGroupName); + } + } + renameTimestampThreshold = + EnvironmentEdgeManager.currentTime() - getInProgressFileMinAgeSeconds() * 1000L; + files = replicationLogTracker.getOlderInProgressFiles(renameTimestampThreshold); + files.removeIf( + f -> failureCount.getOrDefault(replicationLogTracker.getFilePrefix(f), 0) >= maxRetries); } long duration = EnvironmentEdgeManager.currentTime() - startTime; LOG.info("Finished in-progress files processing in {}ms for haGroup: {}", duration, @@ -314,8 +351,9 @@ protected void processInProgressDirectory() throws IOException { * Processes a single random file from the provided list. Marks the file as in-progress, processes * it, and marks it as completed or failed. 
* @param files - List of files from which to select and process one randomly + * @return the original path of the file that failed, or empty if processing succeeded */ - private void processOneRandomFile(final List files) throws IOException { + private Optional processOneRandomFile(final List files) throws IOException { // Pick a random file and process it Path file = files.get(ThreadLocalRandom.current().nextInt(files.size())); Optional optionalInProgressFilePath = Optional.empty(); @@ -329,9 +367,9 @@ private void processOneRandomFile(final List files) throws IOException { LOG.error("Failed to process the file {}", file, exception); optionalInProgressFilePath.ifPresent(replicationLogTracker::markFailed); // Not throwing this exception because next time another random file will be retried. - // If it's persistent failure for in_progress directory, - // cluster state should to be DEGRADED_STANDBY_FOR_READER. + return Optional.of(file); } + return Optional.empty(); } /** @@ -469,6 +507,16 @@ public double getWaitingBufferPercentage() { return DEFAULT_WAITING_BUFFER_PERCENTAGE; } + public int getInProgressFileMaxRetries() { + return conf.getInt(REPLICATION_IN_PROGRESS_FILE_MAX_RETRIES_KEY, + DEFAULT_IN_PROGRESS_FILE_MAX_RETRIES); + } + + public int getInProgressFileMinAgeSeconds() { + return conf.getInt(REPLICATION_IN_PROGRESS_FILE_MIN_AGE_SECONDS_KEY, + DEFAULT_IN_PROGRESS_FILE_MIN_AGE_SECONDS); + } + public ReplicationLogTracker getReplicationLogFileTracker() { return this.replicationLogTracker; } diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogTracker.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogTracker.java index 03832a4d850..b6a7a105e10 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogTracker.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogTracker.java @@ -28,8 +28,8 @@ import 
org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.phoenix.replication.metrics.MetricsReplicationLogTracker; -import org.apache.phoenix.util.EnvironmentEdgeManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -203,15 +203,15 @@ public List getInProgressFiles() throws IOException { } /** - * Retrieves all valid log files in the in-progress directory that are older than the specified - * timestamp. - * @param timestampThreshold - The timestamp threshold in milliseconds. Files with timestamps less - * than this value will be returned. - * @return List of valid log file paths in the in-progress directory that are older than the + * Retrieves all valid log files in the in-progress directory whose rename timestamp is older than + * the specified threshold. Files without a rename timestamp are skipped. + * @param renameTimestampThreshold - The timestamp threshold in milliseconds. Files with rename + * timestamps less than this value will be returned. 
+ * @return List of valid log file paths in the in-progress directory that were renamed before the * threshold, empty list if directory doesn't exist or no files match * @throws IOException if there's an error accessing the file system */ - public List getOlderInProgressFiles(long timestampThreshold) throws IOException { + public List getOlderInProgressFiles(long renameTimestampThreshold) throws IOException { if (!fileSystem.exists(getInProgressDirPath())) { return Collections.emptyList(); } @@ -221,20 +221,19 @@ public List getOlderInProgressFiles(long timestampThreshold) throws IOExce for (FileStatus status : fileStatuses) { if (status.isFile() && isValidLogFile(status.getPath())) { - try { - long fileTimestamp = getFileTimestamp(status.getPath()); - if (fileTimestamp < timestampThreshold) { - olderInProgressFiles.add(status.getPath()); - } - } catch (NumberFormatException e) { - LOG.warn("Failed to extract timestamp from file {}, skipping", - status.getPath().getName()); + Optional renameTimestamp = getRenameTimestamp(status.getPath()); + if (!renameTimestamp.isPresent()) { + LOG.warn("File {} has no rename timestamp, skipping", status.getPath().getName()); + continue; + } + if (renameTimestamp.get() < renameTimestampThreshold) { + olderInProgressFiles.add(status.getPath()); } } } - LOG.debug("Found {} in-progress files older than timestamp {}", olderInProgressFiles.size(), - timestampThreshold); + LOG.debug("Found {} in-progress files renamed before timestamp {}", olderInProgressFiles.size(), + renameTimestampThreshold); return olderInProgressFiles; } @@ -267,7 +266,7 @@ public List getNewFiles() throws IOException { * @return true if file was successfully deleted, false otherwise */ protected boolean markCompleted(final Path file) { - long startTime = EnvironmentEdgeManager.currentTimeMillis(); + long startTime = EnvironmentEdgeManager.currentTime(); // Increment the metrics count getMetrics().incrementMarkFileCompletedRequestCount(); @@ -282,7 +281,7 @@ 
protected boolean markCompleted(final Path file) { try { if (fileSystem.delete(fileToDelete, false)) { LOG.info("Successfully deleted completed file: {}", fileToDelete); - long endTime = EnvironmentEdgeManager.currentTimeMillis(); + long endTime = EnvironmentEdgeManager.currentTime(); getMetrics().updateMarkFileCompletedTime(endTime - startTime); return true; } else { @@ -323,13 +322,13 @@ protected boolean markCompleted(final Path file) { } else if (matchingFiles.size() > 1) { LOG.warn("Multiple matching in-progress files found for prefix {}: {}", filePrefix, matchingFiles.size()); - long endTime = EnvironmentEdgeManager.currentTimeMillis(); + long endTime = EnvironmentEdgeManager.currentTime(); getMetrics().updateMarkFileCompletedTime(endTime - startTime); return false; } else { LOG.warn("No matching in-progress file found for prefix: {}. File must " + "have " + "been deleted by some other process.", filePrefix); - long endTime = EnvironmentEdgeManager.currentTimeMillis(); + long endTime = EnvironmentEdgeManager.currentTime(); getMetrics().updateMarkFileCompletedTime(endTime - startTime); return true; } @@ -341,7 +340,7 @@ protected boolean markCompleted(final Path file) { } } - long endTime = EnvironmentEdgeManager.currentTimeMillis(); + long endTime = EnvironmentEdgeManager.currentTime(); getMetrics().updateMarkFileCompletedTime(endTime - startTime); LOG.error("Failed to delete file after {} attempts: {}", maxRetries + 1, fileToDelete); @@ -355,7 +354,7 @@ protected boolean markCompleted(final Path file) { * @return Optional value of renamed path if file rename was successful, else Optional.empty() */ protected Optional markInProgress(final Path file) { - long startTime = EnvironmentEdgeManager.currentTimeMillis(); + long startTime = EnvironmentEdgeManager.currentTime(); try { final String fileName = file.getName(); @@ -364,26 +363,20 @@ protected Optional markInProgress(final Path file) { // Check if file is already in in-progress directory if 
(file.getParent().toUri().getPath().equals(getInProgressDirPath().toString())) { - // File is already in in-progress directory, replace UUID with a new one + // File is already in in-progress directory, replace UUID and rename timestamp // keep the directory same as in progress + // Format: {timestamp}_{rs}_{uuid}_{renameTs}.plog → extract prefix (first 2 parts) String[] parts = fileName.split("_"); - // Remove the last part (UUID) and add new UUID - StringBuilder newNameBuilder = new StringBuilder(); - for (int i = 0; i < parts.length - 1; i++) { - if (i > 0) { - newNameBuilder.append("_"); - } - newNameBuilder.append(parts[i]); - } - String extension = fileName.substring(fileName.lastIndexOf(".")); - newNameBuilder.append("_").append(UUID.randomUUID()).append(extension); - newFileName = newNameBuilder.toString(); + String prefix = parts[0] + "_" + parts[1]; + newFileName = + prefix + "_" + UUID.randomUUID() + "_" + EnvironmentEdgeManager.currentTime() + ".plog"; targetDirectory = file.getParent(); } else { - // File is not in in-progress directory, add UUID and move to IN_PROGRESS directory + // File is not in in-progress directory, add UUID + rename timestamp and move to + // IN_PROGRESS directory String baseName = fileName.substring(0, fileName.lastIndexOf(".")); - String extension = fileName.substring(fileName.lastIndexOf(".")); - newFileName = baseName + "_" + UUID.randomUUID() + extension; + newFileName = + baseName + "_" + UUID.randomUUID() + "_" + EnvironmentEdgeManager.currentTime() + ".plog"; targetDirectory = getInProgressDirPath(); } @@ -401,7 +394,7 @@ protected Optional markInProgress(final Path file) { } finally { // Update the metrics getMetrics().incrementMarkFileInProgressRequestCount(); - long endTime = EnvironmentEdgeManager.currentTimeMillis(); + long endTime = EnvironmentEdgeManager.currentTime(); getMetrics().updateMarkFileInProgressTime(endTime - startTime); } } @@ -427,42 +420,55 @@ public long getFileTimestamp(Path file) throws NumberFormatException { } /** - * 
Extracts the UUID from a log file name. Assumes UUID is the last part of the filename before - * the extension. + * Extracts the UUID from an in-progress file name. Format: {timestamp}_{rs}_{uuid}_{renameTs}.plog → + * UUID is the third underscore-separated part. * @param file - The file path to extract UUID from. - * @return Optional of UUID if file was in progress, else Optional.empty() + * @return Optional of UUID if file is in-progress format, else Optional.empty() */ protected Optional getFileUUID(Path file) throws NumberFormatException { String[] parts = file.getName().split("_"); - if (parts.length < 3) { + if (parts.length < 4) { return Optional.empty(); } - return Optional.of(parts[parts.length - 1].split("\\.")[0]); + return Optional.of(parts[2]); } /** - * Extracts everything except the UUID (last part) from a file path. For example, from - * "1704153600000_rs1_12345678-1234-1234-1234-123456789abc.plog" This method will return - * "1704153600000_rs1" - * @param file - The file path to extract prefix from. + * Extracts the rename timestamp from an in-progress file name. Format: + * {timestamp}_{rs}_{uuid}_{renameTs}.plog → renameTs is the last underscore-separated part (before + * extension). + * @param file - The file path to extract rename timestamp from. 
+ * @return Optional of rename timestamp, or empty if not present or invalid */ - protected String getFilePrefix(Path file) { - String fileName = file.getName(); - String[] parts = fileName.split("_"); - if (parts.length < 3) { - return fileName.split("\\.")[0]; // Return full filename if no underscore found + public Optional getRenameTimestamp(Path file) { + String[] parts = file.getName().split("_"); + if (parts.length < 4) { + return Optional.empty(); } - - // Return everything except the last part (UUID) - StringBuilder prefix = new StringBuilder(); - for (int i = 0; i < parts.length - 1; i++) { - if (i > 0) { - prefix.append("_"); - } - prefix.append(parts[i]); + try { + String lastPart = parts[parts.length - 1]; + String withoutExtension = lastPart.substring(0, lastPart.lastIndexOf(".")); + return Optional.of(Long.parseLong(withoutExtension)); + } catch (NumberFormatException e) { + LOG.warn("Failed to parse rename timestamp from file {}", file.getName()); + return Optional.empty(); } + } - return prefix.toString(); + /** + * Extracts the stable prefix from a file path. The prefix is the first two underscore-separated + * parts ({timestamp}_{rs}), which remain stable across renames. For example, + * "1704153600000_rs1_uuid_renameTs.plog" returns "1704153600000_rs1" and "1704153600000_rs1.plog" + * also returns "1704153600000_rs1". + * @param file - The file path to extract prefix from. + */ + protected String getFilePrefix(Path file) { + String[] parts = file.getName().split("_"); + if (parts.length < 2) { + return file.getName().split("\\.")[0]; + } + String secondPart = parts[1].contains(".") ? 
parts[1].split("\\.")[0] : parts[1]; + return parts[0] + "_" + secondPart; } /** diff --git a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogDiscoveryTest.java b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogDiscoveryTest.java index 412c832305d..caacf175ced 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogDiscoveryTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogDiscoveryTest.java @@ -36,7 +36,9 @@ import java.util.Set; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseConfiguration; @@ -90,6 +92,7 @@ public void setUp() throws IOException { replicationShardDirectoryManager)); fileTracker.init(); + conf.setInt(ReplicationLogDiscovery.REPLICATION_IN_PROGRESS_FILE_MIN_AGE_SECONDS_KEY, 0); discovery = Mockito.spy(new TestableReplicationLogDiscovery(fileTracker)); Mockito.doReturn(metricsLogDiscovery).when(discovery).getMetrics(); } @@ -156,6 +159,24 @@ public void testStartAndStop() throws IOException { assertFalse("Discovery should not be running after stop", discovery.isRunning()); } + @Test + public void testGetInProgressFileMaxRetries() { + // Default value + assertEquals("Default maxRetries should be 1", + ReplicationLogDiscovery.DEFAULT_IN_PROGRESS_FILE_MAX_RETRIES, + discovery.getInProgressFileMaxRetries()); + + // Custom value from configuration + conf.setInt(ReplicationLogDiscovery.REPLICATION_IN_PROGRESS_FILE_MAX_RETRIES_KEY, 5); + assertEquals("maxRetries should reflect configured value", 5, + discovery.getInProgressFileMaxRetries()); + + // Reset to verify it reads from conf each time + 
conf.setInt(ReplicationLogDiscovery.REPLICATION_IN_PROGRESS_FILE_MAX_RETRIES_KEY, 3); + assertEquals("maxRetries should reflect updated configured value", 3, + discovery.getInProgressFileMaxRetries()); + } + /** * Tests processRound with in-progress directory processing enabled. Validates that both new files * and in-progress files are processed correctly. @@ -215,14 +236,14 @@ public void testProcessRoundWithInProgressDirectoryProcessing() throws IOExcepti // For in-progress files, they already have format: {timestamp}_{rs-n}_{UUID}.plog // Extract prefix before the UUID (everything before the last underscore) String fileName = file.getName(); - String prefix = fileName.substring(0, fileName.lastIndexOf("_")); + String prefix = extractPrefix(fileName); expectedProcessedFilePrefixes.add(prefix); } for (Path file : inProgressFiles0102) { // For in-progress files, they already have format: {timestamp}_{rs-n}_{UUID}.plog // Extract prefix before the UUID (everything before the last underscore) String fileName = file.getName(); - String prefix = fileName.substring(0, fileName.lastIndexOf("_")); + String prefix = extractPrefix(fileName); expectedProcessedFilePrefixes.add(prefix); } @@ -231,12 +252,7 @@ public void testProcessRoundWithInProgressDirectoryProcessing() throws IOExcepti Set actualProcessedFilePrefixes = new HashSet<>(); for (Path file : processedFiles) { String fileName = file.getName(); - // Extract prefix before UUID and extension (everything before the last underscore before - // .plog) - // Remove the extension first - String withoutExtension = fileName.substring(0, fileName.lastIndexOf(".")); - // Then get everything before the last underscore (which is the UUID) - String prefix = withoutExtension.substring(0, withoutExtension.lastIndexOf("_")); + String prefix = extractPrefix(fileName); actualProcessedFilePrefixes.add(prefix); } @@ -256,16 +272,14 @@ public void testProcessRoundWithInProgressDirectoryProcessing() throws IOExcepti } // For in-progress 
files for (Path expectedFile : inProgressFiles0004) { - String expectedPrefix = - expectedFile.getName().substring(0, expectedFile.getName().lastIndexOf("_")); - Mockito.verify(fileTracker, Mockito.times(1)).markInProgress(Mockito.argThat(path -> path - .getName().substring(0, path.getName().lastIndexOf("_")).equals(expectedPrefix))); + String expectedPrefix = extractPrefix(expectedFile.getName()); + Mockito.verify(fileTracker, Mockito.times(1)).markInProgress( + Mockito.argThat(path -> extractPrefix(path.getName()).equals(expectedPrefix))); } for (Path expectedFile : inProgressFiles0102) { - String expectedPrefix = - expectedFile.getName().substring(0, expectedFile.getName().lastIndexOf("_")); - Mockito.verify(fileTracker, Mockito.times(1)).markInProgress(Mockito.argThat(path -> path - .getName().substring(0, path.getName().lastIndexOf("_")).equals(expectedPrefix))); + String expectedPrefix = extractPrefix(expectedFile.getName()); + Mockito.verify(fileTracker, Mockito.times(1)).markInProgress( + Mockito.argThat(path -> extractPrefix(path.getName()).equals(expectedPrefix))); } // Verify that markCompleted was called 7 times (once for each successfully processed file) @@ -276,33 +290,19 @@ public void testProcessRoundWithInProgressDirectoryProcessing() throws IOExcepti for (Path expectedFile : newFilesForRound) { String expectedPrefix = expectedFile.getName().substring(0, expectedFile.getName().lastIndexOf(".")); - Mockito.verify(fileTracker, Mockito.times(1)).markCompleted(Mockito.argThat(path -> { - String pathName = path.getName(); - String withoutExtension = pathName.substring(0, pathName.lastIndexOf(".")); - String prefix = withoutExtension.substring(0, withoutExtension.lastIndexOf("_")); - return prefix.equals(expectedPrefix); - })); + Mockito.verify(fileTracker, Mockito.times(1)).markCompleted( + Mockito.argThat(path -> extractPrefix(path.getName()).equals(expectedPrefix))); } // For in-progress files (they will have updated UUIDs, but same prefix) for (Path 
expectedFile : inProgressFiles0004) { - String expectedPrefix = - expectedFile.getName().substring(0, expectedFile.getName().lastIndexOf("_")); - Mockito.verify(fileTracker, Mockito.times(1)).markCompleted(Mockito.argThat(path -> { - String pathName = path.getName(); - String withoutExtension = pathName.substring(0, pathName.lastIndexOf(".")); - String prefix = withoutExtension.substring(0, withoutExtension.lastIndexOf("_")); - return prefix.equals(expectedPrefix); - })); + String expectedPrefix = extractPrefix(expectedFile.getName()); + Mockito.verify(fileTracker, Mockito.times(1)).markCompleted( + Mockito.argThat(path -> extractPrefix(path.getName()).equals(expectedPrefix))); } for (Path expectedFile : inProgressFiles0102) { - String expectedPrefix = - expectedFile.getName().substring(0, expectedFile.getName().lastIndexOf("_")); - Mockito.verify(fileTracker, Mockito.times(1)).markCompleted(Mockito.argThat(path -> { - String pathName = path.getName(); - String withoutExtension = pathName.substring(0, pathName.lastIndexOf(".")); - String prefix = withoutExtension.substring(0, withoutExtension.lastIndexOf("_")); - return prefix.equals(expectedPrefix); - })); + String expectedPrefix = extractPrefix(expectedFile.getName()); + Mockito.verify(fileTracker, Mockito.times(1)).markCompleted( + Mockito.argThat(path -> extractPrefix(path.getName()).equals(expectedPrefix))); } // Verify that shouldProcessInProgressDirectory was called once @@ -388,8 +388,7 @@ public void testProcessRoundWithoutInProgressDirectoryProcessing() throws IOExce Set actualProcessedFilePrefixes = new HashSet<>(); for (Path file : processedFiles) { String fileName = file.getName(); - String withoutExtension = fileName.substring(0, fileName.lastIndexOf(".")); - String prefix = withoutExtension.substring(0, withoutExtension.lastIndexOf("_")); + String prefix = extractPrefix(fileName); actualProcessedFilePrefixes.add(prefix); } @@ -413,12 +412,8 @@ public void 
testProcessRoundWithoutInProgressDirectoryProcessing() throws IOExce for (Path expectedFile : newFilesForRound) { String expectedPrefix = expectedFile.getName().substring(0, expectedFile.getName().lastIndexOf(".")); - Mockito.verify(fileTracker, Mockito.times(1)).markCompleted(Mockito.argThat(path -> { - String pathName = path.getName(); - String withoutExtension = pathName.substring(0, pathName.lastIndexOf(".")); - String prefix = withoutExtension.substring(0, withoutExtension.lastIndexOf("_")); - return prefix.equals(expectedPrefix); - })); + Mockito.verify(fileTracker, Mockito.times(1)).markCompleted( + Mockito.argThat(path -> extractPrefix(path.getName()).equals(expectedPrefix))); } // Verify that shouldProcessInProgressDirectory was called once @@ -448,16 +443,14 @@ public void testProcessRoundWithoutInProgressDirectoryProcessing() throws IOExce // Validate that in-progress files were NOT processed for (Path unexpectedFile : inProgressFiles0004) { - String unexpectedPrefix = - unexpectedFile.getName().substring(0, unexpectedFile.getName().lastIndexOf("_")); + String unexpectedPrefix = extractPrefix(unexpectedFile.getName()); assertFalse( "Should NOT have processed in-progress file from 00:00:04: " + unexpectedFile.getName(), actualProcessedFilePrefixes.contains(unexpectedPrefix)); } for (Path unexpectedFile : inProgressFiles0102) { - String unexpectedPrefix = - unexpectedFile.getName().substring(0, unexpectedFile.getName().lastIndexOf("_")); + String unexpectedPrefix = extractPrefix(unexpectedFile.getName()); assertFalse( "Should NOT have processed in-progress file from 00:01:02: " + unexpectedFile.getName(), actualProcessedFilePrefixes.contains(unexpectedPrefix)); @@ -560,8 +553,7 @@ public void testProcessNewFilesForRound() throws IOException { Set actualProcessedFilePrefixes = new HashSet<>(); for (Path file : processedFiles) { String fileName = file.getName(); - String withoutExtension = fileName.substring(0, fileName.lastIndexOf(".")); - String prefix = 
withoutExtension.substring(0, withoutExtension.lastIndexOf("_")); + String prefix = extractPrefix(fileName); actualProcessedFilePrefixes.add(prefix); } @@ -585,12 +577,8 @@ public void testProcessNewFilesForRound() throws IOException { for (Path expectedFile : newFilesForRound) { String expectedPrefix = expectedFile.getName().substring(0, expectedFile.getName().lastIndexOf(".")); - Mockito.verify(fileTracker, Mockito.times(1)).markCompleted(Mockito.argThat(path -> { - String pathName = path.getName(); - String withoutExtension = pathName.substring(0, pathName.lastIndexOf(".")); - String prefix = withoutExtension.substring(0, withoutExtension.lastIndexOf("_")); - return prefix.equals(expectedPrefix); - })); + Mockito.verify(fileTracker, Mockito.times(1)).markCompleted( + Mockito.argThat(path -> extractPrefix(path.getName()).equals(expectedPrefix))); } // Validate that files from other rounds were NOT processed (using prefix comparison) @@ -618,16 +606,14 @@ public void testProcessNewFilesForRound() throws IOException { // Validate that in-progress files were NOT processed (processNewFilesForRound only processes // new files) for (Path unexpectedFile : inProgressFiles0004) { - String unexpectedPrefix = - unexpectedFile.getName().substring(0, unexpectedFile.getName().lastIndexOf("_")); + String unexpectedPrefix = extractPrefix(unexpectedFile.getName()); assertFalse( "Should NOT have processed in-progress file from 00:00:04: " + unexpectedFile.getName(), actualProcessedFilePrefixes.contains(unexpectedPrefix)); } for (Path unexpectedFile : inProgressFiles0102) { - String unexpectedPrefix = - unexpectedFile.getName().substring(0, unexpectedFile.getName().lastIndexOf("_")); + String unexpectedPrefix = extractPrefix(unexpectedFile.getName()); assertFalse( "Should NOT have processed in-progress file from 00:01:02: " + unexpectedFile.getName(), actualProcessedFilePrefixes.contains(unexpectedPrefix)); @@ -651,21 +637,11 @@ public void 
testProcessNewFilesForRoundWithPartialFailure() throws IOException { String file1Prefix = newFilesForRound.get(1).getName().substring(0, newFilesForRound.get(1).getName().lastIndexOf(".")); Mockito.doThrow(new IOException("Processing failed for file 1")).when(discovery) - .processFile(Mockito.argThat(path -> { - String pathName = path.getName(); - String withoutExtension = pathName.substring(0, pathName.lastIndexOf(".")); - String prefix = withoutExtension.substring(0, withoutExtension.lastIndexOf("_")); - return prefix.equals(file1Prefix); - })); + .processFile(Mockito.argThat(path -> extractPrefix(path.getName()).equals(file1Prefix))); String file3Prefix = newFilesForRound.get(3).getName().substring(0, newFilesForRound.get(3).getName().lastIndexOf(".")); Mockito.doThrow(new IOException("Processing failed for file 3")).when(discovery) - .processFile(Mockito.argThat(path -> { - String pathName = path.getName(); - String withoutExtension = pathName.substring(0, pathName.lastIndexOf(".")); - String prefix = withoutExtension.substring(0, withoutExtension.lastIndexOf("_")); - return prefix.equals(file3Prefix); - })); + .processFile(Mockito.argThat(path -> extractPrefix(path.getName()).equals(file3Prefix))); // Process new files for the round discovery.processNewFilesForRound(replicationRound); @@ -686,12 +662,8 @@ public void testProcessNewFilesForRoundWithPartialFailure() throws IOException { for (Path expectedFile : newFilesForRound) { String expectedPrefix = expectedFile.getName().substring(0, expectedFile.getName().lastIndexOf(".")); - Mockito.verify(discovery, Mockito.times(1)).processFile(Mockito.argThat(path -> { - String pathName = path.getName(); - String withoutExtension = pathName.substring(0, pathName.lastIndexOf(".")); - String prefix = withoutExtension.substring(0, withoutExtension.lastIndexOf("_")); - return prefix.equals(expectedPrefix); - })); + Mockito.verify(discovery, Mockito.times(1)) + .processFile(Mockito.argThat(path -> 
extractPrefix(path.getName()).equals(expectedPrefix))); } // Verify that markCompleted was called for each successfully processed file @@ -700,80 +672,40 @@ public void testProcessNewFilesForRoundWithPartialFailure() throws IOException { // Verify that markCompleted was called for each successfully processed file with correct paths String expectedPrefix0 = newFilesForRound.get(0).getName().substring(0, newFilesForRound.get(0).getName().lastIndexOf(".")); - Mockito.verify(fileTracker, Mockito.times(1)).markCompleted(Mockito.argThat(path -> { - String pathName = path.getName(); - String withoutExtension = pathName.substring(0, pathName.lastIndexOf(".")); - String prefix = withoutExtension.substring(0, withoutExtension.lastIndexOf("_")); - return prefix.equals(expectedPrefix0); - })); + Mockito.verify(fileTracker, Mockito.times(1)).markCompleted( + Mockito.argThat(path -> extractPrefix(path.getName()).equals(expectedPrefix0))); String expectedPrefix2 = newFilesForRound.get(2).getName().substring(0, newFilesForRound.get(2).getName().lastIndexOf(".")); - Mockito.verify(fileTracker, Mockito.times(1)).markCompleted(Mockito.argThat(path -> { - String pathName = path.getName(); - String withoutExtension = pathName.substring(0, pathName.lastIndexOf(".")); - String prefix = withoutExtension.substring(0, withoutExtension.lastIndexOf("_")); - return prefix.equals(expectedPrefix2); - })); + Mockito.verify(fileTracker, Mockito.times(1)).markCompleted( + Mockito.argThat(path -> extractPrefix(path.getName()).equals(expectedPrefix2))); String expectedPrefix4 = newFilesForRound.get(4).getName().substring(0, newFilesForRound.get(4).getName().lastIndexOf(".")); - Mockito.verify(fileTracker, Mockito.times(1)).markCompleted(Mockito.argThat(path -> { - String pathName = path.getName(); - String withoutExtension = pathName.substring(0, pathName.lastIndexOf(".")); - String prefix = withoutExtension.substring(0, withoutExtension.lastIndexOf("_")); - return prefix.equals(expectedPrefix4); - 
})); + Mockito.verify(fileTracker, Mockito.times(1)).markCompleted( + Mockito.argThat(path -> extractPrefix(path.getName()).equals(expectedPrefix4))); // Verify that markCompleted was NOT called for failed files String unexpectedPrefix1 = newFilesForRound.get(1).getName().substring(0, newFilesForRound.get(1).getName().lastIndexOf(".")); - Mockito.verify(fileTracker, Mockito.never()).markCompleted(Mockito.argThat(path -> { - String pathName = path.getName(); - String withoutExtension = pathName.substring(0, pathName.lastIndexOf(".")); - String prefix = withoutExtension.substring(0, withoutExtension.lastIndexOf("_")); - return prefix.equals(unexpectedPrefix1); - })); + Mockito.verify(fileTracker, Mockito.never()).markCompleted( + Mockito.argThat(path -> extractPrefix(path.getName()).equals(unexpectedPrefix1))); String unexpectedPrefix3 = newFilesForRound.get(3).getName().substring(0, newFilesForRound.get(3).getName().lastIndexOf(".")); - Mockito.verify(fileTracker, Mockito.never()).markCompleted(Mockito.argThat(path -> { - String pathName = path.getName(); - String withoutExtension = pathName.substring(0, pathName.lastIndexOf(".")); - String prefix = withoutExtension.substring(0, withoutExtension.lastIndexOf("_")); - return prefix.equals(unexpectedPrefix3); - })); + Mockito.verify(fileTracker, Mockito.never()).markCompleted( + Mockito.argThat(path -> extractPrefix(path.getName()).equals(unexpectedPrefix3))); // Verify that markFailed was called for failed files - Mockito.verify(fileTracker, Mockito.times(1)).markFailed(Mockito.argThat(path -> { - String pathName = path.getName(); - String withoutExtension = pathName.substring(0, pathName.lastIndexOf(".")); - String prefix = withoutExtension.substring(0, withoutExtension.lastIndexOf("_")); - return prefix.equals(unexpectedPrefix1); - })); - Mockito.verify(fileTracker, Mockito.times(1)).markFailed(Mockito.argThat(path -> { - String pathName = path.getName(); - String withoutExtension = pathName.substring(0, 
pathName.lastIndexOf(".")); - String prefix = withoutExtension.substring(0, withoutExtension.lastIndexOf("_")); - return prefix.equals(unexpectedPrefix3); - })); + Mockito.verify(fileTracker, Mockito.times(1)) + .markFailed(Mockito.argThat(path -> extractPrefix(path.getName()).equals(unexpectedPrefix1))); + Mockito.verify(fileTracker, Mockito.times(1)) + .markFailed(Mockito.argThat(path -> extractPrefix(path.getName()).equals(unexpectedPrefix3))); // Verify that markFailed was NOT called for successfully processed files - Mockito.verify(fileTracker, Mockito.never()).markFailed(Mockito.argThat(path -> { - String pathName = path.getName(); - String withoutExtension = pathName.substring(0, pathName.lastIndexOf(".")); - String prefix = withoutExtension.substring(0, withoutExtension.lastIndexOf("_")); - return prefix.equals(expectedPrefix0); - })); - Mockito.verify(fileTracker, Mockito.never()).markFailed(Mockito.argThat(path -> { - String pathName = path.getName(); - String withoutExtension = pathName.substring(0, pathName.lastIndexOf(".")); - String prefix = withoutExtension.substring(0, withoutExtension.lastIndexOf("_")); - return prefix.equals(expectedPrefix2); - })); - Mockito.verify(fileTracker, Mockito.never()).markFailed(Mockito.argThat(path -> { - String pathName = path.getName(); - String withoutExtension = pathName.substring(0, pathName.lastIndexOf(".")); - String prefix = withoutExtension.substring(0, withoutExtension.lastIndexOf("_")); - return prefix.equals(expectedPrefix4); - })); + Mockito.verify(fileTracker, Mockito.never()) + .markFailed(Mockito.argThat(path -> extractPrefix(path.getName()).equals(expectedPrefix0))); + Mockito.verify(fileTracker, Mockito.never()) + .markFailed(Mockito.argThat(path -> extractPrefix(path.getName()).equals(expectedPrefix2))); + Mockito.verify(fileTracker, Mockito.never()) + .markFailed(Mockito.argThat(path -> extractPrefix(path.getName()).equals(expectedPrefix4))); } /** @@ -793,12 +725,8 @@ public void 
testProcessNewFilesForRoundWithAllFailures() throws IOException { for (Path file : newFilesForRound) { String filePrefix = file.getName().substring(0, file.getName().lastIndexOf(".")); Mockito.doThrow(new IOException("Processing failed for file: " + file.getName())) - .when(discovery).processFile(Mockito.argThat(path -> { - String pathName = path.getName(); - String withoutExtension = pathName.substring(0, pathName.lastIndexOf(".")); - String prefix = withoutExtension.substring(0, withoutExtension.lastIndexOf("_")); - return prefix.equals(filePrefix); - })); + .when(discovery) + .processFile(Mockito.argThat(path -> extractPrefix(path.getName()).equals(filePrefix))); } // Process new files for the round @@ -820,12 +748,8 @@ public void testProcessNewFilesForRoundWithAllFailures() throws IOException { for (Path expectedFile : newFilesForRound) { String expectedPrefix = expectedFile.getName().substring(0, expectedFile.getName().lastIndexOf(".")); - Mockito.verify(discovery, Mockito.times(1)).processFile(Mockito.argThat(path -> { - String pathName = path.getName(); - String withoutExtension = pathName.substring(0, pathName.lastIndexOf(".")); - String prefix = withoutExtension.substring(0, withoutExtension.lastIndexOf("_")); - return prefix.equals(expectedPrefix); - })); + Mockito.verify(discovery, Mockito.times(1)) + .processFile(Mockito.argThat(path -> extractPrefix(path.getName()).equals(expectedPrefix))); } // Verify that markCompleted was NOT called for any file (all failed) @@ -838,12 +762,8 @@ public void testProcessNewFilesForRoundWithAllFailures() throws IOException { for (Path failedFile : newFilesForRound) { String expectedPrefix = failedFile.getName().substring(0, failedFile.getName().lastIndexOf(".")); - Mockito.verify(fileTracker, Mockito.times(1)).markFailed(Mockito.argThat(path -> { - String pathName = path.getName(); - String withoutExtension = pathName.substring(0, pathName.lastIndexOf(".")); - String prefix = withoutExtension.substring(0, 
withoutExtension.lastIndexOf("_")); - return prefix.equals(expectedPrefix); - })); + Mockito.verify(fileTracker, Mockito.times(1)) + .markFailed(Mockito.argThat(path -> extractPrefix(path.getName()).equals(expectedPrefix))); } } @@ -853,140 +773,143 @@ public void testProcessNewFilesForRoundWithAllFailures() throws IOException { */ @Test public void testProcessInProgressDirectory() throws IOException { - // 1. Create in-progress files for different timestamps - long timestamp0004 = 1704153600000L + (4 * 1000L); // 00:00:04 - List inProgressFiles0004 = createInProgressFiles(timestamp0004, 3); + // Inject an advancing clock so we can verify rename timestamps are recent + long initialTime = 1704153800000L; + long originalRenameTimestamp = 1704153604000L; + AtomicLong clock = new AtomicLong(initialTime); + EnvironmentEdge edge = clock::getAndIncrement; + EnvironmentEdgeManager.injectEdge(edge); - long timestamp0102 = 1704153660000L + (2 * 1000L); // 00:01:02 - List inProgressFiles0102 = createInProgressFiles(timestamp0102, 2); + try { + // 1. Create in-progress files for different timestamps with old rename timestamps + long timestamp0004 = 1704153600000L + (4 * 1000L); // 00:00:04 + List inProgressFiles0004 = + createInProgressFiles(timestamp0004, 3, originalRenameTimestamp); - long timestamp0206 = 1704153720000L + (6 * 1000L); // 00:02:06 - List inProgressFiles0206 = createInProgressFiles(timestamp0206, 2); + long timestamp0102 = 1704153660000L + (2 * 1000L); // 00:01:02 + List inProgressFiles0102 = + createInProgressFiles(timestamp0102, 2, originalRenameTimestamp); - // 2. 
Create some new files to ensure they are NOT processed - ReplicationRound replicationRound = new ReplicationRound(1704153600000L, 1704153660000L); // 00:00:00 - // - - // 00:01:00 - List newFilesForRound = createNewFilesForRound(replicationRound, 3); + long timestamp0206 = 1704153720000L + (6 * 1000L); // 00:02:06 + List inProgressFiles0206 = + createInProgressFiles(timestamp0206, 2, originalRenameTimestamp); - // Process in-progress directory - discovery.processInProgressDirectory(); + // 2. Create some new files to ensure they are NOT processed + ReplicationRound replicationRound = new ReplicationRound(1704153600000L, 1704153660000L); // 00:00:00 + // - + // 00:01:00 + List newFilesForRound = createNewFilesForRound(replicationRound, 3); - // 3. Ensure all in-progress files (7 total) are processed - List processedFiles = discovery.getProcessedFiles(); - assertEquals("Invalid number of files processed", 7, processedFiles.size()); + // Process in-progress directory + discovery.processInProgressDirectory(); - // Create set of expected files that should be processed (by prefix, since UUIDs are updated - // during markInProgress) - Set expectedProcessedFilePrefixes = new HashSet<>(); - for (Path file : inProgressFiles0004) { - String fileName = file.getName(); - String prefix = fileName.substring(0, fileName.lastIndexOf("_")); - expectedProcessedFilePrefixes.add(prefix); - } - for (Path file : inProgressFiles0102) { - String fileName = file.getName(); - String prefix = fileName.substring(0, fileName.lastIndexOf("_")); - expectedProcessedFilePrefixes.add(prefix); - } - for (Path file : inProgressFiles0206) { - String fileName = file.getName(); - String prefix = fileName.substring(0, fileName.lastIndexOf("_")); - expectedProcessedFilePrefixes.add(prefix); - } + // 3. 
Ensure all in-progress files (7 total) are processed + List processedFiles = discovery.getProcessedFiles(); + assertEquals("Invalid number of files processed", 7, processedFiles.size()); - // Create set of actually processed file paths (extract prefixes) - Set actualProcessedFilePrefixes = new HashSet<>(); - for (Path file : processedFiles) { - String fileName = file.getName(); - String withoutExtension = fileName.substring(0, fileName.lastIndexOf(".")); - String prefix = withoutExtension.substring(0, withoutExtension.lastIndexOf("_")); - actualProcessedFilePrefixes.add(prefix); - } + // Create set of expected files that should be processed (by prefix, since UUIDs are updated + // during markInProgress) + Set expectedProcessedFilePrefixes = new HashSet<>(); + for (Path file : inProgressFiles0004) { + String fileName = file.getName(); + String prefix = extractPrefix(fileName); + expectedProcessedFilePrefixes.add(prefix); + } + for (Path file : inProgressFiles0102) { + String fileName = file.getName(); + String prefix = extractPrefix(fileName); + expectedProcessedFilePrefixes.add(prefix); + } + for (Path file : inProgressFiles0206) { + String fileName = file.getName(); + String prefix = extractPrefix(fileName); + expectedProcessedFilePrefixes.add(prefix); + } - // Validate that sets are equal - assertEquals("Expected and actual processed files should match", expectedProcessedFilePrefixes, - actualProcessedFilePrefixes); + // Create set of actually processed file paths (extract prefixes) + Set actualProcessedFilePrefixes = new HashSet<>(); + for (Path file : processedFiles) { + String fileName = file.getName(); + String prefix = extractPrefix(fileName); + actualProcessedFilePrefixes.add(prefix); + } - // Verify that markInProgress was called 7 times - Mockito.verify(fileTracker, Mockito.times(7)).markInProgress(Mockito.any(Path.class)); + // Validate that sets are equal + assertEquals("Expected and actual processed files should match", + expectedProcessedFilePrefixes, 
actualProcessedFilePrefixes); - // Verify that markInProgress was called for each expected file - for (Path expectedFile : inProgressFiles0004) { - String expectedPrefix = - expectedFile.getName().substring(0, expectedFile.getName().lastIndexOf("_")); - Mockito.verify(fileTracker, Mockito.times(1)).markInProgress(Mockito.argThat(path -> { - String pathName = path.getName(); - String prefix = pathName.substring(0, pathName.lastIndexOf("_")); - return prefix.equals(expectedPrefix); - })); - } - for (Path expectedFile : inProgressFiles0102) { - String expectedPrefix = - expectedFile.getName().substring(0, expectedFile.getName().lastIndexOf("_")); - Mockito.verify(fileTracker, Mockito.times(1)).markInProgress(Mockito.argThat(path -> { - String pathName = path.getName(); - String prefix = pathName.substring(0, pathName.lastIndexOf("_")); - return prefix.equals(expectedPrefix); - })); - } - for (Path expectedFile : inProgressFiles0206) { - String expectedPrefix = - expectedFile.getName().substring(0, expectedFile.getName().lastIndexOf("_")); - Mockito.verify(fileTracker, Mockito.times(1)).markInProgress(Mockito.argThat(path -> { - String pathName = path.getName(); - String prefix = pathName.substring(0, pathName.lastIndexOf("_")); - return prefix.equals(expectedPrefix); - })); - } + // Verify that markInProgress was called 7 times + Mockito.verify(fileTracker, Mockito.times(7)).markInProgress(Mockito.any(Path.class)); - // Verify that markCompleted was called for each processed file - Mockito.verify(fileTracker, Mockito.times(7)).markCompleted(Mockito.any(Path.class)); + // Verify that markInProgress was called for each expected file + for (Path expectedFile : inProgressFiles0004) { + String expectedPrefix = extractPrefix(expectedFile.getName()); + Mockito.verify(fileTracker, Mockito.times(1)).markInProgress( + Mockito.argThat(path -> extractPrefix(path.getName()).equals(expectedPrefix))); + } + for (Path expectedFile : inProgressFiles0102) { + String expectedPrefix = 
extractPrefix(expectedFile.getName()); + Mockito.verify(fileTracker, Mockito.times(1)).markInProgress( + Mockito.argThat(path -> extractPrefix(path.getName()).equals(expectedPrefix))); + } + for (Path expectedFile : inProgressFiles0206) { + String expectedPrefix = extractPrefix(expectedFile.getName()); + Mockito.verify(fileTracker, Mockito.times(1)).markInProgress( + Mockito.argThat(path -> extractPrefix(path.getName()).equals(expectedPrefix))); + } - // Verify that markCompleted was called for each processed file with correct paths - for (Path expectedFile : inProgressFiles0004) { - String expectedPrefix = - expectedFile.getName().substring(0, expectedFile.getName().lastIndexOf("_")); - Mockito.verify(fileTracker, Mockito.times(1)).markCompleted(Mockito.argThat(path -> { - String pathName = path.getName(); - String withoutExtension = pathName.substring(0, pathName.lastIndexOf(".")); - String prefix = withoutExtension.substring(0, withoutExtension.lastIndexOf("_")); - return prefix.equals(expectedPrefix); - })); - } - for (Path expectedFile : inProgressFiles0102) { - String expectedPrefix = - expectedFile.getName().substring(0, expectedFile.getName().lastIndexOf("_")); - Mockito.verify(fileTracker, Mockito.times(1)).markCompleted(Mockito.argThat(path -> { - String pathName = path.getName(); - String withoutExtension = pathName.substring(0, pathName.lastIndexOf(".")); - String prefix = withoutExtension.substring(0, withoutExtension.lastIndexOf("_")); - return prefix.equals(expectedPrefix); - })); - } - for (Path expectedFile : inProgressFiles0206) { - String expectedPrefix = - expectedFile.getName().substring(0, expectedFile.getName().lastIndexOf("_")); - Mockito.verify(fileTracker, Mockito.times(1)).markCompleted(Mockito.argThat(path -> { - String pathName = path.getName(); - String withoutExtension = pathName.substring(0, pathName.lastIndexOf(".")); - String prefix = withoutExtension.substring(0, withoutExtension.lastIndexOf("_")); - return 
prefix.equals(expectedPrefix); - })); - } + // Verify that markCompleted was called for each processed file + Mockito.verify(fileTracker, Mockito.times(7)).markCompleted(Mockito.any(Path.class)); - // Validate that new files were NOT processed (processInProgressDirectory only processes - // in-progress files) - for (Path unexpectedFile : newFilesForRound) { - String unexpectedPrefix = - unexpectedFile.getName().substring(0, unexpectedFile.getName().lastIndexOf(".")); - assertFalse("Should NOT have processed new file: " + unexpectedFile.getName(), - actualProcessedFilePrefixes.contains(unexpectedPrefix)); + // Verify that markCompleted was called for each processed file with correct paths + for (Path expectedFile : inProgressFiles0004) { + String expectedPrefix = extractPrefix(expectedFile.getName()); + Mockito.verify(fileTracker, Mockito.times(1)).markCompleted( + Mockito.argThat(path -> extractPrefix(path.getName()).equals(expectedPrefix))); + } + for (Path expectedFile : inProgressFiles0102) { + String expectedPrefix = extractPrefix(expectedFile.getName()); + Mockito.verify(fileTracker, Mockito.times(1)).markCompleted( + Mockito.argThat(path -> extractPrefix(path.getName()).equals(expectedPrefix))); + } + for (Path expectedFile : inProgressFiles0206) { + String expectedPrefix = extractPrefix(expectedFile.getName()); + Mockito.verify(fileTracker, Mockito.times(1)).markCompleted( + Mockito.argThat(path -> extractPrefix(path.getName()).equals(expectedPrefix))); + } + + // Validate that new files were NOT processed (processInProgressDirectory only processes + // in-progress files) + for (Path unexpectedFile : newFilesForRound) { + String unexpectedPrefix = + unexpectedFile.getName().substring(0, unexpectedFile.getName().lastIndexOf(".")); + assertFalse("Should NOT have processed new file: " + unexpectedFile.getName(), + actualProcessedFilePrefixes.contains(unexpectedPrefix)); + } + + // Verify that all processed files had rename timestamps updated to recent values + 
// (not the original old rename timestamp) + for (Path processedFile : processedFiles) { + Optional renameTs = fileTracker.getRenameTimestamp(processedFile); + assertTrue("Processed file should have a rename timestamp: " + processedFile.getName(), + renameTs.isPresent()); + assertTrue("Processed file's rename timestamp should be recent (>= initialTime), but was " + + renameTs.get() + " for " + processedFile.getName(), renameTs.get() >= initialTime); + assertTrue( + "Processed file's rename timestamp should be newer than the original (" + + originalRenameTimestamp + "), but was " + renameTs.get(), + renameTs.get() > originalRenameTimestamp); + } + } finally { + EnvironmentEdgeManager.reset(); } } @Test public void testProcessInProgressDirectoryWithIntermittentFailure() throws IOException { + // Allow 2 retries so files that fail once can succeed on retry + conf.setInt(ReplicationLogDiscovery.REPLICATION_IN_PROGRESS_FILE_MAX_RETRIES_KEY, 2); + // Create in-progress files for different timestamps long timestamp0004 = 1704153600000L + (4 * 1000L); // 00:00:04 List inProgressFiles0004 = createInProgressFiles(timestamp0004, 3); @@ -1001,24 +924,14 @@ public void testProcessInProgressDirectoryWithIntermittentFailure() throws IOExc // Mock processFile to throw exception for specific files (files 1 and 3) only on first call, // succeed on retry - String file1Prefix = allInProgressFiles.get(1).getName().substring(0, - allInProgressFiles.get(1).getName().lastIndexOf("_")); + String file1Prefix = extractPrefix(allInProgressFiles.get(1).getName()); Mockito.doThrow(new IOException("Processing failed for file 1")).doCallRealMethod() - .when(discovery).processFile(Mockito.argThat(path -> { - String pathName = path.getName(); - String withoutExtension = pathName.substring(0, pathName.lastIndexOf(".")); - String prefix = withoutExtension.substring(0, withoutExtension.lastIndexOf("_")); - return prefix.equals(file1Prefix); - })); - String file3Prefix = 
allInProgressFiles.get(3).getName().substring(0, - allInProgressFiles.get(3).getName().lastIndexOf("_")); + .when(discovery) + .processFile(Mockito.argThat(path -> extractPrefix(path.getName()).equals(file1Prefix))); + String file3Prefix = extractPrefix(allInProgressFiles.get(3).getName()); Mockito.doThrow(new IOException("Processing failed for file 3")).doCallRealMethod() - .when(discovery).processFile(Mockito.argThat(path -> { - String pathName = path.getName(); - String withoutExtension = pathName.substring(0, pathName.lastIndexOf(".")); - String prefix = withoutExtension.substring(0, withoutExtension.lastIndexOf("_")); - return prefix.equals(file3Prefix); - })); + .when(discovery) + .processFile(Mockito.argThat(path -> extractPrefix(path.getName()).equals(file3Prefix))); // Process in-progress directory discovery.processInProgressDirectory(); @@ -1030,15 +943,10 @@ public void testProcessInProgressDirectoryWithIntermittentFailure() throws IOExc // Files 1 and 3 are called twice (initial attempt + retry), others once for (int i = 0; i < allInProgressFiles.size(); i++) { Path expectedFile = allInProgressFiles.get(i); - String expectedPrefix = - expectedFile.getName().substring(0, expectedFile.getName().lastIndexOf("_")); + String expectedPrefix = extractPrefix(expectedFile.getName()); int expectedTimes = (i == 1 || i == 3) ? 2 : 1; // Files 1 and 3 are retried - Mockito.verify(fileTracker, Mockito.times(expectedTimes)) - .markInProgress(Mockito.argThat(path -> { - String pathName = path.getName(); - String prefix = pathName.substring(0, pathName.lastIndexOf("_")); - return prefix.equals(expectedPrefix); - })); + Mockito.verify(fileTracker, Mockito.times(expectedTimes)).markInProgress( + Mockito.argThat(path -> extractPrefix(path.getName()).equals(expectedPrefix))); } // Verify that processFile was called for each file in the directory (i.e. 
5 + 2 times for @@ -1049,16 +957,11 @@ public void testProcessInProgressDirectoryWithIntermittentFailure() throws IOExc // Files 1 and 3 should be called twice (fail once, succeed on retry), others once for (int i = 0; i < allInProgressFiles.size(); i++) { Path expectedFile = allInProgressFiles.get(i); - String expectedPrefix = - expectedFile.getName().substring(0, expectedFile.getName().lastIndexOf("_")); + String expectedPrefix = extractPrefix(expectedFile.getName()); int expectedTimes = (i == 1 || i == 3) ? 2 : 1; // Files 1 and 3 are called twice (fail + // retry success) - Mockito.verify(discovery, Mockito.times(expectedTimes)).processFile(Mockito.argThat(path -> { - String pathName = path.getName(); - String withoutExtension = pathName.substring(0, pathName.lastIndexOf(".")); - String prefix = withoutExtension.substring(0, withoutExtension.lastIndexOf("_")); - return prefix.equals(expectedPrefix); - })); + Mockito.verify(discovery, Mockito.times(expectedTimes)) + .processFile(Mockito.argThat(path -> extractPrefix(path.getName()).equals(expectedPrefix))); } // Verify that markCompleted was called for each successfully processed file @@ -1068,80 +971,272 @@ public void testProcessInProgressDirectoryWithIntermittentFailure() throws IOExc Mockito.verify(fileTracker, Mockito.times(2)).markFailed(Mockito.any(Path.class)); // Verify that markFailed was called once ONLY for failed files - String failedPrefix1 = allInProgressFiles.get(1).getName().substring(0, - allInProgressFiles.get(1).getName().lastIndexOf("_")); - Mockito.verify(fileTracker, Mockito.times(1)).markFailed(Mockito.argThat(path -> { - String pathName = path.getName(); - String withoutExtension = pathName.substring(0, pathName.lastIndexOf(".")); - String prefix = withoutExtension.substring(0, withoutExtension.lastIndexOf("_")); - return prefix.equals(failedPrefix1); - })); - String failedPrefix3 = allInProgressFiles.get(3).getName().substring(0, - allInProgressFiles.get(3).getName().lastIndexOf("_")); 
- Mockito.verify(fileTracker, Mockito.times(1)).markFailed(Mockito.argThat(path -> { - String pathName = path.getName(); - String withoutExtension = pathName.substring(0, pathName.lastIndexOf(".")); - String prefix = withoutExtension.substring(0, withoutExtension.lastIndexOf("_")); - return prefix.equals(failedPrefix3); - })); + String failedPrefix1 = extractPrefix(allInProgressFiles.get(1).getName()); + Mockito.verify(fileTracker, Mockito.times(1)) + .markFailed(Mockito.argThat(path -> extractPrefix(path.getName()).equals(failedPrefix1))); + String failedPrefix3 = extractPrefix(allInProgressFiles.get(3).getName()); + Mockito.verify(fileTracker, Mockito.times(1)) + .markFailed(Mockito.argThat(path -> extractPrefix(path.getName()).equals(failedPrefix3))); // Verify that markFailed was NOT called for files processed successfully in first iteration - String successPrefix0 = allInProgressFiles.get(0).getName().substring(0, - allInProgressFiles.get(0).getName().lastIndexOf("_")); - Mockito.verify(fileTracker, Mockito.never()).markFailed(Mockito.argThat(path -> { - String pathName = path.getName(); - String withoutExtension = pathName.substring(0, pathName.lastIndexOf(".")); - String prefix = withoutExtension.substring(0, withoutExtension.lastIndexOf("_")); - return prefix.equals(successPrefix0); - })); - String successPrefix2 = allInProgressFiles.get(2).getName().substring(0, - allInProgressFiles.get(2).getName().lastIndexOf("_")); - Mockito.verify(fileTracker, Mockito.never()).markFailed(Mockito.argThat(path -> { - String pathName = path.getName(); - String withoutExtension = pathName.substring(0, pathName.lastIndexOf(".")); - String prefix = withoutExtension.substring(0, withoutExtension.lastIndexOf("_")); - return prefix.equals(successPrefix2); - })); - String successPrefix4 = allInProgressFiles.get(4).getName().substring(0, - allInProgressFiles.get(4).getName().lastIndexOf("_")); - Mockito.verify(fileTracker, Mockito.never()).markFailed(Mockito.argThat(path -> { - 
String pathName = path.getName(); - String withoutExtension = pathName.substring(0, pathName.lastIndexOf(".")); - String prefix = withoutExtension.substring(0, withoutExtension.lastIndexOf("_")); - return prefix.equals(successPrefix4); - })); + String successPrefix0 = extractPrefix(allInProgressFiles.get(0).getName()); + Mockito.verify(fileTracker, Mockito.never()) + .markFailed(Mockito.argThat(path -> extractPrefix(path.getName()).equals(successPrefix0))); + String successPrefix2 = extractPrefix(allInProgressFiles.get(2).getName()); + Mockito.verify(fileTracker, Mockito.never()) + .markFailed(Mockito.argThat(path -> extractPrefix(path.getName()).equals(successPrefix2))); + String successPrefix4 = extractPrefix(allInProgressFiles.get(4).getName()); + Mockito.verify(fileTracker, Mockito.never()) + .markFailed(Mockito.argThat(path -> extractPrefix(path.getName()).equals(successPrefix4))); // Verify that markCompleted was called for each successfully processed file with correct paths for (Path expectedFile : allInProgressFiles) { - String expectedPrefix = - expectedFile.getName().substring(0, expectedFile.getName().lastIndexOf("_")); - Mockito.verify(fileTracker, Mockito.times(1)).markCompleted(Mockito.argThat(path -> { - String pathName = path.getName(); - String withoutExtension = pathName.substring(0, pathName.lastIndexOf(".")); - String prefix = withoutExtension.substring(0, withoutExtension.lastIndexOf("_")); - return prefix.equals(expectedPrefix); + String expectedPrefix = extractPrefix(expectedFile.getName()); + Mockito.verify(fileTracker, Mockito.times(1)).markCompleted( + Mockito.argThat(path -> extractPrefix(path.getName()).equals(expectedPrefix))); + } + } + + /** + * Tests that with default maxRetries=1, files that fail processing are skipped for the rest of + * the round and not retried. Verifies that successfully processed files are completed, while + * failed files are only marked as failed without retry. 
+ */ + @Test + public void testProcessInProgressDirectorySkipsFailedFiles() throws IOException { + // Use default maxRetries=1 (no explicit config override) + // Create in-progress files + long timestamp0004 = 1704153600000L + (4 * 1000L); // 00:00:04 + List inProgressFiles = createInProgressFiles(timestamp0004, 3); + + // Mock processFile to always throw for file 1 (persistent failure) + String file1Prefix = extractPrefix(inProgressFiles.get(1).getName()); + Mockito.doThrow(new IOException("Persistent failure for file 1")).when(discovery) + .processFile(Mockito.argThat(path -> extractPrefix(path.getName()).equals(file1Prefix))); + + // Process in-progress directory + discovery.processInProgressDirectory(); + + // With maxRetries=1, file 1 should fail once and be skipped (no retry) + // Files 0 and 2 should succeed + // Total markInProgress calls: 3 (one per file, no retries) + Mockito.verify(fileTracker, Mockito.times(3)).markInProgress(Mockito.any(Path.class)); + + // processFile called 3 times (all files attempted once) + Mockito.verify(discovery, Mockito.times(3)).processFile(Mockito.any(Path.class)); + + // markCompleted called only for the 2 successful files + Mockito.verify(fileTracker, Mockito.times(2)).markCompleted(Mockito.any(Path.class)); + + // markFailed called once for the failed file + Mockito.verify(fileTracker, Mockito.times(1)).markFailed(Mockito.any(Path.class)); + + // Verify the failed file was NOT retried (processFile called exactly once for file1) + Mockito.verify(discovery, Mockito.times(1)) + .processFile(Mockito.argThat(path -> extractPrefix(path.getName()).equals(file1Prefix))); + + // Verify the successful files were each called exactly once + String file0Prefix = extractPrefix(inProgressFiles.get(0).getName()); + Mockito.verify(discovery, Mockito.times(1)) + .processFile(Mockito.argThat(path -> extractPrefix(path.getName()).equals(file0Prefix))); + String file2Prefix = extractPrefix(inProgressFiles.get(2).getName()); + 
Mockito.verify(discovery, Mockito.times(1)) + .processFile(Mockito.argThat(path -> extractPrefix(path.getName()).equals(file2Prefix))); + } + + /** + * Tests that when all in-progress files fail processing, the loop terminates without retrying any + * file (default maxRetries=1). Verifies no infinite loop occurs and all files are marked as + * failed. + */ + @Test + public void testProcessInProgressDirectoryAllFilesFail() throws IOException { + // Use default maxRetries=1 + // Create in-progress files + long timestamp0004 = 1704153600000L + (4 * 1000L); // 00:00:04 + List inProgressFiles = createInProgressFiles(timestamp0004, 3); + + String file0Prefix = extractPrefix(inProgressFiles.get(0).getName()); + String file1Prefix = extractPrefix(inProgressFiles.get(1).getName()); + String file2Prefix = extractPrefix(inProgressFiles.get(2).getName()); + + // Mock processFile to always throw for all files + Mockito.doThrow(new IOException("Persistent failure")).when(discovery) + .processFile(Mockito.any(Path.class)); + + // Process in-progress directory - should terminate without infinite loop + discovery.processInProgressDirectory(); + + // Each file attempted exactly once + Mockito.verify(fileTracker, Mockito.times(3)).markInProgress(Mockito.any(Path.class)); + Mockito.verify(discovery, Mockito.times(3)).processFile(Mockito.any(Path.class)); + + // Verify per-prefix: each file attempted once + Mockito.verify(discovery, Mockito.times(1)) + .processFile(Mockito.argThat(path -> extractPrefix(path.getName()).equals(file0Prefix))); + Mockito.verify(discovery, Mockito.times(1)) + .processFile(Mockito.argThat(path -> extractPrefix(path.getName()).equals(file1Prefix))); + Mockito.verify(discovery, Mockito.times(1)) + .processFile(Mockito.argThat(path -> extractPrefix(path.getName()).equals(file2Prefix))); + + // All files marked as failed + Mockito.verify(fileTracker, Mockito.times(3)).markFailed(Mockito.any(Path.class)); + Mockito.verify(fileTracker, Mockito.times(1)) + 
.markFailed(Mockito.argThat(path -> extractPrefix(path.getName()).equals(file0Prefix))); + Mockito.verify(fileTracker, Mockito.times(1)) + .markFailed(Mockito.argThat(path -> extractPrefix(path.getName()).equals(file1Prefix))); + Mockito.verify(fileTracker, Mockito.times(1)) + .markFailed(Mockito.argThat(path -> extractPrefix(path.getName()).equals(file2Prefix))); + + // No file completed + Mockito.verify(fileTracker, Mockito.never()).markCompleted(Mockito.any(Path.class)); + } + + /** + * Tests that the failure tracking is scoped per invocation of processInProgressDirectory. Files + * that fail in the first invocation are skipped for that round, but get a fresh chance in the + * second invocation where they succeed. + */ + @Test + public void testProcessInProgressDirectoryFailedFilesSucceedOnNextRound() throws IOException { + // Use default maxRetries=1 + // Inject an advancing clock so we can verify rename timestamps + long initialTime = 1704153800000L; + long originalRenameTimestamp = 1704153604000L; + AtomicLong clock = new AtomicLong(initialTime); + EnvironmentEdge edge = clock::getAndIncrement; + EnvironmentEdgeManager.injectEdge(edge); + + try { + // Create in-progress files with old rename timestamps + long timestamp0004 = 1704153600000L + (4 * 1000L); // 00:00:04 + List inProgressFiles = createInProgressFiles(timestamp0004, 3, originalRenameTimestamp); + + // Extract prefixes for verification + String file0Prefix = extractPrefix(inProgressFiles.get(0).getName()); + String file1Prefix = extractPrefix(inProgressFiles.get(1).getName()); + String file2Prefix = extractPrefix(inProgressFiles.get(2).getName()); + + // Verify original rename timestamp + for (Path file : inProgressFiles) { + Optional ts = fileTracker.getRenameTimestamp(file); + assertTrue("Original file should have rename timestamp", ts.isPresent()); + assertEquals("Original rename timestamp should match creation value", + originalRenameTimestamp, ts.get().longValue()); + } + + // Mock processFile 
to fail for file 1 only on the first call, succeed on subsequent calls + Mockito.doThrow(new IOException("Transient failure for file 1")).doCallRealMethod() + .when(discovery) + .processFile(Mockito.argThat(path -> extractPrefix(path.getName()).equals(file1Prefix))); + + // --- First invocation: file 1 fails, files 0 and 2 succeed --- + discovery.processInProgressDirectory(); + + // Verify markInProgress called once per file (3 total, no retries) + Mockito.verify(fileTracker, Mockito.times(1)) + .markInProgress(Mockito.argThat(path -> extractPrefix(path.getName()).equals(file0Prefix))); + Mockito.verify(fileTracker, Mockito.times(1)) + .markInProgress(Mockito.argThat(path -> extractPrefix(path.getName()).equals(file1Prefix))); + Mockito.verify(fileTracker, Mockito.times(1)) + .markInProgress(Mockito.argThat(path -> extractPrefix(path.getName()).equals(file2Prefix))); + + // Verify processFile called once per file + Mockito.verify(discovery, Mockito.times(1)) + .processFile(Mockito.argThat(path -> extractPrefix(path.getName()).equals(file0Prefix))); + Mockito.verify(discovery, Mockito.times(1)) + .processFile(Mockito.argThat(path -> extractPrefix(path.getName()).equals(file1Prefix))); + Mockito.verify(discovery, Mockito.times(1)) + .processFile(Mockito.argThat(path -> extractPrefix(path.getName()).equals(file2Prefix))); + + // Verify markCompleted called for files 0 and 2 (successful) + Mockito.verify(fileTracker, Mockito.times(1)) + .markCompleted(Mockito.argThat(path -> extractPrefix(path.getName()).equals(file0Prefix))); + Mockito.verify(fileTracker, Mockito.times(1)) + .markCompleted(Mockito.argThat(path -> extractPrefix(path.getName()).equals(file2Prefix))); + + // Verify markFailed called only for file 1 + Mockito.verify(fileTracker, Mockito.times(1)) + .markFailed(Mockito.argThat(path -> extractPrefix(path.getName()).equals(file1Prefix))); + Mockito.verify(fileTracker, Mockito.never()) + .markFailed(Mockito.argThat(path -> 
extractPrefix(path.getName()).equals(file0Prefix))); + Mockito.verify(fileTracker, Mockito.never()) + .markFailed(Mockito.argThat(path -> extractPrefix(path.getName()).equals(file2Prefix))); + + // After first round: file 1 should still be in in-progress with an UPDATED rename timestamp + // (not the original old one) + Path inProgressDir = fileTracker.getInProgressDirPath(); + FileStatus[] remainingFiles = localFs.listStatus(inProgressDir); + assertEquals("Only file 1 should remain in in-progress after first round", 1, + remainingFiles.length); + Path failedFilePath = remainingFiles[0].getPath(); + assertEquals("Remaining file should have file 1's prefix", file1Prefix, + extractPrefix(failedFilePath.getName())); + Optional updatedRenameTs = fileTracker.getRenameTimestamp(failedFilePath); + assertTrue("Failed file should have a rename timestamp", updatedRenameTs.isPresent()); + assertTrue("Failed file's rename timestamp should be newer than the original", + updatedRenameTs.get() > originalRenameTimestamp); + assertTrue("Failed file's rename timestamp should be recent (>= initialTime)", + updatedRenameTs.get() >= initialTime); + long firstRoundRenameTs = updatedRenameTs.get(); + + // --- Second invocation: file 1 should be retried and succeed --- + Mockito.clearInvocations(fileTracker, discovery); + + discovery.processInProgressDirectory(); + + // Verify only file 1 was picked up (files 0 and 2 already completed) + Mockito.verify(fileTracker, Mockito.times(1)) + .markInProgress(Mockito.argThat(path -> extractPrefix(path.getName()).equals(file1Prefix))); + Mockito.verify(fileTracker, Mockito.never()) + .markInProgress(Mockito.argThat(path -> extractPrefix(path.getName()).equals(file0Prefix))); + Mockito.verify(fileTracker, Mockito.never()) + .markInProgress(Mockito.argThat(path -> extractPrefix(path.getName()).equals(file2Prefix))); + + // Verify processFile called only for file 1 and it succeeded + Mockito.verify(discovery, 
Mockito.times(1)).processFile(Mockito.argThat(path -> { + if (!extractPrefix(path.getName()).equals(file1Prefix)) { + return false; + } + // Verify the path passed to processFile has a rename timestamp newer than + // the first round's rename timestamp + Optional ts = fileTracker.getRenameTimestamp(path); + assertTrue("processFile path should have rename timestamp", ts.isPresent()); + assertTrue("Second round rename timestamp should be newer than first round's", + ts.get() > firstRoundRenameTs); + return true; })); + + // Verify markCompleted called for file 1 + Mockito.verify(fileTracker, Mockito.times(1)) + .markCompleted(Mockito.argThat(path -> extractPrefix(path.getName()).equals(file1Prefix))); + + // Verify markFailed was not called in the second round + Mockito.verify(fileTracker, Mockito.never()).markFailed(Mockito.any(Path.class)); + } finally { + EnvironmentEdgeManager.reset(); } } /** - * Tests processing of in-progress directory when no files meet the timestamp criteria. Validates - * that no files are processed when all files are too recent. + * Tests processing of in-progress directory when no files meet the rename timestamp criteria. + * Validates that no files are processed when all files were recently renamed. 
*/ @Test public void testProcessInProgressDirectoryWithNoOldFiles() throws IOException { - // Set up current time for consistent testing + // Set up current time and override min age to 60 seconds for this test long currentTime = 1704153660000L; // 00:01:00 + conf.setInt(ReplicationLogDiscovery.REPLICATION_IN_PROGRESS_FILE_MIN_AGE_SECONDS_KEY, 60); EnvironmentEdge edge = () -> currentTime; EnvironmentEdgeManager.injectEdge(edge); try { - // Create only recent files (all within the threshold) - long recentTimestamp1 = 1704153655000L; // 00:00:55 (5 seconds old) - long recentTimestamp2 = 1704153658000L; // 00:00:58 (2 seconds old) + // Create files with recent rename timestamps (within the 60-second min age window) + long recentRename1 = currentTime - 5000L; // renamed 5 seconds ago + long recentRename2 = currentTime - 2000L; // renamed 2 seconds ago - List recentFiles1 = createInProgressFiles(recentTimestamp1, 2); - List recentFiles2 = createInProgressFiles(recentTimestamp2, 2); + List recentFiles1 = createInProgressFiles(1704153600000L, 2, recentRename1); + List recentFiles2 = createInProgressFiles(1704153600000L, 2, recentRename2); // Process in-progress directory discovery.processInProgressDirectory(); @@ -1149,39 +1244,40 @@ public void testProcessInProgressDirectoryWithNoOldFiles() throws IOException { // Get processed files List processedFiles = discovery.getProcessedFiles(); - // Verify that no files were processed (all files are too recent) - assertEquals("Should not process any files when all files are too recent", 0, + // Verify that no files were processed (all rename timestamps are too recent) + assertEquals("Should not process any files when all files were recently renamed", 0, processedFiles.size()); } finally { EnvironmentEdgeManager.reset(); + conf.setInt(ReplicationLogDiscovery.REPLICATION_IN_PROGRESS_FILE_MIN_AGE_SECONDS_KEY, 0); } } /** - * Tests processing of in-progress directory with timestamp filtering using - * getOlderInProgressFiles. 
Validates that only files older than the calculated threshold are - * processed, excluding recent files. + * Tests processing of in-progress directory with rename timestamp filtering. Validates that only + * files whose rename timestamp is older than the configured minimum age are processed. */ @Test public void testProcessInProgressDirectoryWithTimestampFiltering() throws IOException { - // Set up current time for consistent testing + // Set up current time and override min age to 60 seconds for this test long currentTime = 1704153660000L; // 00:01:00 + conf.setInt(ReplicationLogDiscovery.REPLICATION_IN_PROGRESS_FILE_MIN_AGE_SECONDS_KEY, 60); EnvironmentEdge edge = () -> currentTime; EnvironmentEdgeManager.injectEdge(edge); try { - // Create files with various ages - long veryOldTimestamp = 1704153600000L; // 00:00:00 (1 minute old) - should be processed - long oldTimestamp = 1704153630000L; // 00:00:30 (30 seconds old) - should be processed - long recentTimestamp = 1704153655000L; // 00:00:55 (5 seconds old) - should NOT be processed - long veryRecentTimestamp = 1704153658000L; // 00:00:58 (2 seconds old) - should NOT be - // processed - - List veryOldFiles = createInProgressFiles(veryOldTimestamp, 1); - List oldFiles = createInProgressFiles(oldTimestamp, 1); - List recentFiles = createInProgressFiles(recentTimestamp, 1); - List veryRecentFiles = createInProgressFiles(veryRecentTimestamp, 1); + // threshold = currentTime - 60s = 1704153600000 + // Files renamed before threshold should be processed + long veryOldRename = currentTime - 120000L; // renamed 2 min ago - should be processed + long oldRename = currentTime - 70000L; // renamed 70s ago - should be processed + long recentRename = currentTime - 5000L; // renamed 5s ago - should NOT be processed + long veryRecentRename = currentTime - 2000L; // renamed 2s ago - should NOT be processed + + List veryOldFiles = createInProgressFiles(1704153500000L, 1, veryOldRename); + List oldFiles = 
createInProgressFiles(1704153500001L, 1, oldRename); + List recentFiles = createInProgressFiles(1704153500002L, 1, recentRename); + List veryRecentFiles = createInProgressFiles(1704153500003L, 1, veryRecentRename); // Process in-progress directory discovery.processInProgressDirectory(); @@ -1190,7 +1286,7 @@ public void testProcessInProgressDirectoryWithTimestampFiltering() throws IOExce List processedFiles = discovery.getProcessedFiles(); // Verify that only old files were processed (2 old files, 0 recent files) - assertEquals("Should process only old files based on timestamp filtering", 2, + assertEquals("Should process only old files based on rename timestamp filtering", 2, processedFiles.size()); // Create set of expected processed files (by prefix, since UUIDs are updated during @@ -1198,12 +1294,12 @@ public void testProcessInProgressDirectoryWithTimestampFiltering() throws IOExce Set expectedProcessedFilePrefixes = new HashSet<>(); for (Path file : veryOldFiles) { String fileName = file.getName(); - String prefix = fileName.substring(0, fileName.lastIndexOf("_")); + String prefix = extractPrefix(fileName); expectedProcessedFilePrefixes.add(prefix); } for (Path file : oldFiles) { String fileName = file.getName(); - String prefix = fileName.substring(0, fileName.lastIndexOf("_")); + String prefix = extractPrefix(fileName); expectedProcessedFilePrefixes.add(prefix); } @@ -1211,8 +1307,7 @@ public void testProcessInProgressDirectoryWithTimestampFiltering() throws IOExce Set actualProcessedFilePrefixes = new HashSet<>(); for (Path file : processedFiles) { String fileName = file.getName(); - String withoutExtension = fileName.substring(0, fileName.lastIndexOf(".")); - String prefix = withoutExtension.substring(0, withoutExtension.lastIndexOf("_")); + String prefix = extractPrefix(fileName); actualProcessedFilePrefixes.add(prefix); } @@ -1225,22 +1320,14 @@ public void testProcessInProgressDirectoryWithTimestampFiltering() throws IOExce // Verify that 
markInProgress was called for each expected file for (Path expectedFile : veryOldFiles) { - String expectedPrefix = - expectedFile.getName().substring(0, expectedFile.getName().lastIndexOf("_")); - Mockito.verify(fileTracker, Mockito.times(1)).markInProgress(Mockito.argThat(path -> { - String pathName = path.getName(); - String prefix = pathName.substring(0, pathName.lastIndexOf("_")); - return prefix.equals(expectedPrefix); - })); + String expectedPrefix = extractPrefix(expectedFile.getName()); + Mockito.verify(fileTracker, Mockito.times(1)).markInProgress( + Mockito.argThat(path -> extractPrefix(path.getName()).equals(expectedPrefix))); } for (Path expectedFile : oldFiles) { - String expectedPrefix = - expectedFile.getName().substring(0, expectedFile.getName().lastIndexOf("_")); - Mockito.verify(fileTracker, Mockito.times(1)).markInProgress(Mockito.argThat(path -> { - String pathName = path.getName(); - String prefix = pathName.substring(0, pathName.lastIndexOf("_")); - return prefix.equals(expectedPrefix); - })); + String expectedPrefix = extractPrefix(expectedFile.getName()); + Mockito.verify(fileTracker, Mockito.times(1)).markInProgress( + Mockito.argThat(path -> extractPrefix(path.getName()).equals(expectedPrefix))); } // Verify that markCompleted was called 2 times @@ -1248,42 +1335,31 @@ public void testProcessInProgressDirectoryWithTimestampFiltering() throws IOExce // Verify that markCompleted was called for each expected file with correct paths for (Path expectedFile : veryOldFiles) { - String expectedPrefix = - expectedFile.getName().substring(0, expectedFile.getName().lastIndexOf("_")); - Mockito.verify(fileTracker, Mockito.times(1)).markCompleted(Mockito.argThat(path -> { - String pathName = path.getName(); - String withoutExtension = pathName.substring(0, pathName.lastIndexOf(".")); - String prefix = withoutExtension.substring(0, withoutExtension.lastIndexOf("_")); - return prefix.equals(expectedPrefix); - })); + String expectedPrefix = 
extractPrefix(expectedFile.getName()); + Mockito.verify(fileTracker, Mockito.times(1)).markCompleted( + Mockito.argThat(path -> extractPrefix(path.getName()).equals(expectedPrefix))); } for (Path expectedFile : oldFiles) { - String expectedPrefix = - expectedFile.getName().substring(0, expectedFile.getName().lastIndexOf("_")); - Mockito.verify(fileTracker, Mockito.times(1)).markCompleted(Mockito.argThat(path -> { - String pathName = path.getName(); - String withoutExtension = pathName.substring(0, pathName.lastIndexOf(".")); - String prefix = withoutExtension.substring(0, withoutExtension.lastIndexOf("_")); - return prefix.equals(expectedPrefix); - })); + String expectedPrefix = extractPrefix(expectedFile.getName()); + Mockito.verify(fileTracker, Mockito.times(1)).markCompleted( + Mockito.argThat(path -> extractPrefix(path.getName()).equals(expectedPrefix))); } // Verify that recent files were NOT processed for (Path file : recentFiles) { - String unexpectedPrefix = file.getName().substring(0, file.getName().lastIndexOf("_")); - assertFalse( - "Recent files should not be processed due to timestamp filtering: " + file.getName(), - actualProcessedFilePrefixes.contains(unexpectedPrefix)); + String unexpectedPrefix = extractPrefix(file.getName()); + assertFalse("Recent files should not be processed due to rename timestamp filtering: " + + file.getName(), actualProcessedFilePrefixes.contains(unexpectedPrefix)); } for (Path file : veryRecentFiles) { - String unexpectedPrefix = file.getName().substring(0, file.getName().lastIndexOf("_")); - assertFalse( - "Recent files should not be processed due to timestamp filtering: " + file.getName(), - actualProcessedFilePrefixes.contains(unexpectedPrefix)); + String unexpectedPrefix = extractPrefix(file.getName()); + assertFalse("Recent files should not be processed due to rename timestamp filtering: " + + file.getName(), actualProcessedFilePrefixes.contains(unexpectedPrefix)); } } finally { EnvironmentEdgeManager.reset(); + 
conf.setInt(ReplicationLogDiscovery.REPLICATION_IN_PROGRESS_FILE_MIN_AGE_SECONDS_KEY, 0); } } @@ -1775,14 +1851,32 @@ private List createNewFilesForRound(ReplicationRound replicationRound, int return newFiles; } + /** + * Extracts the stable prefix (_) from a filename. Works for all formats: + * "ts_server.plog", "ts_server_UUID_renameTs.plog" + */ + private static String extractPrefix(String fileName) { + String[] parts = fileName.split("_"); + if (parts.length < 2) { + return fileName.split("\\.")[0]; + } + String secondPart = parts[1].contains(".") ? parts[1].split("\\.")[0] : parts[1]; + return parts[0] + "_" + secondPart; + } + private List createInProgressFiles(long timestamp, int count) throws IOException { - // Create in-progress files + return createInProgressFiles(timestamp, count, timestamp); + } + + private List createInProgressFiles(long timestamp, int count, long renameTimestamp) + throws IOException { Path inProgressDir = fileTracker.getInProgressDirPath(); localFs.mkdirs(inProgressDir); List inProgressFiles = new ArrayList<>(); for (int i = 0; i < count; i++) { String uuid = "12345678-1234-1234-1234-123456789abc" + i; - Path inProgressFile = new Path(inProgressDir, timestamp + "_rs-" + i + "_" + uuid + ".plog"); + Path inProgressFile = new Path(inProgressDir, + timestamp + "_rs-" + i + "_" + uuid + "_" + renameTimestamp + ".plog"); localFs.create(inProgressFile, true).close(); inProgressFiles.add(inProgressFile); } diff --git a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogTrackerTest.java b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogTrackerTest.java index 2037b25c0ee..6d6d8163a28 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogTrackerTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogTrackerTest.java @@ -38,6 +38,8 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import 
org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.util.EnvironmentEdge; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.phoenix.replication.metrics.MetricsReplicationLogTracker; import org.apache.phoenix.replication.metrics.MetricsReplicationLogTrackerReplayImpl; import org.apache.phoenix.replication.reader.ReplicationLogReplay; @@ -631,58 +633,76 @@ public void testMarkInProgressForNewFile() throws IOException { // Initialize tracker tracker.init(); - // Create a file in a shard directory (without UUID) - ReplicationShardDirectoryManager shardManager = tracker.getReplicationShardDirectoryManager(); - List allShardPaths = shardManager.getAllShardPaths(); - Path shardPath = allShardPaths.get(0); - localFs.mkdirs(shardPath); - - // Create original file without UUID - Path originalFile = new Path(shardPath, "1704153600000_rs1.plog"); - localFs.create(originalFile, true).close(); + // Inject a fixed time so we can verify the exact rename timestamp + long fixedTime = 1704153700000L; + EnvironmentEdge edge = () -> fixedTime; + EnvironmentEdgeManager.injectEdge(edge); - // Verify original file exists - assertTrue("Original file should exist", localFs.exists(originalFile)); - - // Call markInProgress - Optional result = tracker.markInProgress(originalFile); - - // Verify file system operation counts - // markInProgress involves moving file from shard directory to in-progress directory - // It should call exists() for only in progress directory (during init), rename() to move file - Mockito.verify(mockFs, times(1)).exists(Mockito.any(Path.class)); - Mockito.verify(mockFs, times(1)).exists(Mockito.eq(tracker.getInProgressDirPath())); - Mockito.verify(mockFs, times(1)).rename(Mockito.any(Path.class), Mockito.any(Path.class)); - Mockito.verify(mockFs, times(1)).rename(Mockito.eq(originalFile), Mockito.any(Path.class)); - // Ensure no listStatus() is called - Mockito.verify(mockFs, 
times(0)).listStatus(Mockito.any(Path.class)); - - // Verify operation was successful - assertTrue("markInProgress should be successful", result.isPresent()); - - // Verify original file no longer exists - assertFalse("Original file should no longer exist", localFs.exists(originalFile)); + try { + // Create a file in a shard directory (without UUID) + ReplicationShardDirectoryManager shardManager = tracker.getReplicationShardDirectoryManager(); + List allShardPaths = shardManager.getAllShardPaths(); + Path shardPath = allShardPaths.get(0); + localFs.mkdirs(shardPath); - // Verify file was moved to in-progress directory with UUID - Path inProgressDir = tracker.getInProgressDirPath(); - FileStatus[] files = localFs.listStatus(inProgressDir); - assertEquals("Should have exactly one file in in-progress directory", 1, files.length); - - // Verify the new file has UUID format and is in in-progress directory - String newFileName = files[0].getPath().getName(); - assertTrue("New file should have UUID suffix", newFileName.matches( - "1704153600000_rs1_[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{12}\\.plog")); - - // Assert that renamed file is in in-progress directory - Path renamedFile = files[0].getPath(); - assertTrue("Renamed file should be in in-progress directory", - renamedFile.getParent().toUri().getPath().equals(tracker.getInProgressDirPath().toString())); - - // Assert that renamed file has same prefix as original file - String originalFileName = originalFile.getName(); - String originalPrefix = originalFileName.substring(0, originalFileName.lastIndexOf('.')); - assertTrue("Renamed file should have same prefix as original file", - newFileName.startsWith(originalPrefix + "_")); + // Create original file without UUID + Path originalFile = new Path(shardPath, "1704153600000_rs1.plog"); + localFs.create(originalFile, true).close(); + + // Verify original file exists + assertTrue("Original file should exist", localFs.exists(originalFile)); + + // Call 
markInProgress + Optional result = tracker.markInProgress(originalFile); + + // Verify file system operation counts + // markInProgress involves moving file from shard directory to in-progress directory + // It should call exists() for only in progress directory (during init), rename() to move file + Mockito.verify(mockFs, times(1)).exists(Mockito.any(Path.class)); + Mockito.verify(mockFs, times(1)).exists(Mockito.eq(tracker.getInProgressDirPath())); + Mockito.verify(mockFs, times(1)).rename(Mockito.any(Path.class), Mockito.any(Path.class)); + Mockito.verify(mockFs, times(1)).rename(Mockito.eq(originalFile), Mockito.any(Path.class)); + // Ensure no listStatus() is called + Mockito.verify(mockFs, times(0)).listStatus(Mockito.any(Path.class)); + + // Verify operation was successful + assertTrue("markInProgress should be successful", result.isPresent()); + + // Verify original file no longer exists + assertFalse("Original file should no longer exist", localFs.exists(originalFile)); + + // Verify file was moved to in-progress directory with UUID + Path inProgressDir = tracker.getInProgressDirPath(); + FileStatus[] files = localFs.listStatus(inProgressDir); + assertEquals("Should have exactly one file in in-progress directory", 1, files.length); + + // Verify the new file has UUID and rename timestamp format: + // ___.plog + String newFileName = files[0].getPath().getName(); + assertTrue("New file should have UUID and rename timestamp", + newFileName + .matches("1704153600000_rs1_[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{12}" + + "_[0-9]+\\.plog")); + + // Assert that renamed file is in in-progress directory + Path renamedFile = files[0].getPath(); + assertTrue("Renamed file should be in in-progress directory", renamedFile.getParent().toUri() + .getPath().equals(tracker.getInProgressDirPath().toString())); + + // Assert that renamed file has same prefix as original file + String originalFileName = originalFile.getName(); + String originalPrefix = 
originalFileName.substring(0, originalFileName.lastIndexOf('.')); + assertTrue("Renamed file should have same prefix as original file", + newFileName.startsWith(originalPrefix + "_")); + + // Verify rename timestamp matches the injected current time exactly + Optional renameTimestamp = tracker.getRenameTimestamp(renamedFile); + assertTrue("Rename timestamp should be present", renameTimestamp.isPresent()); + assertEquals("Rename timestamp should match the current time", fixedTime, + renameTimestamp.get().longValue()); + } finally { + EnvironmentEdgeManager.reset(); + } } @Test @@ -690,49 +710,70 @@ public void testMarkInProgressForAlreadyInProgressFile() throws IOException { // Initialize tracker tracker.init(); - // Create a file in in-progress directory with existing UUID - Path inProgressDir = tracker.getInProgressDirPath(); - String existingUUID = "12345678-1234-1234-1234-123456789abc"; - Path originalFile = new Path(inProgressDir, "1704153600000_rs1_" + existingUUID + ".plog"); - localFs.create(originalFile, true).close(); - - // Verify original file exists - assertTrue("Original file should exist", localFs.exists(originalFile)); - - // Call markInProgress - Optional result = tracker.markInProgress(originalFile); - - // Verify file system operation counts - // markInProgress involves re-naming file int the in-progress directory - // It should call exists() for only in progress directory (during init), rename() to move file - Mockito.verify(mockFs, times(1)).exists(Mockito.any(Path.class)); - Mockito.verify(mockFs, times(1)).exists(Mockito.eq(tracker.getInProgressDirPath())); - Mockito.verify(mockFs, times(1)).rename(Mockito.any(Path.class), Mockito.any(Path.class)); - Mockito.verify(mockFs, times(1)).rename(Mockito.eq(originalFile), Mockito.any(Path.class)); - // Ensure no listStatus() is called - Mockito.verify(mockFs, times(0)).listStatus(Mockito.any(Path.class)); - - // Verify operation was successful - assertTrue("markInProgress should be successful", 
result.isPresent()); - - // Verify original file no longer exists - assertFalse("Original file should no longer exist", localFs.exists(originalFile)); - - // Verify new file exists in same directory with new UUID - FileStatus[] files = localFs.listStatus(inProgressDir); - assertEquals("Should have exactly one file in in-progress directory", 1, files.length); - - // Verify the new file has different UUID - String newFileName = files[0].getPath().getName(); - assertTrue("New file should have UUID suffix", newFileName.matches( - "1704153600000_rs1_[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{12}\\.plog")); - assertFalse("New file should have different UUID", newFileName.contains(existingUUID)); - - // Assert that renamed file has same prefix as original file - String originalFileName = originalFile.getName(); - String originalPrefix = originalFileName.substring(0, originalFileName.lastIndexOf('_')); - assertTrue("Renamed file should have same prefix as original file", - newFileName.startsWith(originalPrefix + "_")); + // Inject a fixed time that is newer than the existing rename timestamp + long existingRenameTimestamp = 1704153660000L; + long fixedTime = 1704153800000L; + EnvironmentEdge edge = () -> fixedTime; + EnvironmentEdgeManager.injectEdge(edge); + + try { + // Create a file in in-progress directory with existing UUID and rename timestamp + Path inProgressDir = tracker.getInProgressDirPath(); + String existingUUID = "12345678-1234-1234-1234-123456789abc"; + Path originalFile = new Path(inProgressDir, + "1704153600000_rs1_" + existingUUID + "_" + existingRenameTimestamp + ".plog"); + localFs.create(originalFile, true).close(); + + // Verify original file exists + assertTrue("Original file should exist", localFs.exists(originalFile)); + + // Call markInProgress + Optional result = tracker.markInProgress(originalFile); + + // Verify file system operation counts + // markInProgress involves re-naming file in the in-progress directory + // It should 
call exists() for only in progress directory (during init), rename() to move file + Mockito.verify(mockFs, times(1)).exists(Mockito.any(Path.class)); + Mockito.verify(mockFs, times(1)).exists(Mockito.eq(tracker.getInProgressDirPath())); + Mockito.verify(mockFs, times(1)).rename(Mockito.any(Path.class), Mockito.any(Path.class)); + Mockito.verify(mockFs, times(1)).rename(Mockito.eq(originalFile), Mockito.any(Path.class)); + // Ensure no listStatus() is called + Mockito.verify(mockFs, times(0)).listStatus(Mockito.any(Path.class)); + + // Verify operation was successful + assertTrue("markInProgress should be successful", result.isPresent()); + + // Verify original file no longer exists + assertFalse("Original file should no longer exist", localFs.exists(originalFile)); + + // Verify new file exists in same directory with new UUID and new rename timestamp + FileStatus[] files = localFs.listStatus(inProgressDir); + assertEquals("Should have exactly one file in in-progress directory", 1, files.length); + + // Verify the new file has different UUID and a rename timestamp: + // ___.plog + String newFileName = files[0].getPath().getName(); + assertTrue("New file should have UUID and rename timestamp", + newFileName + .matches("1704153600000_rs1_[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{12}" + + "_[0-9]+\\.plog")); + assertFalse("New file should have different UUID", newFileName.contains(existingUUID)); + + // Assert that renamed file has same prefix (ts_server) as original file + String expectedPrefix = "1704153600000_rs1"; + assertTrue("Renamed file should have same prefix as original file", + newFileName.startsWith(expectedPrefix + "_")); + + // Verify rename timestamp is updated to the current (injected) time, not the old one + Optional renameTimestamp = tracker.getRenameTimestamp(files[0].getPath()); + assertTrue("Rename timestamp should be present", renameTimestamp.isPresent()); + assertEquals("Rename timestamp should match the current time (not the old 
value)", fixedTime, + renameTimestamp.get().longValue()); + assertTrue("Rename timestamp should be newer than the original", + renameTimestamp.get() > existingRenameTimestamp); + } finally { + EnvironmentEdgeManager.reset(); + } } @Test @@ -1090,9 +1131,9 @@ public void testGetFileUUID() throws IOException { Optional newFileUUID = tracker.getFileUUID(newFile); assertFalse("New file without UUID should return empty Optional", newFileUUID.isPresent()); - // 2. For in-progress file - with UUID + // 2. For in-progress file - with UUID and rename timestamp Path inProgressFile = new Path(tracker.getInProgressDirPath(), - "1704153600000_rs1_12345678-1234-1234-1234-123456789abc.plog"); + "1704153600000_rs1_12345678-1234-1234-1234-123456789abc_1704153660000.plog"); Optional inProgressFileUUID = tracker.getFileUUID(inProgressFile); assertTrue("In-progress file with UUID should return present Optional", inProgressFileUUID.isPresent()); @@ -1101,12 +1142,19 @@ public void testGetFileUUID() throws IOException { // Test with different UUID Path anotherInProgressFile = new Path(tracker.getInProgressDirPath(), - "1704153600000_rs1_87654321-4321-4321-4321-cba987654321.plog"); + "1704153600000_rs1_87654321-4321-4321-4321-cba987654321_1704153660000.plog"); Optional anotherUUID = tracker.getFileUUID(anotherInProgressFile); assertTrue("Another in-progress file with UUID should return present Optional", anotherUUID.isPresent()); assertEquals("Another in-progress file UUID should be extracted correctly", "87654321-4321-4321-4321-cba987654321", anotherUUID.get()); + + // 3. 
Old format without rename timestamp (3 parts) should return empty + Path oldFormatFile = new Path(tracker.getInProgressDirPath(), + "1704153600000_rs1_12345678-1234-1234-1234-123456789abc.plog"); + Optional oldFormatUUID = tracker.getFileUUID(oldFormatFile); + assertFalse("Old format file without rename timestamp should return empty", + oldFormatUUID.isPresent()); } @Test @@ -1117,31 +1165,36 @@ public void testGetOlderInProgressFiles() throws IOException { // Get the in-progress directory path Path inProgressDir = tracker.getInProgressDirPath(); - // Create files with different timestamps - long baseTimestamp = 1704153600000L; // 2024-01-02 00:00:00 - long thresholdTimestamp = baseTimestamp + TimeUnit.HOURS.toMillis(1); // 1 hour later + // Create files with different rename timestamps + long baseRenameTimestamp = 1704153600000L; // 2024-01-02 00:00:00 + long thresholdTimestamp = baseRenameTimestamp + TimeUnit.HOURS.toMillis(1); // 1 hour later + String uuid = "12345678-1234-1234-1234-123456789abc"; - // Files older than threshold (should be returned) + // Files with rename timestamps older than threshold (should be returned) + long oldRename1 = baseRenameTimestamp + TimeUnit.MINUTES.toMillis(30); + long oldRename2 = baseRenameTimestamp + TimeUnit.MINUTES.toMillis(45); Path oldFile1 = - new Path(inProgressDir, (baseTimestamp + TimeUnit.MINUTES.toMillis(30)) + "_rs1.plog"); + new Path(inProgressDir, "1704153600000_rs1_" + uuid + "_" + oldRename1 + ".plog"); Path oldFile2 = - new Path(inProgressDir, (baseTimestamp + TimeUnit.MINUTES.toMillis(45)) + "_rs2.plog"); + new Path(inProgressDir, "1704153600000_rs2_" + uuid + "_" + oldRename2 + ".plog"); - // Files newer than threshold (should not be returned) + // Files with rename timestamps newer than threshold (should not be returned) + long newRename1 = baseRenameTimestamp + TimeUnit.HOURS.toMillis(2); + long newRename2 = baseRenameTimestamp + TimeUnit.HOURS.toMillis(3); Path newFile1 = - new Path(inProgressDir, 
(baseTimestamp + TimeUnit.HOURS.toMillis(2)) + "_rs3.plog"); + new Path(inProgressDir, "1704153600000_rs3_" + uuid + "_" + newRename1 + ".plog"); Path newFile2 = - new Path(inProgressDir, (baseTimestamp + TimeUnit.HOURS.toMillis(3)) + "_rs4.plog"); + new Path(inProgressDir, "1704153600000_rs4_" + uuid + "_" + newRename2 + ".plog"); - // Invalid files (should be skipped) - Path invalidFile = new Path(inProgressDir, "invalid_timestamp_rs5.plog"); + // Files without rename timestamp (should be skipped) + Path noRenameFile = new Path(inProgressDir, "1704153600000_rs5_" + uuid + ".plog"); // Create all files localFs.create(oldFile1, true).close(); localFs.create(oldFile2, true).close(); localFs.create(newFile1, true).close(); localFs.create(newFile2, true).close(); - localFs.create(invalidFile, true).close(); + localFs.create(noRenameFile, true).close(); // Call getOlderInProgressFiles List result = tracker.getOlderInProgressFiles(thresholdTimestamp); @@ -1159,7 +1212,8 @@ public void testGetOlderInProgressFiles() throws IOException { assertTrue("Should contain oldFile2", resultFilenames.contains(oldFile2.getName())); assertFalse("Should not contain newFile1", resultFilenames.contains(newFile1.getName())); assertFalse("Should not contain newFile2", resultFilenames.contains(newFile2.getName())); - assertFalse("Should not contain invalidFile", resultFilenames.contains(invalidFile.getName())); + assertFalse("Should not contain noRenameFile", + resultFilenames.contains(noRenameFile.getName())); } @Test @@ -1169,15 +1223,18 @@ public void testGetOlderInProgressFilesWithNoOldFiles() throws IOException { // Get the in-progress directory path Path inProgressDir = tracker.getInProgressDirPath(); + String uuid = "12345678-1234-1234-1234-123456789abc"; - // Create files all newer than threshold + // Create files with rename timestamps all newer than threshold long baseTimestamp = 1704153600000L; long thresholdTimestamp = baseTimestamp + TimeUnit.HOURS.toMillis(1); + long 
newRename1 = baseTimestamp + TimeUnit.HOURS.toMillis(2); + long newRename2 = baseTimestamp + TimeUnit.HOURS.toMillis(3); Path newFile1 = - new Path(inProgressDir, (baseTimestamp + TimeUnit.HOURS.toMillis(2)) + "_rs1.plog"); + new Path(inProgressDir, "1704153600000_rs1_" + uuid + "_" + newRename1 + ".plog"); Path newFile2 = - new Path(inProgressDir, (baseTimestamp + TimeUnit.HOURS.toMillis(3)) + "_rs2.plog"); + new Path(inProgressDir, "1704153600000_rs2_" + uuid + "_" + newRename2 + ".plog"); localFs.create(newFile1, true).close(); localFs.create(newFile2, true).close(); @@ -1236,22 +1293,27 @@ public void testGetOlderInProgressFilesWithInvalidFiles() throws IOException { // Get the in-progress directory path Path inProgressDir = tracker.getInProgressDirPath(); + String uuid = "12345678-1234-1234-1234-123456789abc"; long baseTimestamp = 1704153600000L; long thresholdTimestamp = baseTimestamp + TimeUnit.HOURS.toMillis(1); - // Valid old file (should be returned) + // Valid old file with rename timestamp (should be returned) + long oldRename = baseTimestamp + TimeUnit.MINUTES.toMillis(30); Path validOldFile = - new Path(inProgressDir, (baseTimestamp + TimeUnit.MINUTES.toMillis(30)) + "_rs1.plog"); + new Path(inProgressDir, "1704153600000_rs1_" + uuid + "_" + oldRename + ".plog"); - // Invalid files (should be skipped) - Path invalidFile1 = new Path(inProgressDir, "invalid_timestamp_rs2.plog"); - Path invalidFile2 = new Path(inProgressDir, "not_a_timestamp_rs3.plog"); - Path invalidFile3 = new Path(inProgressDir, "1704153600000_rs4.txt"); // wrong extension + // Files without rename timestamp (should be skipped - only 3 parts) + Path noRenameFile = new Path(inProgressDir, "1704153600000_rs2_" + uuid + ".plog"); + // File with non-numeric rename timestamp (should be skipped) + Path invalidRenameFile = + new Path(inProgressDir, "1704153600000_rs3_" + uuid + "_notanumber.plog"); + // Wrong extension (should be skipped by isValidLogFile) + Path invalidFile3 = new 
Path(inProgressDir, "1704153600000_rs4.txt"); localFs.create(validOldFile, true).close(); - localFs.create(invalidFile1, true).close(); - localFs.create(invalidFile2, true).close(); + localFs.create(noRenameFile, true).close(); + localFs.create(invalidRenameFile, true).close(); localFs.create(invalidFile3, true).close(); // Call getOlderInProgressFiles @@ -1264,10 +1326,10 @@ public void testGetOlderInProgressFilesWithInvalidFiles() throws IOException { Set resultFilenames = result.stream().map(Path::getName).collect(Collectors.toSet()); assertTrue("Should contain validOldFile", resultFilenames.contains(validOldFile.getName())); - assertFalse("Should not contain invalidFile1", - resultFilenames.contains(invalidFile1.getName())); - assertFalse("Should not contain invalidFile2", - resultFilenames.contains(invalidFile2.getName())); + assertFalse("Should not contain noRenameFile", + resultFilenames.contains(noRenameFile.getName())); + assertFalse("Should not contain invalidRenameFile", + resultFilenames.contains(invalidRenameFile.getName())); assertFalse("Should not contain invalidFile3", resultFilenames.contains(invalidFile3.getName())); } @@ -1279,16 +1341,18 @@ public void testGetOlderInProgressFilesWithExactThreshold() throws IOException { // Get the in-progress directory path Path inProgressDir = tracker.getInProgressDirPath(); + String uuid = "12345678-1234-1234-1234-123456789abc"; long baseTimestamp = 1704153600000L; long thresholdTimestamp = baseTimestamp + TimeUnit.HOURS.toMillis(1); - // File with timestamp exactly at threshold (should NOT be returned - we want older than - // threshold) - Path fileAtThreshold = new Path(inProgressDir, thresholdTimestamp + "_rs1.plog"); + // File with rename timestamp exactly at threshold (should NOT be returned) + Path fileAtThreshold = + new Path(inProgressDir, "1704153600000_rs1_" + uuid + "_" + thresholdTimestamp + ".plog"); - // File with timestamp just before threshold (should be returned) - Path fileJustBeforeThreshold 
= new Path(inProgressDir, (thresholdTimestamp - 1) + "_rs2.plog"); + // File with rename timestamp just before threshold (should be returned) + Path fileJustBeforeThreshold = new Path(inProgressDir, + "1704153600000_rs2_" + uuid + "_" + (thresholdTimestamp - 1) + ".plog"); localFs.create(fileAtThreshold, true).close(); localFs.create(fileJustBeforeThreshold, true).close(); @@ -1315,30 +1379,36 @@ public void testGetOlderInProgressFilesWithMixedFileTypes() throws IOException { // Get the in-progress directory path Path inProgressDir = tracker.getInProgressDirPath(); + String uuid = "12345678-1234-1234-1234-123456789abc"; long baseTimestamp = 1704153600000L; long thresholdTimestamp = baseTimestamp + TimeUnit.HOURS.toMillis(1); - // Valid old files (should be returned) + // Valid old files with rename timestamps before threshold (should be returned) + long oldRename1 = baseTimestamp + TimeUnit.MINUTES.toMillis(30); + long oldRename2 = baseTimestamp + TimeUnit.MINUTES.toMillis(45); Path oldFile1 = - new Path(inProgressDir, (baseTimestamp + TimeUnit.MINUTES.toMillis(30)) + "_rs1.plog"); + new Path(inProgressDir, "1704153600000_rs1_" + uuid + "_" + oldRename1 + ".plog"); Path oldFile2 = - new Path(inProgressDir, (baseTimestamp + TimeUnit.MINUTES.toMillis(45)) + "_rs2.plog"); + new Path(inProgressDir, "1704153600000_rs2_" + uuid + "_" + oldRename2 + ".plog"); - // Valid new files (should not be returned) + // Valid file with rename timestamp after threshold (should not be returned) + long newRename1 = baseTimestamp + TimeUnit.HOURS.toMillis(2); Path newFile1 = - new Path(inProgressDir, (baseTimestamp + TimeUnit.HOURS.toMillis(2)) + "_rs3.plog"); + new Path(inProgressDir, "1704153600000_rs3_" + uuid + "_" + newRename1 + ".plog"); - // Invalid files (should be skipped) - Path invalidFile1 = new Path(inProgressDir, "invalid_timestamp_rs4.plog"); - Path invalidFile2 = new Path(inProgressDir, "1704153600000_rs5.txt"); // wrong extension - Path invalidFile3 = new 
Path(inProgressDir, "not_a_number_rs6.plog"); + // Files without rename timestamp (should be skipped - only 3 parts) + Path noRenameFile = new Path(inProgressDir, "1704153600000_rs4_" + uuid + ".plog"); + // Wrong extension (should be skipped by isValidLogFile) + Path invalidFile2 = new Path(inProgressDir, "1704153600000_rs5.txt"); + // Non-numeric rename timestamp (should be skipped) + Path invalidFile3 = new Path(inProgressDir, "1704153600000_rs6_" + uuid + "_notanumber.plog"); // Create all files localFs.create(oldFile1, true).close(); localFs.create(oldFile2, true).close(); localFs.create(newFile1, true).close(); - localFs.create(invalidFile1, true).close(); + localFs.create(noRenameFile, true).close(); localFs.create(invalidFile2, true).close(); localFs.create(invalidFile3, true).close(); @@ -1354,8 +1424,8 @@ public void testGetOlderInProgressFilesWithMixedFileTypes() throws IOException { assertTrue("Should contain oldFile1", resultFilenames.contains(oldFile1.getName())); assertTrue("Should contain oldFile2", resultFilenames.contains(oldFile2.getName())); assertFalse("Should not contain newFile1", resultFilenames.contains(newFile1.getName())); - assertFalse("Should not contain invalidFile1", - resultFilenames.contains(invalidFile1.getName())); + assertFalse("Should not contain noRenameFile", + resultFilenames.contains(noRenameFile.getName())); assertFalse("Should not contain invalidFile2", resultFilenames.contains(invalidFile2.getName())); assertFalse("Should not contain invalidFile3",