Flink: Honor schema identifier fields in dynamic-sink record routing#16243
Open
jordepic wants to merge 1 commit intoapache:mainfrom
Open
Flink: Honor schema identifier fields in dynamic-sink record routing#16243jordepic wants to merge 1 commit intoapache:mainfrom
jordepic wants to merge 1 commit intoapache:mainfrom
Conversation
DynamicSinkUtil.getEqualityFieldIds and DynamicWriter.getEqualityFields both fall back to the schema's identifierFieldIds when the user-supplied equality fields are empty, but two routing decisions in the dynamic sink ignored that fallback: 1. HashKeyGenerator distributed identifier-only records round-robin, so two rows sharing an identifier-derived key could land on different writer subtasks while the writer still emitted equality deletes keyed by those identifier fields - breaking equality-delete correctness. 2. DynamicRecordProcessor forwarded any record with a null distributionMode straight to the writer, even when the record resolved to a non-empty equality-field set. Forward-mode records sharing an equality key could likewise split across writers and leave duplicates behind. Centralize the resolution in DynamicSinkUtil.resolveEqualityFieldNames and use it in both call sites so distribution and write-side equality-field inference stay aligned. Document the carve-out in flink-writes.md and add unit tests covering both paths across v2.1, v2.0 and v1.20.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
DynamicSinkUtil.getEqualityFieldIds and DynamicWriter.getEqualityFields both fall back to the schema's identifierFieldIds when the user-supplied equality fields are empty, but two routing decisions in the dynamic sink ignored that fallback:
Centralize the resolution in DynamicSinkUtil.resolveEqualityFieldNames and use it in both call sites so distribution and write-side equality-field inference stay aligned. Document the carve-out in flink-writes.md and add unit tests covering both paths across v2.1, v2.0 and v1.20.