Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
package org.apache.phoenix.replication;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
Expand Down Expand Up @@ -83,6 +85,25 @@ public abstract class ReplicationLogDiscovery {
*/
protected static final double DEFAULT_WAITING_BUFFER_PERCENTAGE = 15.0;

/**
* Configuration key for maximum number of retries per in-progress file within a single processing
* round. Files that fail this many times are skipped for the rest of the round.
*/
public static final String REPLICATION_IN_PROGRESS_FILE_MAX_RETRIES_KEY =
"phoenix.replication.in.progress.file.max.retries";

public static final int DEFAULT_IN_PROGRESS_FILE_MAX_RETRIES = 1;

/**
* Configuration key for the minimum age (in seconds) of an in-progress file's rename timestamp
* before it becomes eligible for processing. This prevents a file recently marked in-progress by
* one region server from being immediately picked up by another.
*/
public static final String REPLICATION_IN_PROGRESS_FILE_MIN_AGE_SECONDS_KEY =
"phoenix.replication.in.progress.file.min.age.seconds";

public static final int DEFAULT_IN_PROGRESS_FILE_MIN_AGE_SECONDS = 60;

protected final Configuration conf;
protected final String haGroupName;
protected final ReplicationLogTracker replicationLogTracker;
Expand Down Expand Up @@ -283,8 +304,9 @@ protected void processNewFilesForRound(ReplicationRound replicationRound) throws
}

