Skip to content

Flink: Honor schema identifier fields in dynamic-sink record routing#16243

Open
jordepic wants to merge 1 commit intoapache:mainfrom
jordepic:main
Open

Flink: Honor schema identifier fields in dynamic-sink record routing#16243
jordepic wants to merge 1 commit intoapache:mainfrom
jordepic:main

Conversation

@jordepic
Copy link
Copy Markdown
Contributor

@jordepic jordepic commented May 7, 2026

DynamicSinkUtil.getEqualityFieldIds and DynamicWriter.getEqualityFields both fall back to the schema's identifierFieldIds when the user-supplied equality fields are empty, but two routing decisions in the dynamic sink ignored that fallback:

  1. HashKeyGenerator distributed identifier-only records round-robin, so two rows sharing an identifier-derived key could land on different writer subtasks while the writer still emitted equality deletes keyed by those identifier fields - breaking equality-delete correctness.
  2. DynamicRecordProcessor forwarded any record with a null distributionMode straight to the writer, even when the record resolved to a non-empty equality-field set. Forward-mode records sharing an equality key could likewise split across writers and leave duplicates behind.

Centralize the resolution in DynamicSinkUtil.resolveEqualityFieldNames and use it in both call sites so distribution and write-side equality-field inference stay aligned. Document the carve-out in flink-writes.md and add unit tests covering both paths across v2.1, v2.0 and v1.20.

DynamicSinkUtil.getEqualityFieldIds and DynamicWriter.getEqualityFields
both fall back to the schema's identifierFieldIds when the user-supplied
equality fields are empty, but two routing decisions in the dynamic sink
ignored that fallback:

1. HashKeyGenerator distributed identifier-only records round-robin, so
   two rows sharing an identifier-derived key could land on different
   writer subtasks while the writer still emitted equality deletes keyed
   by those identifier fields - breaking equality-delete correctness.
2. DynamicRecordProcessor forwarded any record with a null
   distributionMode straight to the writer, even when the record
   resolved to a non-empty equality-field set. Forward-mode records
   sharing an equality key could likewise split across writers and leave
   duplicates behind.

Centralize the resolution in DynamicSinkUtil.resolveEqualityFieldNames
and use it in both call sites so distribution and write-side
equality-field inference stay aligned. Document the carve-out in
flink-writes.md and add unit tests covering both paths across v2.1, v2.0
and v1.20.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant