storage: Add Debezium JSON envelope support for Kafka sources#35342
storage: Add Debezium JSON envelope support for Kafka sources#35342
Conversation
Previously, the Debezium envelope only supported Avro encoding. This adds support for JSON-encoded Debezium messages, enabling ingestion from systems like TiCDC that emit Debezium-format JSON. The new `UpsertStyle::DebeziumJson` variant handles envelope extraction at render time by parsing the Jsonb datum to extract `op` and `after` fields. The output schema is `[key: jsonb, data: jsonb, ...metadata]`, where `key` is the decoded Kafka message key and `data` is the extracted `after` payload. Syntax: KEY FORMAT JSON VALUE FORMAT JSON ENVELOPE DEBEZIUM KEY FORMAT JSON VALUE FORMAT JSON ENVELOPE DEBEZIUM (MODE = 'TICDC') The optional MODE parameter selects dialect-specific handling (currently TiCDC vs generic). Both modes support the standard Debezium operations: create (c), update (u), read/snapshot (r), delete (d), and truncate (t). Payload-wrapped format (schema+payload) is also handled transparently. Tests added: - test/testdrive/kafka-debezium-json-sources.td (generic Debezium JSON) - test/testdrive/kafka-debezium-json-ticdc-sources.td (TiCDC mode) - Updated kafka-upsert-debezium-sources.td error message test Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
|
Thanks for opening this PR! Here are a few tips to help make the review process smooth for everyone. PR title guidelines
Pre-merge checklist
|
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
The `envelope` jsonb column contains the full Debezium envelope, making CDC metadata like `commit_ts`, `ts_ms`, and `source` fields accessible for user queries. This is critical for TiCDC users who need `commit_ts` to correlate CDC messages with Dumpling snapshots. Output schema is now [key: jsonb, data: jsonb, envelope: jsonb, ...metadata]. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Replace the always-on `envelope` column with an opt-in mechanism
using `INCLUDE DEBEZIUM METADATA [AS alias]`, consistent with
existing INCLUDE OFFSET/PARTITION patterns.
Users who need CDC metadata (e.g., TiCDC commit_ts for Dumpling
snapshot correlation) opt in explicitly:
CREATE TABLE ... FROM SOURCE ...
KEY FORMAT JSON VALUE FORMAT JSON
INCLUDE DEBEZIUM METADATA AS dbz_meta
ENVELOPE DEBEZIUM (MODE = 'TICDC')
SELECT dbz_meta->'source'->>'commit_ts' FROM ...
Default column name is `debezium_metadata` when no alias is given.
Validation ensures INCLUDE DEBEZIUM METADATA is only used with
ENVELOPE DEBEZIUM + VALUE FORMAT JSON.
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Add DD-18 documenting the opt-in envelope metadata approach via INCLUDE DEBEZIUM METADATA [AS alias]. Update Task 2b (parser), Task 3 (UpsertStyle variant), Task 4 (descriptor), Task 5 (planner), Task 6 (render), and acceptance criteria (AC-7, AC-18 through AC-20). Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
|
@claude what sayest thou? |
|
Claude finished @bosconi's task in 4m 33s —— View job Code Review
Overall this is a solid, well-structured PR. The layered approach (parser → planner → storage-types → render) is clean, and the test coverage is good. A few issues worth addressing before merge: Issues1.
|
|
Experiment concluded. |
Summary
ENVELOPE DEBEZIUM (MODE = 'TICDC')syntax for TiCDC dialect supportUpsertStyle::DebeziumJsonvariant handles envelope extraction at render time, producing[key: jsonb, data: jsonb, ...metadata]output schemaMotivation
Enables ingestion from CDC systems like TiCDC that emit Debezium-format JSON rather than Avro.
Syntax
Changes
sql-parser):SourceEnvelope::Debeziumis now a struct variant with optionalDebeziumModesql): JSON encoding routes toUpsertStyle::DebeziumJson; error message updated to "AVRO or JSON"storage-types): NewDebeziumJsonModeenum andUpsertStyle::DebeziumJsonvariant with descriptor transformstorage):extract_debezium_json()parses Jsonb datum to extractop/afterfields, supports payload-wrapped formatcatalog): Exhaustive match updated for new variantTest plan
mz-sql-parser,mz-storage-types,mz-sql,mz-storage,mz-catalogall passsubscribe_outputs.sltpasses (SUBSCRIBE DEBEZIUM unaffected)kafka-debezium-json-sources.td: CRUD, snapshot, payload-wrapped, INCLUDE OFFSET, error caseskafka-debezium-json-ticdc-sources.td: CRUD, delete, reinsertion, MODE syntax, invalid modekafka-upsert-debezium-sources.tdupdated and passes (error message change)🤖 Generated with Claude Code
Co-Authored-By: Claude Opus 4.6 noreply@anthropic.com