/**
* Processes all files (older than 1 round time) in the in-progress directory. Continuously
* processes files until no in-progress files remain.
* Processes all files in the in-progress directory whose rename timestamp is older than the
* configured minimum age. Continuously processes files until no eligible in-progress files
* remain.
* @throws IOException if there's an error during file processing
*/
protected void processInProgressDirectory() throws IOException {
Expand All @@ -293,16 +315,31 @@ protected void processInProgressDirectory() throws IOException {
// Increase the count for number of times in progress directory is processed
getMetrics().incrementNumInProgressDirectoryProcessed();
long startTime = EnvironmentEdgeManager.currentTime();
long oldestTimestampToProcess =
replicationLogTracker.getReplicationShardDirectoryManager().getNearestRoundStartTimestamp(
EnvironmentEdgeManager.currentTime()) - getReplayIntervalSeconds() * 1000L;
List<Path> files = replicationLogTracker.getOlderInProgressFiles(oldestTimestampToProcess);
LOG.info("Number of {} files with oldestTimestampToProcess {} is {} for haGroup: {}",
replicationLogTracker.getInProgressLogSubDirectoryName(), oldestTimestampToProcess,
long renameTimestampThreshold =
EnvironmentEdgeManager.currentTime() - getInProgressFileMinAgeSeconds() * 1000L;
int maxRetries = getInProgressFileMaxRetries();
Map<String, Integer> failureCount = new HashMap<>();
List<Path> files = replicationLogTracker.getOlderInProgressFiles(renameTimestampThreshold);
LOG.info("Number of {} files with renameTimestampThreshold {} is {} for haGroup: {}",
replicationLogTracker.getInProgressLogSubDirectoryName(), renameTimestampThreshold,
files.size(), haGroupName);
while (!files.isEmpty()) {
processOneRandomFile(files);
files = replicationLogTracker.getOlderInProgressFiles(oldestTimestampToProcess);
Optional<Path> failedFile = processOneRandomFile(files);
if (failedFile.isPresent()) {
String prefix = replicationLogTracker.getFilePrefix(failedFile.get());
int count = failureCount.merge(prefix, 1, Integer::sum);
if (count >= maxRetries) {
LOG.warn(
"File {} (prefix: {}) has failed {} time(s), reached max retries ({}). "
+ "Skipping for the rest of this round for haGroup: {}",
failedFile.get(), prefix, count, maxRetries, haGroupName);
}
}
renameTimestampThreshold =
EnvironmentEdgeManager.currentTime() - getInProgressFileMinAgeSeconds() * 1000L;
files = replicationLogTracker.getOlderInProgressFiles(renameTimestampThreshold);
files.removeIf(
f -> failureCount.getOrDefault(replicationLogTracker.getFilePrefix(f), 0) >= maxRetries);
}
long duration = EnvironmentEdgeManager.currentTime() - startTime;
LOG.info("Finished in-progress files processing in {}ms for haGroup: {}", duration,
Expand All @@ -314,8 +351,9 @@ protected void processInProgressDirectory() throws IOException {
* Processes a single random file from the provided list. Marks the file as in-progress, processes
* it, and marks it as completed or failed.
* @param files - List of files from which to select and process one randomly
* @return the original path of the file that failed, or empty if processing succeeded
*/
private void processOneRandomFile(final List<Path> files) throws IOException {
private Optional<Path> processOneRandomFile(final List<Path> files) throws IOException {
// Pick a random file and process it
Path file = files.get(ThreadLocalRandom.current().nextInt(files.size()));
Optional<Path> optionalInProgressFilePath = Optional.empty();
Expand All @@ -329,9 +367,9 @@ private void processOneRandomFile(final List<Path> files) throws IOException {
LOG.error("Failed to process the file {}", file, exception);
optionalInProgressFilePath.ifPresent(replicationLogTracker::markFailed);
// Not throwing this exception because next time another random file will be retried.
// If it's persistent failure for in_progress directory,
// cluster state should to be DEGRADED_STANDBY_FOR_READER.
return Optional.of(file);
}
return Optional.empty();
}

/**
Expand Down Expand Up @@ -469,6 +507,16 @@ public double getWaitingBufferPercentage() {
return DEFAULT_WAITING_BUFFER_PERCENTAGE;
}

public int getInProgressFileMaxRetries() {
return conf.getInt(REPLICATION_IN_PROGRESS_FILE_MAX_RETRIES_KEY,
DEFAULT_IN_PROGRESS_FILE_MAX_RETRIES);
}

public int getInProgressFileMinAgeSeconds() {
return conf.getInt(REPLICATION_IN_PROGRESS_FILE_MIN_AGE_SECONDS_KEY,
DEFAULT_IN_PROGRESS_FILE_MIN_AGE_SECONDS);
}

public ReplicationLogTracker getReplicationLogFileTracker() {
return this.replicationLogTracker;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,15 +203,15 @@ public List<Path> getInProgressFiles() throws IOException {
}

/**
* Retrieves all valid log files in the in-progress directory that are older than the specified
* timestamp.
* @param timestampThreshold - The timestamp threshold in milliseconds. Files with timestamps less
* than this value will be returned.
* @return List of valid log file paths in the in-progress directory that are older than the
* Retrieves all valid log files in the in-progress directory whose rename timestamp is older than
* the specified threshold. Files without a rename timestamp are skipped.
* @param renameTimestampThreshold - The timestamp threshold in milliseconds. Files with rename
* timestamps less than this value will be returned.
* @return List of valid log file paths in the in-progress directory that were renamed before the
* threshold, empty list if directory doesn't exist or no files match
* @throws IOException if there's an error accessing the file system
*/
public List<Path> getOlderInProgressFiles(long timestampThreshold) throws IOException {
public List<Path> getOlderInProgressFiles(long renameTimestampThreshold) throws IOException {
if (!fileSystem.exists(getInProgressDirPath())) {
return Collections.emptyList();
}
Expand All @@ -221,20 +221,19 @@ public List<Path> getOlderInProgressFiles(long timestampThreshold) throws IOExce

for (FileStatus status : fileStatuses) {
if (status.isFile() && isValidLogFile(status.getPath())) {
try {
long fileTimestamp = getFileTimestamp(status.getPath());
if (fileTimestamp < timestampThreshold) {
olderInProgressFiles.add(status.getPath());
}
} catch (NumberFormatException e) {
LOG.warn("Failed to extract timestamp from file {}, skipping",
status.getPath().getName());
Optional<Long> renameTimestamp = getRenameTimestamp(status.getPath());
if (!renameTimestamp.isPresent()) {
LOG.warn("File {} has no rename timestamp, skipping", status.getPath().getName());
continue;
}
if (renameTimestamp.get() < renameTimestampThreshold) {
olderInProgressFiles.add(status.getPath());
}
}
}

LOG.debug("Found {} in-progress files older than timestamp {}", olderInProgressFiles.size(),
timestampThreshold);
LOG.debug("Found {} in-progress files renamed before timestamp {}", olderInProgressFiles.size(),
renameTimestampThreshold);
return olderInProgressFiles;
}

Expand Down Expand Up @@ -364,26 +363,20 @@ protected Optional<Path> markInProgress(final Path file) {

// Check if file is already in in-progress directory
if (file.getParent().toUri().getPath().equals(getInProgressDirPath().toString())) {
// File is already in in-progress directory, replace UUID with a new one
// File is already in in-progress directory, replace UUID and rename timestamp
// keep the directory same as in progress
// Format: <ts>_<server>_<UUID>_<renameTs>.plog → extract prefix (first 2 parts)
String[] parts = fileName.split("_");
// Remove the last part (UUID) and add new UUID
StringBuilder newNameBuilder = new StringBuilder();
for (int i = 0; i < parts.length - 1; i++) {
if (i > 0) {
newNameBuilder.append("_");
}
newNameBuilder.append(parts[i]);
}
String extension = fileName.substring(fileName.lastIndexOf("."));
newNameBuilder.append("_").append(UUID.randomUUID()).append(extension);
newFileName = newNameBuilder.toString();
String prefix = parts[0] + "_" + parts[1];
newFileName = prefix + "_" + UUID.randomUUID() + "_"
+ EnvironmentEdgeManager.currentTimeMillis() + ".plog";
targetDirectory = file.getParent();
} else {
// File is not in in-progress directory, add UUID and move to IN_PROGRESS directory
// File is not in in-progress directory, add UUID + rename timestamp and move to
// IN_PROGRESS directory
String baseName = fileName.substring(0, fileName.lastIndexOf("."));
String extension = fileName.substring(fileName.lastIndexOf("."));
newFileName = baseName + "_" + UUID.randomUUID() + extension;
newFileName = baseName + "_" + UUID.randomUUID() + "_"
+ EnvironmentEdgeManager.currentTimeMillis() + ".plog";
targetDirectory = getInProgressDirPath();
}

Expand Down Expand Up @@ -427,42 +420,55 @@ public long getFileTimestamp(Path file) throws NumberFormatException {
}

/**
* Extracts the UUID from a log file name. Assumes UUID is the last part of the filename before
* the extension.
* Extracts the UUID from an in-progress file name. Format: <ts>_<server>_<UUID>_<renameTs>.plog →
* UUID is the third underscore-separated part.
* @param file - The file path to extract UUID from.
* @return Optional of UUID if file was in progress, else Optional.empty()
* @return Optional of UUID if file is in-progress format, else Optional.empty()
*/
protected Optional<String> getFileUUID(Path file) throws NumberFormatException {
String[] parts = file.getName().split("_");
if (parts.length < 3) {
if (parts.length < 4) {
return Optional.empty();
}
return Optional.of(parts[parts.length - 1].split("\\.")[0]);
return Optional.of(parts[2]);
}

/**
* Extracts everything except the UUID (last part) from a file path. For example, from
* "1704153600000_rs1_12345678-1234-1234-1234-123456789abc.plog" This method will return
* "1704153600000_rs1"
* @param file - The file path to extract prefix from.
* Extracts the rename timestamp from an in-progress file name. Format:
* <ts>_<server>_<UUID>_<renameTs>.plog → renameTs is the last underscore-separated part (before
* extension).
* @param file - The file path to extract rename timestamp from.
* @return Optional of rename timestamp, or empty if not present or invalid
*/
protected String getFilePrefix(Path file) {
String fileName = file.getName();
String[] parts = fileName.split("_");
if (parts.length < 3) {
return fileName.split("\\.")[0]; // Return full filename if no underscore found
public Optional<Long> getRenameTimestamp(Path file) {
String[] parts = file.getName().split("_");
if (parts.length < 4) {
return Optional.empty();
}

// Return everything except the last part (UUID)
StringBuilder prefix = new StringBuilder();
for (int i = 0; i < parts.length - 1; i++) {
if (i > 0) {
prefix.append("_");
}
prefix.append(parts[i]);
try {
String lastPart = parts[parts.length - 1];
String withoutExtension = lastPart.substring(0, lastPart.lastIndexOf("."));
return Optional.of(Long.parseLong(withoutExtension));
} catch (NumberFormatException e) {
LOG.warn("Failed to parse rename timestamp from file {}", file.getName());
return Optional.empty();
}
}

return prefix.toString();
/**
* Extracts the stable prefix from a file path. The prefix is the first two underscore-separated
* parts (<timestamp>_<servername>), which remain stable across renames. For example,
* "1704153600000_rs1_uuid_renameTs.plog" returns "1704153600000_rs1" and "1704153600000_rs1.plog"
* also returns "1704153600000_rs1".
* @param file - The file path to extract prefix from.
*/
protected String getFilePrefix(Path file) {
String[] parts = file.getName().split("_");
if (parts.length < 2) {
return file.getName().split("\\.")[0];
}
String secondPart = parts[1].contains(".") ? parts[1].split("\\.")[0] : parts[1];
return parts[0] + "_" + secondPart;
}

/**
Expand Down
Loading