
Flink: Fix DynamicIcebergSink writing with stale field IDs after column delete and re-add #16249

Open
rayhondo wants to merge 2 commits into apache:main from rayhondo:flink-tablemetadatacache-current-schema-only

Conversation


rayhondo commented on May 8, 2026

TableMetadataCache.schema() in DynamicIcebergSink iterates all historical table schemas and returns on the first SAME match from CompareSchemasVisitor. Since the visitor matches by name+type and not field IDs, an input schema that is structurally identical to an older table schema (e.g. before a deleteColumn) deterministically matches the historical schema first, and the sink writes with stale field IDs. The current schema then cannot resolve those columns on read and returns null silently — no exception is thrown.

Reproduction: create a table with [id, data, extra], updateSchema().deleteColumn("extra").commit(), then addColumn("extra", ...) to bring it back (extra now has a new field id). Send a DynamicRecord with the original [id, data, extra] shape — the cache returns SAME against the historical schema and writes use its field id for extra. Reads via the current schema then return null for extra.
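Concretely, the delete/re-add steps look like this against a plain Iceberg `Table` handle (a minimal sketch; the wrapper class is illustrative, and `table` is assumed to be any catalog-loaded table created with the columns [id, data, extra]):

```java
import org.apache.iceberg.Table;
import org.apache.iceberg.types.Types;

class StaleFieldIdRepro { // illustrative wrapper, not part of the patch
  static void reproduce(Table table) {
    // Drop the column, then re-add it: "extra" gets a new field id, but the
    // pre-delete schema (carrying the old id) remains in table.schemas().
    table.updateSchema().deleteColumn("extra").commit();
    table.updateSchema().addColumn("extra", Types.StringType.get()).commit();

    // A DynamicRecord shaped like the original [id, data, extra] now matches
    // that historical schema by name+type, so writes carry the stale field id
    // and reads through the current schema return null for "extra".
  }
}
```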

This change resolves the input only against the current table schema. CacheItem now tracks currentSchemaId populated from table.schema().schemaId() in update(), and historical iteration is removed — selecting a historical schema as a write target was the bug. SCHEMA_UPDATE_NEEDED continues to fall through to evolution as before.
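In sketch form, the shape of the lookup change is roughly the following (hypothetical and simplified; the real field and method names in TableMetadataCache, and the exact CompareSchemasVisitor call, may differ from the patch — only currentSchemaId, table.schema().schemaId(), and the SAME/SCHEMA_UPDATE_NEEDED results come from the description above):

```java
import java.util.Map;
import org.apache.iceberg.Schema;

// Hypothetical, simplified shape of the fix; not the exact patch.
class CacheItem {
  Map<Integer, Schema> schemas; // all known table schemas, keyed by schema id
  int currentSchemaId;          // set from table.schema().schemaId() in update()

  Schema resolve(Schema input) {
    // Before the fix: iterate schemas.values() and return on the first SAME
    // match, letting a structurally identical historical schema shadow the
    // current one. After: compare against the current schema only.
    Schema current = schemas.get(currentSchemaId);
    if (CompareSchemasVisitor.visit(input, current) == CompareSchemasVisitor.Result.SAME) {
      return current;
    }
    return null; // callers fall through to SCHEMA_UPDATE_NEEDED / evolution
  }
}
```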

Applied identically to flink/v1.20, flink/v2.0, and flink/v2.1. Tests added in each module:

  • TestTableMetadataCache.testHistoricalSchemaDoesNotShadowCurrentAfterColumnDelete — unit test asserting the cache no longer returns SAME against a historical schema after a column delete.
  • TestDynamicIcebergSink.testWriteAfterColumnDeleteAndReaddDoesNotUseHistoricalFieldIds — end-to-end regression that pre-creates a table, runs deleteColumn + addColumn to give the column a new field id, writes a record with the pre-delete shape via DynamicIcebergSink, and reads back (sketched after this list). The re-added column's value must round-trip; pre-fix this assertion fails with expected: "..." but was: null, exactly matching the silent-corruption signature.
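A hedged sketch of that end-to-end test's core steps, assuming AssertJ-style assertions; `createTable`, `writeViaDynamicSink`, `row`, and `readColumn` are placeholder helpers, not the actual test utilities:

```java
@Test
void testWriteAfterColumnDeleteAndReaddDoesNotUseHistoricalFieldIds() {
  // Pre-create the table, then give "extra" a new field id via delete + re-add.
  Table table = createTable("id", "data", "extra"); // placeholder helper
  table.updateSchema().deleteColumn("extra").commit();
  table.updateSchema().addColumn("extra", Types.StringType.get()).commit();

  // Write one record with the pre-delete [id, data, extra] shape through
  // DynamicIcebergSink, then read it back through the current schema.
  writeViaDynamicSink(table, row(1L, "a", "round-trip")); // placeholder helpers

  // Pre-fix this fails with: expected: "round-trip" but was: null.
  assertThat(readColumn(table, "extra")).containsExactly("round-trip");
}
```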

AI was used to draft the patch and tests; the author verified the root cause against the upstream source, confirmed the integration test reproduces the data-corruption symptom on pre-fix code, and ran the affected module tests + spotlessCheck locally.

github-actions bot added the flink label on May 8, 2026
rayhondo force-pushed the flink-tablemetadatacache-current-schema-only branch 2 times, most recently from bdf87c7 to 6641cf5 on May 8, 2026 at 01:52
Flink: DynamicIcebergSink TableMetadataCache must resolve against current schema only

TableMetadataCache.schema() previously iterated all historical table schemas and
returned the first SAME match from CompareSchemasVisitor. Because the visitor
matches by name+type (not field IDs), an input schema that is structurally
identical to an older schema (e.g. before a deleteColumn) deterministically
matched the historical schema first and the sink wrote with stale field IDs.
Reads against the current schema then silently returned null for the affected
columns.

Resolve only against the current schema. CacheItem now tracks currentSchemaId
populated from table.schema().schemaId() in update().
rayhondo force-pushed the flink-tablemetadatacache-current-schema-only branch from 6641cf5 to 1f1938e on May 8, 2026 at 02:14
rayhondo changed the title from "Flink: DynamicIcebergSink TableMetadataCache must resolve against current schema only" to "Flink: Fix DynamicIcebergSink writing with stale field IDs after column delete and re-add" on May 8, 2026
The test wrote a row that drops `extra` followed by a row that re-adds
it, but asserted the final table had only 2 columns. That assertion only
held because of the TableMetadataCache historical-schema-shadow bug: the
second row matched the pre-drop historical schema1 in table.schemas(),
the cache reported "no update needed", and the re-add silently never
happened. The test was encoding the bug.

After resolving against current schema only, per-row reconciliation
correctly re-adds `extra` with a new field id. Update the assertions to
match: 3 columns, `extra` present, field id != 3.
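Roughly, assuming AssertJ-style assertions and a `table` handle in scope (the exact helpers in the test may differ), the updated checks look like:

```java
// Field id 3 was the pre-drop id of "extra"; the re-added column must not reuse it.
Schema current = table.schema();
assertThat(current.columns()).hasSize(3);
Types.NestedField extra = current.findField("extra");
assertThat(extra).isNotNull();
assertThat(extra.fieldId()).isNotEqualTo(3);
```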