Flink: Add RemoveDanglingDeleteFiles to maintenance API #16171
sachinnn99 wants to merge 1 commit into apache:main
Guosmilesmile
left a comment
Thanks for the PR! This looks more like a standalone job that doesn't need Flink; it could be done directly with the Java API. If the table is large, it could become a bottleneck.
9ad81d8 to b1cc71a
sachinnn99
left a comment
Thanks for the review! The single-operator approach follows the same pattern as ExpireSnapshotsProcessor. A distributed approach like DeleteOrphanFiles could be a follow-up for very large tables.
In the current implementation, DataFiles and DeleteFiles are read one by one in several places. It also builds a table-level Set and a partition-level minDataSeqByPartition map. On large tables this will cause performance problems and OOM errors, so I hope the Flink implementation can leverage distributed computing, as the Spark action does.
8b5ec5c to 6c8f4c2
Closes apache#16138. Add RemoveDanglingDeleteFiles support to the Flink maintenance API across all three Flink versions (v1.20, v2.0, v2.1). The detection algorithm mirrors Spark's RemoveDanglingDeletesSparkAction with two independent checks: sequence-number-based dangling detection using an O(partitions) min-seq-per-partition map, and a separate targeted scan for deletion vectors referencing non-existent data files.
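To make the sequence-number check in the commit message concrete, here is a minimal, self-contained sketch. It is not the code from this PR. The Entry class, the string partition key (standing in for spec id plus partition tuple), and the sample data are hypothetical. The comparison rule follows the Iceberg spec's delete applicability rules: a position delete (or DV) can only apply to data files whose data sequence number is less than or equal to its own, and an equality delete only to data files with a strictly smaller one. Keeping deletes in partitions that have no live data files is an assumption of this sketch, and global (unpartitioned) equality deletes are not modeled.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class DanglingDeleteSketch {
  enum Content { DATA, POSITION_DELETES, EQUALITY_DELETES }

  // Hypothetical, simplified stand-in for a live manifest entry.
  static final class Entry {
    final String path;
    final String partition; // stands in for (spec id, partition tuple)
    final Content content;
    final long dataSeq;     // data sequence number

    Entry(String path, String partition, Content content, long dataSeq) {
      this.path = path;
      this.partition = partition;
      this.content = content;
      this.dataSeq = dataSeq;
    }
  }

  // One map entry per partition with live data files: O(partitions) memory.
  public static Map<String, Long> minDataSeqByPartition(List<Entry> entries) {
    Map<String, Long> minSeq = new HashMap<>();
    for (Entry e : entries) {
      if (e.content == Content.DATA) {
        minSeq.merge(e.partition, e.dataSeq, Long::min);
      }
    }
    return minSeq;
  }

  public static List<String> findDanglingDeletes(List<Entry> entries) {
    Map<String, Long> minSeq = minDataSeqByPartition(entries);
    List<String> dangling = new ArrayList<>();
    for (Entry e : entries) {
      if (e.content == Content.DATA) {
        continue;
      }
      Long min = minSeq.get(e.partition);
      if (min == null) {
        continue; // assumption: keep deletes in partitions with no live data files
      }
      // Equality deletes apply only to strictly older data, so "<=" is dangling;
      // position deletes also apply at the same sequence number, so only "<" is.
      boolean isDangling =
          e.content == Content.EQUALITY_DELETES ? e.dataSeq <= min : e.dataSeq < min;
      if (isDangling) {
        dangling.add(e.path);
      }
    }
    return dangling;
  }

  // Hypothetical sample: partition "a" has data at seq 5 and 7, partition "b" has none.
  public static List<Entry> sample() {
    return Arrays.asList(
        new Entry("a/data-5", "a", Content.DATA, 5),
        new Entry("a/data-7", "a", Content.DATA, 7),
        new Entry("a/pos-4", "a", Content.POSITION_DELETES, 4),
        new Entry("a/pos-5", "a", Content.POSITION_DELETES, 5),
        new Entry("a/eq-5", "a", Content.EQUALITY_DELETES, 5),
        new Entry("a/eq-6", "a", Content.EQUALITY_DELETES, 6),
        new Entry("b/eq-1", "b", Content.EQUALITY_DELETES, 1));
  }

  public static void main(String[] args) {
    System.out.println(findDanglingDeletes(sample())); // prints [a/pos-4, a/eq-5]
  }
}
```

In the sample, a/pos-5 and a/eq-6 are kept because they can still apply to a/data-5, while a/pos-4 and a/eq-5 cannot apply to any live data file in partition "a".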
6c8f4c2 to f988732
Summary
Fixes #16138.
The RemoveDanglingDeleteFiles action currently exists only for Spark. Flink users have no equivalent, causing equality delete files to accumulate indefinitely in V2 tables managed by the Flink maintenance pipeline.

This PR adds RemoveDanglingDeleteFiles support to the Flink maintenance API across all three Flink versions (v1.20, v2.0, v2.1).

Approach:
- Follows the ExpireSnapshots pattern (single ProcessFunction operator)
- Mirrors RemoveDanglingDeletesSparkAction's two-method structure:
  - findDanglingDeletes: sequence-number-based detection using an O(partitions) minDataSeqByPartition map (position, equality, and DV deletes)
  - findDanglingDvs: reference-based detection for deletion vectors via a targeted data manifest scan with O(DV candidates) memory

New files (identical across all three Flink versions):
- RemoveDanglingDeleteFiles.java: API builder extending MaintenanceTaskBuilder
- RemoveDanglingDeleteFilesProcessor.java: Core operator with dangling detection logic
- TestRemoveDanglingDeleteFiles.java: Tests covering partitioned deletes, equality delete edge cases, unpartitioned tables, and no-op scenarios

Test plan
- TestRemoveDanglingDeleteFiles passes on Flink v1.20, v2.0, v2.1
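
For readers unfamiliar with the reference-based check (findDanglingDvs in the Approach list), the idea can be sketched as follows. This is an illustrative sketch under assumptions, not the PR's implementation: the class name, the method signature, and the use of plain strings for file paths are hypothetical. It keeps only the data file paths referenced by candidate DVs in memory, streams over the live data file paths once, and reports every DV whose referenced data file was never seen.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

class DanglingDvSketch {
  // dvToReferencedDataFile: DV path -> path of the data file it references (hypothetical shape).
  // liveDataFilePaths: paths streamed from the data manifests; never materialized as a set here.
  public static List<String> findDanglingDvs(
      Map<String, String> dvToReferencedDataFile, Iterable<String> liveDataFilePaths) {
    // Memory is bounded by the number of DV candidates, not by the number of data files.
    Set<String> unmatched = new HashSet<>(dvToReferencedDataFile.values());
    for (String dataPath : liveDataFilePaths) {
      unmatched.remove(dataPath);
      if (unmatched.isEmpty()) {
        break; // every referenced data file was found; stop scanning early
      }
    }

    // Any reference still unmatched points at a data file that no longer exists.
    List<String> dangling = new ArrayList<>();
    for (Map.Entry<String, String> e : dvToReferencedDataFile.entrySet()) {
      if (unmatched.contains(e.getValue())) {
        dangling.add(e.getKey());
      }
    }
    return dangling;
  }

  public static void main(String[] args) {
    Map<String, String> dvs = new LinkedHashMap<>();
    dvs.put("dv-1.puffin", "data-1.parquet");
    dvs.put("dv-2.puffin", "data-2.parquet"); // data-2.parquet is not live
    List<String> live = Arrays.asList("data-1.parquet", "data-3.parquet");
    System.out.println(findDanglingDvs(dvs, live)); // prints [dv-2.puffin]
  }
}
```

The early exit is an optional optimization in this sketch: when no DV is dangling, the scan stops as soon as every referenced data file has been found.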