Skip to content

PHOENIX-7751 : [SyncTable Tool] Feature to validate table data using PhoenixSyncTable tool b/w source and target cluster#2379

Open
rahulLiving wants to merge 27 commits intoapache:masterfrom
rahulLiving:PHOENIX-7751
Open

PHOENIX-7751 : [SyncTable Tool] Feature to validate table data using PhoenixSyncTable tool b/w source and target cluster#2379
rahulLiving wants to merge 27 commits intoapache:masterfrom
rahulLiving:PHOENIX-7751

Conversation

@rahulLiving
Copy link
Copy Markdown
Contributor

No description provided.

@rahulLiving rahulLiving marked this pull request as ready for review March 12, 2026 12:36

/**
* PhoenixSyncTableTool chunk metadata cell qualifiers. These define the wire protocol between
* hoenixSyncTableRegionScanner (server-side coprocessor) and PhoenixSyncTableMapper (client-side
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Typo missing 'P'


public static Long getPhoenixSyncTableFromTime(Configuration conf) {
Preconditions.checkNotNull(conf);
String value = conf.get(PHOENIX_SYNC_TABLE_FROM_TIME);
Copy link
Copy Markdown
Contributor

@tkhurana tkhurana Mar 18, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why didn't you use conf.getLong() ?

conf.setLong(PHOENIX_SYNC_TABLE_TO_TIME, toTime);
}

public static Long getPhoenixSyncTableToTime(Configuration conf) {
Copy link
Copy Markdown
Contributor

@tkhurana tkhurana Mar 18, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here also why didn't you use conf.getLong ?

return configuration.getBoolean(MAPREDUCE_RANDOMIZE_MAPPER_EXECUTION_ORDER,
DEFAULT_MAPREDUCE_RANDOMIZE_MAPPER_EXECUTION_ORDER);
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO these APIs can remain in PhoenixSyncTableTool class only. They are specific to Sync tool

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I saw other Tool also has its setter/getter in PhoenixConfigurationUtil.java, so followed same pattern. I am okay to move

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By definition util is something which is useful in multiple contexts. I don't think we should follow the same pattern.

return false;
}

buildChunkMetadataResult(results, isTargetScan);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we break out early due to page timeout won't we have a partial chunk ?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems that isTargetScan is for different purpose or at-least the naming can be improved.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we break out early due to page timeout won't we have a partial chunk ?

I have kept source not to have partial chunk, whatever can be processed with page timeout will be considered as source chunk and target will scan with that source chunk size.
Though we can have partial chunk for source, but I was thinking if chunking is taking ~5-10 mins, its better not to hit the same server immediately to let server cool off ?

For target chunk, we always assume target as partial chunk. and caulculates final checksum in Mapper itslef when all rows boundary is read.
That is why isTargetScan is synonymous to partialChunk.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure what you mean by "not to hit the same server immediately to let server cool off". I would suggest refactoring the code a little bit . isTargetScan is already a field of this class. No need to pass it as a parameter to the buildChunkMetadataResult function. Rather, just directly reference this field inside the function. IMO, the current naming is a little confusing. Also, please add comments on why the target chunk is always assumed as a partial chunk.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure what you mean by "not to hit the same server immediately to let server cool off"

I meant, If we see page timeout(15 mins) for source cluster, we return with whatever has been collected be it 1 row or 1GB of row. And then look for same rows in target region boundary.
We have an option to continue making source chunk if we see page timeout, until we get 1GB of data or end of region. But I avoided that. Reason being, if we are seeing page timeout, it indicates server is not able to keep up with the request. So instead we go to target cluster to get chunk and validate checksum.
Lets say it took a min to calculate checksum from target cluster, we delayed hitting source RS again by 1 min.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lets say it took a min to calculate checksum from target cluster, we delayed hitting source RS again by 1 min.

I realized, shall we add an explicit delay to throttle as well at Mapper side if source chunk times out before processing 1GB chunk ?

Comment on lines +81 to +84
private byte[] chunkStartKey = null;
private byte[] chunkEndKey = null;
private long currentChunkSize = 0L;
private long currentChunkRowCount = 0L;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Improvement can be made here to introduce the notion of a chunk object

byte[] rowKey = CellUtil.cloneRow(rowCells.get(0));
long rowSize = calculateRowSize(rowCells);
addRowToChunk(rowKey, rowCells, rowSize);
if (!isTargetScan && willExceedChunkLimits(rowSize)) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So addRowToChunk is already adding the rowSize to chunkSize and then willExceedChunkLimits is again adding rowSize to chunkSize

public boolean next(List<Cell> results, ScannerContext scannerContext) throws IOException {
region.startRegionOperation();
try {
resetChunkState();
Copy link
Copy Markdown
Contributor

@tkhurana tkhurana Mar 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you have a notion of a chunk object then you don't need reset you can simply create a new chunk

/**
* PhoenixSyncTableTool scan attributes for server-side chunk formation and checksum
*/
public static final String SYNC_TABLE_CHUNK_FORMATION = "_SyncTableChunkFormation";
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should all of these instead be named SYNC_TOOL ?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have used SyncTableTool for user facing class/config. For others, I have used SyncTable, are you recommending to move all Classes and config to SyncTool instead of SyncTable i.e PhoenixSyncTableRegionScanner -> PhoenixSyncToolRegionScanner ?
I felt SyncTable is more self explainable compared to SyncTool, we can also change it to SyncTableTool at all places ?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. Its okay. Not a big deal. We can stick with the same naming convention.

