Flink: Fix DynamicIcebergSink writing with stale field IDs after column delete and re-add #16249
Open
rayhondo wants to merge 2 commits into apache:main from
Conversation
rayhondo force-pushed from bdf87c7 to 6641cf5:
…rent schema only

`TableMetadataCache.schema()` previously iterated all historical table schemas and returned the first `SAME` match from `CompareSchemasVisitor`. Because the visitor matches by name+type (not field IDs), an input schema that is structurally identical to an older schema (e.g. before a `deleteColumn`) deterministically matched the historical schema first, and the sink wrote with stale field IDs. Reads against the current schema then silently returned null for the affected columns.

Resolve only against the current schema. `CacheItem` now tracks `currentSchemaId`, populated from `table.schema().schemaId()` in `update()`.
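A minimal sketch of the resulting lookup shape, with a simplified stand-in for `CompareSchemasVisitor` (its exact API is not reproduced here; the stand-in only illustrates the ID-agnostic name+type matching the commit describes):

```java
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.types.Types;

// Sketch only, not the upstream patch: resolve the input schema against the
// current table schema and nothing else; never scan table.schemas() history.
final class CurrentSchemaOnlyLookupSketch {
  static Schema lookup(Table table, Schema inputSchema) {
    Schema current = table.schema();
    if (matchesByNameAndType(inputSchema, current)) {
      return current; // writers now target the current schema's field IDs
    }
    return null; // SCHEMA_UPDATE_NEEDED: caller falls through to evolution
  }

  private static boolean matchesByNameAndType(Schema input, Schema table) {
    // Simplified stand-in for CompareSchemasVisitor: a top-level name+type
    // match that deliberately ignores field IDs, which is exactly why the
    // old historical iteration could return SAME for a pre-delete schema.
    if (input.columns().size() != table.columns().size()) {
      return false;
    }
    for (Types.NestedField field : input.columns()) {
      Types.NestedField match = table.findField(field.name());
      if (match == null || !match.type().equals(field.type())) {
        return false;
      }
    }
    return true;
  }
}
```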
rayhondo force-pushed from 6641cf5 to 1f1938e:
The test wrote a row that drops `extra` followed by a row that re-adds it, but asserted the final table had only 2 columns. That assertion only held because of the `TableMetadataCache` historical-schema-shadow bug: the second row matched the pre-drop historical `schema1` in `table.schemas()`, the cache reported "no update needed", and the re-add silently never happened. The test was encoding the bug. After resolving against the current schema only, per-row reconciliation correctly re-adds `extra` with a new field id. Update the assertions to match: 3 columns, `extra` present, field id != 3.
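The corrected expectations, sketched as an AssertJ helper around the table handle (names are illustrative; the pre-drop field id 3 comes from the commit message above):

```java
import static org.assertj.core.api.Assertions.assertThat;

import org.apache.iceberg.Table;
import org.apache.iceberg.types.Types;

final class ReAddAssertionsSketch {
  // Corrected expectations after the second row re-adds "extra".
  static void assertExtraWasReAdded(Table table) {
    assertThat(table.schema().columns()).hasSize(3);  // id, data, and extra again
    Types.NestedField extra = table.schema().findField("extra");
    assertThat(extra).isNotNull();                    // the re-add actually happened
    assertThat(extra.fieldId()).isNotEqualTo(3);      // fresh id, not the pre-drop 3
  }
}
```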
`TableMetadataCache.schema()` in `DynamicIcebergSink` iterates all historical table schemas and returns on the first `SAME` match from `CompareSchemasVisitor`. Since the visitor matches by name+type and not field IDs, an input schema that is structurally identical to an older table schema (e.g. before a `deleteColumn`) deterministically matches the historical schema first, and the sink writes with stale field IDs. The current schema then cannot resolve those columns on read and silently returns null; no exception is thrown.

Reproduction: create a table with `[id, data, extra]`, run `updateSchema().deleteColumn("extra").commit()`, then `addColumn("extra", ...)` to bring it back (`extra` now has a new field id). Send a `DynamicRecord` with the original `[id, data, extra]` shape: the cache returns `SAME` against the historical schema and writes use its field id for `extra`. Reads via the current schema then return `null` for `extra`.
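Those reproduction steps, sketched against the public `Table` API (the re-added column's type is an assumption, since the description elides it):

```java
import org.apache.iceberg.Table;
import org.apache.iceberg.types.Types;

final class ReproSketch {
  // Give "extra" a new field id via delete + re-add. Table creation with
  // [id, data, extra] is elided; StringType is assumed for illustration.
  static void deleteAndReAdd(Table table) {
    table.updateSchema().deleteColumn("extra").commit();
    table.updateSchema().addColumn("extra", Types.StringType.get()).commit();
    // A DynamicRecord with the original [id, data, extra] shape now matches
    // the pre-delete historical schema by name+type; pre-fix, writes carried
    // that schema's stale field id and reads via the current schema saw null.
  }
}
```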
This change resolves the input only against the current table schema. `CacheItem` now tracks `currentSchemaId`, populated from `table.schema().schemaId()` in `update()`, and historical iteration is removed; selecting a historical schema as a write target was the bug. `SCHEMA_UPDATE_NEEDED` continues to fall through to evolution as before.
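An illustrative shape of that `CacheItem` bookkeeping; everything beyond the `currentSchemaId` field and its population in `update()` is an assumption, not the upstream class:

```java
import org.apache.iceberg.Table;

final class CacheItemSketch {
  private int currentSchemaId;

  // Called whenever the cached table metadata is refreshed: record which
  // schema id is the write target, so lookups resolve against it directly
  // instead of scanning table.schemas().
  void update(Table table) {
    this.currentSchemaId = table.schema().schemaId();
  }

  int currentSchemaId() {
    return currentSchemaId;
  }
}
```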
Applied identically to `flink/v1.20`, `flink/v2.0`, and `flink/v2.1`. Tests added in each module:

- `TestTableMetadataCache.testHistoricalSchemaDoesNotShadowCurrentAfterColumnDelete`: a unit test asserting the cache no longer returns `SAME` against a historical schema after a column delete.
- `TestDynamicIcebergSink.testWriteAfterColumnDeleteAndReaddDoesNotUseHistoricalFieldIds`: an end-to-end regression that pre-creates a table, runs `deleteColumn` + `addColumn` to give the column a new field id, writes a record with the pre-delete shape via `DynamicIcebergSink`, and reads back (sketched below). The re-added column's value must round-trip; pre-fix, this assertion fails with `expected: "..." but was: null`, exactly matching the silent-corruption signature.
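A skeleton of that end-to-end regression; `writeViaDynamicSink()` and `readColumn()` are hypothetical stand-ins for the module's actual sink and read plumbing:

```java
import static org.assertj.core.api.Assertions.assertThat;

import java.util.List;
import org.apache.iceberg.Table;
import org.apache.iceberg.types.Types;

final class RegressionSketch {
  static void roundTripAfterDeleteAndReAdd(Table table) {
    table.updateSchema().deleteColumn("extra").commit();
    table.updateSchema().addColumn("extra", Types.StringType.get()).commit(); // new field id

    // Write one record with the pre-delete [id, data, extra] shape.
    writeViaDynamicSink(table, "1", "data-1", "extra-1");

    // Pre-fix this fails with: expected: "extra-1" but was: null, because the
    // write carried the stale pre-delete field id for "extra".
    assertThat(readColumn(table, "extra")).containsExactly("extra-1");
  }

  // Hypothetical stand-ins; the real test uses the module's sink and read utilities.
  static void writeViaDynamicSink(Table table, String id, String data, String extra) {}

  static List<String> readColumn(Table table, String column) {
    return List.of();
  }
}
```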
AI was used to draft the patch and tests; the author verified the root cause against the upstream source, confirmed the integration test reproduces the data-corruption symptom on pre-fix code, and ran the affected module tests plus `spotlessCheck` locally.