Comment on lines +168 to +172
if (chunkStartKey == null) {
LOGGER.warn("Paging timed out while fetching first row of chunk, initStartRowKey: {}",
Bytes.toStringBinary(initStartRowKey));
updateDummyWithPrevRowKey(results, initStartRowKey, includeInitStartRowKey, scan);
return true;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this ever hit ? Even with 0 page timeout we get at least one row

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I was not able repro it in my Integration test. Kept it as defensive check.

Even with 0 page timeout we get at least one row

what would this row contain, if we couldn't get any row from table ?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Either we will get some exception or we will get a row. You can simplify your code by not handling this case. I think this will also get rid of updateDummyWithPrevRowKey function as it seems this is the only place which is calling this function.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, we can remove handling of dummy rows in SyncTableMapper as well then.

@Override
protected void map(NullWritable key, DBInputFormat.NullDBWritable value, Context context)
throws IOException, InterruptedException {
context.getCounter(PhoenixJobCounters.INPUT_RECORDS).increment(1);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the meaning of INPUT_RECORDS in the context of sync tool ?

Copy link
Copy Markdown
Contributor Author

@rahulLiving rahulLiving Mar 22, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It indicates number of mappers created

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do you need a Phoenix specific counter for it. The Map reduce framework already tells us the number of mappers.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Via this we are also maintaining counters for mappers created, mappers with no failed chunk and mapper with failed chunk.
Based on your suggestion on other comment., we can rename it to VERIFIED_MAPPER, FAILED_MAPPER.


if (sourceRowsProcessed > 0) {
if (mismatchedChunk == 0) {
context.getCounter(PhoenixJobCounters.OUTPUT_RECORDS).increment(1);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does the OUTPUT_RECORDS mean in the context of Sync tool ?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Number of mapper sucessfully processed. We also have FAILED_RECORD for failed mappers.

Copy link
Copy Markdown
Contributor

@tkhurana tkhurana Mar 23, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

RECORDS generally implies rows. One suggestion could be to use VERIFIED_CHUNKS, FAILED_CHUNKS

+ " TO_TIME BIGINT NOT NULL,\n" + " START_ROW_KEY VARBINARY_ENCODED,\n"
+ " END_ROW_KEY VARBINARY_ENCODED,\n" + " IS_DRY_RUN BOOLEAN, \n"
+ " EXECUTION_START_TIME TIMESTAMP,\n" + " EXECUTION_END_TIME TIMESTAMP,\n"
+ " STATUS VARCHAR(20),\n" + " COUNTERS VARCHAR(255), \n"
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think Counters should have a fixed limit. Just make them VARCHAR so that we can add more counters in the future.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should also add tenantId as one of PK column.


public enum Type {
CHUNK,
MAPPER_REGION
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe just REGION


String query = "SELECT START_ROW_KEY, END_ROW_KEY FROM " + SYNC_TABLE_CHECKPOINT_TABLE_NAME
+ " WHERE TABLE_NAME = ? AND TARGET_CLUSTER = ?"
+ " AND TYPE = ? AND FROM_TIME = ? AND TO_TIME = ? AND STATUS IN ( ?, ?)";
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are only 2 possible status so does it make sense to set them in the query ? If you don't then you are only querying pk columns without any filter.

qSchemaName = SchemaUtil.normalizeIdentifier(schemaName);
PhoenixMapReduceUtil.validateTimeRange(startTime, endTime, qTable);
PhoenixMapReduceUtil.validateMaxLookbackAge(configuration, endTime, qTable);
if (LOGGER.isDebugEnabled()) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's move this log to INFO level. It will be useful.

formatter.printHelp("hadoop jar phoenix-server.jar " + PhoenixSyncTableTool.class.getName(),
"Synchronize a Phoenix table between source and target clusters", options,
"\nExample usage:\n"
+ "hadoop jar phoenix-server.jar org.apache.phoenix.mapreduce.PhoenixSyncTableTool \\\n"
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Generally we run IndexTool via /hbase/bin/hbase IndexTool.

qTable = SchemaUtil.getQualifiedTableName(schemaName, tableName);
qSchemaName = SchemaUtil.normalizeIdentifier(schemaName);
PhoenixMapReduceUtil.validateTimeRange(startTime, endTime, qTable);
PhoenixMapReduceUtil.validateMaxLookbackAge(configuration, endTime, qTable);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need the end time to be within the max lookback window ? How will the sync tool break if the end time is outside of the max lookback window ?

Copy link
Copy Markdown
Contributor Author

@rahulLiving rahulLiving Mar 24, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, this check is not useful.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On the other hand, we should not only enforce that it is not outside the window, we should also enforce a "safety buffer" to accommodate the data in flight. Even when the endTime is with in the window, if it is too close to the current time, it may miss the data that is still in flight and may cause false positives. In practice, this may not matter as the time it takes to setup and run could be in the order of several minutes and so enough for the catch up to complete, but I think it is better to make it explicit by enforcing a safety buffer and make this more deterministic.

If we remove this check and allow the endTime to be in the future, the possibility of having false positives due to the data in flight becomes a lot more pronounced. By enforcing both startTime and endTIme, we can ensure a "consistent window" where data is guaranteed to be fully replicated and 'quiesced' on both sides. WDYT?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking more about the "consistent window" or "quiesced window" approach that I suggested above and realized this is actually a race against sliding window during long-running jobs.

If a sync job takes several hours to complete, a startTime that was valid at the beginning of the job might actually 'slide' out of the lookback window by the time the final Mappers execute. Since HBase compactions on the Source and Target clusters aren't synchronized, couldn't this lead to false-positive mismatches if one cluster purges historical data mid-run while the other hasn't yet?

It may not always be possible to make the "Safety Buffer" on the startTime large enough to account for the job execution time, what if the max lookback window is only a few hours and the job itself takes hours? Does this require utilizing HBase Snapshots to 'freeze' the data state for the duration of the sync? Are there existing pattern that other systems might have employed to handle this issue?

@kadirozde @tkhurana

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to think about this from two perspective, where we run the sync job regularly as cron, secondly if we use this for migration validation.
For migration validation, start time would definitely before maxLookbackAge. It is upto the owner if they want to validate all version and delete markers or just latest version.
For regular cron job to be used in PhoenixHA, we can configure the start/end time to be within maxLookBackAge.
Tanuj suggestion of giving user flexibility to choose rawScan & allVersion option would be helpful. And since we plan to fix the mismatched rows as well, we can consider source as SOT and fix accordingly.
Though, there can be instance where it can't be fixed like source have removed delete marker via compaction but target still has delete marker. Such rows can be flagged as not fixable as per design.

Btw, default endTime is (currentTime - 1 hour), to ensure target has the desired data.

PhoenixConfigurationUtil.setPhoenixSyncTableChunkSizeBytes(configuration, chunkSizeBytes);
}
if (tenantId != null) {
PhoenixConfigurationUtil.setTenantId(configuration, tenantId);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you verify if the tenantid is being correctly set as a key prefix on the scan ?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you have a table region with multiple tenants and we pass a tenant id then our scan range should start with the tenantid prefix.

Copy link
Copy Markdown
Contributor Author

@rahulLiving rahulLiving Mar 24, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it only create input ranges and scan for tenant specific rows. We have an IT for same

* Configures a Configuration object with ZooKeeper settings from a ZK quorum string.
* @param baseConf Base configuration to create from (typically job configuration)
* @param zkQuorum ZooKeeper quorum string in format: "zk_quorum:port:znode" Example:
* "zk1,zk2,zk3:2181:/hbase"
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is actually not the only format for zk quorum. There are other valid formats also where the port number is specified separately for each server. There is actually a very useful API in Hbase called HBaseConfiguration.createClusterConf(job.getConfiguration(), targetZkQuorum) We should use that as that also works for zk registry.


String query = "SELECT START_ROW_KEY, END_ROW_KEY FROM " + SYNC_TABLE_CHECKPOINT_TABLE_NAME
+ " WHERE TABLE_NAME = ? AND TARGET_CLUSTER = ?"
+ " AND TYPE = ? AND FROM_TIME = ? AND TO_TIME = ? AND STATUS IN ( ?, ?)";
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not 100% positive that you can assume that the output of this query is always sorted by row key. You might have to add an ORDER BY clause here. If you are adding an ORDER BY clause it will be better to add all the PK columns to make the sorting more efficient.

int completedIdx = 0;

// Two pointer comparison across splitRange and completedRange
while (splitIdx < allSplits.size() && completedIdx < completedRegions.size()) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you are assuming here that completedRegions is already sorted. Please see my comment on the getProcessedMapperRegions function.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Won't the results be sorted in the PK order already? I see that the new commit adds ORDER BY, but not sure why that is required.

PhoenixInputSplit split = (PhoenixInputSplit) allSplits.get(splitIdx);
KeyRange splitRange = split.getKeyRange();
KeyRange completedRange = completedRegions.get(completedIdx);
byte[] splitStart = splitRange.getLowerRange();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will the end key of the split range will always be exclusive ? If yes, can you please add a comment

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, both splitRange and completedRange, start key would be inclusive and endKey exclusive always. Will add a comment.

* @return List of (startKey, endKey) pairs representing unprocessed ranges
*/
@VisibleForTesting
public List<Pair<byte[], byte[]>> calculateUnprocessedRanges(byte[] mapperRegionStart,
Copy link
Copy Markdown
Contributor

@tkhurana tkhurana Mar 24, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we could return a List<KeyRange>

if (hasStartBoundary) {
queryBuilder.append(" AND END_ROW_KEY >= ?");
}
queryBuilder.append(" AND STATUS IN (?, ?)");
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above we don't need to pass status

scan.setCacheBlocks(false);
scan.setTimeRange(fromTime, toTime);
if (isTargetScan) {
scan.setLimit(1);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add a comment here why we are setting limit to 1 and caching to 1

Scan scan = new Scan();
scan.withStartRow(startKey, isStartKeyInclusive);
scan.withStopRow(endKey, isEndKeyInclusive);
scan.setRaw(true);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we sure we have to do raw scan ?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, can we make this configurable via the SyncTool commandl ine

scan.withStartRow(startKey, isStartKeyInclusive);
scan.withStopRow(endKey, isEndKeyInclusive);
scan.setRaw(true);
scan.readAllVersions();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same can we make the behavior of reading all versions configurable.

@@ -0,0 +1,2267 @@
/*
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add a test where rows are deleted on both the source and target tables but you have run compaction on only one. We can have actually 2 cases where compaction is run on the source but not on target and vice versa. I saw that you are doing raw scan. Maxlookback settings will also impact this.

Copy link
Copy Markdown
Contributor

@haridsv haridsv left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just skimmed through and left some comments at the surface level.

Comment on lines +46 to +52
public static byte[] encodeDigestState(SHA256Digest digest) {
byte[] encoded = digest.getEncodedState();
ByteBuffer buffer = ByteBuffer.allocate(Bytes.SIZEOF_INT + encoded.length);
buffer.putInt(encoded.length);
buffer.put(encoded);
return buffer.array();
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since MAX_SHA256_DIGEST_STATE_SIZE is capped at 128 bytes , using a 4-byte integer and ByteBuffer for the length prefix is slightly over-engineered. We can optimize this by using a single byte for the length and Bytes.add() for concatenation. This would allow us to remove the ByteBuffer, ByteArrayInputStream, and DataInputStream dependencies in these utility methods.

Suggested change
public static byte[] encodeDigestState(SHA256Digest digest) {
byte[] encoded = digest.getEncodedState();
ByteBuffer buffer = ByteBuffer.allocate(Bytes.SIZEOF_INT + encoded.length);
buffer.putInt(encoded.length);
buffer.put(encoded);
return buffer.array();
}
public static byte[] encodeDigestState(SHA256Digest digest) {
byte[] encoded = digest.getEncodedState();
// Use an unsigned byte as 128 > Byte.MAX_VALUe
return Bytes.add(new byte[]{(byte) (encoded.length & 0xff)}, encoded);
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BTW, can you tell me why we need to encode the length into it? You are using PhoenixKeyValueUtil.newKeyValue which is already encoding the length of the byte[] anyway.

public class SHA256DigestUtil {

/**
* Maximum allowed size for encoded SHA-256 digest state. SHA-256 state is ~96 bytes, we allow up
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you point me to the documentation on the size being ~96 bytes?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comment on lines +65 to +75
DataInputStream dis = new DataInputStream(new ByteArrayInputStream(encodedState));
int stateLength = dis.readInt();
// Prevent malicious large allocations
if (stateLength > MAX_SHA256_DIGEST_STATE_SIZE) {
throw new IllegalArgumentException(
String.format("Invalid SHA256 state length: %d, expected <= %d", stateLength,
MAX_SHA256_DIGEST_STATE_SIZE));
}

byte[] state = new byte[stateLength];
dis.readFully(state);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Following my suggestion in encode, this will simply become:

Suggested change
DataInputStream dis = new DataInputStream(new ByteArrayInputStream(encodedState));
int stateLength = dis.readInt();
// Prevent malicious large allocations
if (stateLength > MAX_SHA256_DIGEST_STATE_SIZE) {
throw new IllegalArgumentException(
String.format("Invalid SHA256 state length: %d, expected <= %d", stateLength,
MAX_SHA256_DIGEST_STATE_SIZE));
}
byte[] state = new byte[stateLength];
dis.readFully(state);
int stateLength = encodedState[0] & 0xff;
// Prevent malicious large allocations
if (stateLength > MAX_SHA256_DIGEST_STATE_SIZE) {
throw new IllegalArgumentException(
String.format("Invalid SHA256 state length: %d, expected <= %d", stateLength,
MAX_SHA256_DIGEST_STATE_SIZE));
}
byte[] state = new byte[stateLength];
System.arraycopy(encodedState, 1, state, 0, stateLength);

*/
public static final String SYNC_TABLE_CHUNK_FORMATION = "_SyncTableChunkFormation";
public static final String SYNC_TABLE_CHUNK_SIZE_BYTES = "_SyncTableChunkSizeBytes";
public static final String SYNC_TABLE_CONTINUED_DIGEST_STATE = "_SyncTableContinuedDigestState";
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add JavaDoc on all 3 constants individually with a description of what they the attribute is and what type of value it would contain?

return scan.getAttribute((BaseScannerRegionObserverConstants.REBUILD_INDEXES)) != null;
}

public static boolean isSyncTableChunkFormation(Scan scan) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you mean isSyncTableChunkFormationEnabled?

connectToTargetCluster();
globalConnection = createGlobalConnection(conf);
syncTableOutputRepository = new PhoenixSyncTableOutputRepository(globalConnection);
} catch (SQLException | IOException e) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would include RuntimeException too, to be more aggressive in avoiding a resource leak.

Bytes.toBytes(chunkSizeBytes));
}
long syncTablePageTimeoutMs = (long) (conf.getLong(HConstants.HBASE_RPC_TIMEOUT_KEY,
QueryServicesOptions.DEFAULT_SYNC_TABLE_RPC_TIMEOUT) * 0.5);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the basis for this multiplier? Should it be a configurable value?

queryBuilder.append(" AND START_ROW_KEY <= ?");
}
if (hasStartBoundary) {
queryBuilder.append(" AND END_ROW_KEY >= ?");
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For last region, there will be no constraint on START_ROW_KEY and END_ROW_KEY is not part of the PK, so this can perform poorly, I think.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps we should add " AND START_ROW_KEY > <0x00>"? You may want to check the query plan for with and without this constraint to see if it is helping.

private Boolean isDryRun;
private byte[] startRowKey;
private byte[] endRowKey;
private Boolean isFirstRegion;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this being used?

return parseCounterValue(PhoenixSyncTableMapper.SyncCounters.TARGET_ROWS_PROCESSED.name());
}

@VisibleForTesting
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On a private function?

qTable = SchemaUtil.getQualifiedTableName(schemaName, tableName);
qSchemaName = SchemaUtil.normalizeIdentifier(schemaName);
PhoenixMapReduceUtil.validateTimeRange(startTime, endTime, qTable);
PhoenixMapReduceUtil.validateMaxLookbackAge(configuration, endTime, qTable);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't we need to validate that the startTime is within the max lookback window? If startTime is beyond the window, background compactions on the source and target clusters may have purged different sets of historical versions or delete markers. Since these compactions don't run in sync, the tool will see different data states on each cluster, leading to false-positive mismatches for data that is actually consistent but has simply been cleaned up at different times.

Copy link
Copy Markdown
Contributor

@tkhurana tkhurana Mar 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The question is do we want to verify the full history or just the latest version ? If we don't do raw scans and don't fetch all the versions then we don't have to deal with all the complexity around max lookback window and compactions. It will also make the tool run faster. That is why I suggested making those configurable. I feel we don't really need to look at the entire history. As long as the end time is (current time - some configurable lag) we can get a consistent snapshot of the source and target.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just saw your other comments on setRaw(). I agree, if we don't do raw scan, then we don't have to worry about these aspects, but if the HA guarantee includes the history (e.g., CDC), then we do we have a choice?

qTable = SchemaUtil.getQualifiedTableName(schemaName, tableName);
qSchemaName = SchemaUtil.normalizeIdentifier(schemaName);
PhoenixMapReduceUtil.validateTimeRange(startTime, endTime, qTable);
PhoenixMapReduceUtil.validateMaxLookbackAge(configuration, endTime, qTable);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On the other hand, we should not only enforce that it is not outside the window, we should also enforce a "safety buffer" to accommodate the data in flight. Even when the endTime is with in the window, if it is too close to the current time, it may miss the data that is still in flight and may cause false positives. In practice, this may not matter as the time it takes to setup and run could be in the order of several minutes and so enough for the catch up to complete, but I think it is better to make it explicit by enforcing a safety buffer and make this more deterministic.

If we remove this check and allow the endTime to be in the future, the possibility of having false positives due to the data in flight becomes a lot more pronounced. By enforcing both startTime and endTIme, we can ensure a "consistent window" where data is guaranteed to be fully replicated and 'quiesced' on both sides. WDYT?

* @throws SQLException if connection fails
* @throws IllegalArgumentException if validation fails
*/
public static PTable validateTableForMRJob(Connection connection, String qualifiedTableName,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about?

Suggested change
public static PTable validateTableForMRJob(Connection connection, String qualifiedTableName,
public static PTable getPTableWithValidation(Connection connection, String qualifiedTableName,

try (PreparedStatement ps = connection.prepareStatement(UPSERT_CHECKPOINT_SQL)) {
ps.setString(1, row.getTableName());
ps.setString(2, row.getTargetCluster());
ps.setString(3, row.getType().name());
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would recommend storing a byte code rather a long string to reduce the size of the row key.

ps.setBoolean(9, row.getIsDryRun());
ps.setTimestamp(10, row.getExecutionStartTime());
ps.setTimestamp(11, row.getExecutionEndTime());
ps.setString(12, row.getStatus() != null ? row.getStatus().name() : null);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar to type, though not as important, it would be better to store a code rather than the name.

}
if (row.getFromTime() == null || row.getToTime() == null) {
throw new IllegalArgumentException("FromTime and ToTime cannot be null for checkpoint");
}
Copy link
Copy Markdown
Contributor

@haridsv haridsv Mar 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see you are already using Preconditions in a few other places, why not use here and a few other places and make it consistent and less verbose?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Edited the comment to add the missing Preconditions.

sourceTable = PhoenixConfigurationUtil.getIndexToolSourceTable(conf);
Assert.assertEquals(sourceTable, SourceTable.DATA_TABLE_SOURCE);
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Inadvertent change?

+ " COUNTERS VARCHAR, \n" + " CONSTRAINT PK PRIMARY KEY (\n" + " TABLE_NAME,\n"
+ " TARGET_CLUSTER,\n" + " TYPE ,\n" + " FROM_TIME,\n"
+ " TO_TIME,\n" + " TENANT_ID,\n" + " START_ROW_KEY )" + ") TTL="
+ OUTPUT_TABLE_TTL_SECONDS;
Copy link
Copy Markdown
Contributor

@haridsv haridsv Mar 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is UPSERT_CHECKPOINT_SQL a static field but this one is not? It seems inconsistent.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We might update schema with ALTER statement in future as per requirement. So, I thought of keeping all at one place.

try (Statement stmt = connection.createStatement()) {
stmt.execute(ddl);
connection.commit();
LOGGER.info("Successfully created or verified existence of {} table",
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No verification is being done, perhaps you want to say something like this?

Suggested change
LOGGER.info("Successfully created or verified existence of {} table",
LOGGER.info("Initialization of checkpoint table {} complete",

}
}
return 0;
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The counters string is generated outside the class, while parsing is done inside, this mismatch in abstraction is not good. I suggest you either encapsulate both in this class or move both into one util class.

qTable = SchemaUtil.getQualifiedTableName(schemaName, tableName);
qSchemaName = SchemaUtil.normalizeIdentifier(schemaName);
PhoenixMapReduceUtil.validateTimeRange(startTime, endTime, qTable);
PhoenixMapReduceUtil.validateMaxLookbackAge(configuration, endTime, qTable);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just saw your other comments on setRaw(). I agree, if we don't do raw scan, then we don't have to worry about these aspects, but if the HA guarantee includes the history (e.g., CDC), then we do we have a choice?

queryBuilder.append(" AND START_ROW_KEY <= ?");
}
if (hasStartBoundary) {
queryBuilder.append(" AND END_ROW_KEY >= ?");
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps we should add " AND START_ROW_KEY > <0x00>"? You may want to check the query plan for with and without this constraint to see if it is helping.

}

queryBuilder.append(
" ORDER BY TABLE_NAME, TARGET_CLUSTER, TYPE, FROM_TIME, TO_TIME, TENANT_ID, START_ROW_KEY");
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is already the PK order, correct? Why do we need an explicit ORDER BY?

}

queryBuilder.append(
" ORDER BY TABLE_NAME, TARGET_CLUSTER, TYPE, FROM_TIME, TO_TIME, TENANT_ID, START_ROW_KEY");
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same question as above on the need for ORDER BY.

}
if (row.getFromTime() == null || row.getToTime() == null) {
throw new IllegalArgumentException("FromTime and ToTime cannot be null for checkpoint");
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Edited the comment to add the missing Preconditions.

int completedIdx = 0;

// Two pointer comparison across splitRange and completedRange
while (splitIdx < allSplits.size() && completedIdx < completedRegions.size()) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Won't the results be sorted in the PK order already? I see that the new commit adds ORDER BY, but not sure why that is required.

String targetZkQuorum = PhoenixSyncTableTool.getPhoenixSyncTableTargetZkQuorum(conf);
Long fromTime = PhoenixSyncTableTool.getPhoenixSyncTableFromTime(conf);
Long toTime = PhoenixSyncTableTool.getPhoenixSyncTableToTime(conf);
List<InputSplit> allSplits = super.getSplits(context);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not cast once here and avoid casting at multiple places later?

*/
private Connection createGlobalConnection(Configuration conf) throws SQLException {
Configuration globalConf = new Configuration(conf);
globalConf.unset(PhoenixConfigurationUtil.MAPREDUCE_TENANT_ID);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't tenant ID optional? Perhaps you can use the same connection for both if it is not present?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants