Skip to content

storage: Add Debezium JSON envelope support for Kafka sources#35342

Closed
bosconi wants to merge 5 commits intomainfrom
jc/debezium-json-bmad
Closed

storage: Add Debezium JSON envelope support for Kafka sources#35342
bosconi wants to merge 5 commits intomainfrom
jc/debezium-json-bmad

Conversation

@bosconi
Copy link
Copy Markdown
Member

@bosconi bosconi commented Mar 6, 2026

Summary

  • Adds support for JSON-encoded Debezium envelope messages on Kafka sources, previously limited to Avro only
  • Introduces ENVELOPE DEBEZIUM (MODE = 'TICDC') syntax for TiCDC dialect support
  • New UpsertStyle::DebeziumJson variant handles envelope extraction at render time, producing [key: jsonb, data: jsonb, ...metadata] output schema

Motivation

Enables ingestion from CDC systems like TiCDC that emit Debezium-format JSON rather than Avro.

Syntax

-- Generic Debezium JSON
CREATE TABLE t FROM SOURCE s (REFERENCE "topic")
  KEY FORMAT JSON VALUE FORMAT JSON
  ENVELOPE DEBEZIUM;

-- TiCDC dialect
CREATE TABLE t FROM SOURCE s (REFERENCE "topic")
  KEY FORMAT JSON VALUE FORMAT JSON
  ENVELOPE DEBEZIUM (MODE = 'TICDC');

Changes

  • Parser (sql-parser): SourceEnvelope::Debezium is now a struct variant with optional DebeziumMode
  • Planning (sql): JSON encoding routes to UpsertStyle::DebeziumJson; error message updated to "AVRO or JSON"
  • Storage types (storage-types): New DebeziumJsonMode enum and UpsertStyle::DebeziumJson variant with descriptor transform
  • Render (storage): extract_debezium_json() parses Jsonb datum to extract op/after fields, supports payload-wrapped format
  • Catalog (catalog): Exhaustive match updated for new variant

Test plan

  • Unit tests: mz-sql-parser, mz-storage-types, mz-sql, mz-storage, mz-catalog all pass
  • Parser datadriven tests rewritten for struct variant
  • SLT subscribe_outputs.slt passes (SUBSCRIBE DEBEZIUM unaffected)
  • Testdrive kafka-debezium-json-sources.td: CRUD, snapshot, payload-wrapped, INCLUDE OFFSET, error cases
  • Testdrive kafka-debezium-json-ticdc-sources.td: CRUD, delete, reinsertion, MODE syntax, invalid mode
  • Existing kafka-upsert-debezium-sources.td updated and passes (error message change)

🤖 Generated with Claude Code

Co-Authored-By: Claude Opus 4.6 noreply@anthropic.com

Previously, the Debezium envelope only supported Avro encoding. This
adds support for JSON-encoded Debezium messages, enabling ingestion
from systems like TiCDC that emit Debezium-format JSON.

The new `UpsertStyle::DebeziumJson` variant handles envelope extraction
at render time by parsing the Jsonb datum to extract `op` and `after`
fields. The output schema is `[key: jsonb, data: jsonb, ...metadata]`,
where `key` is the decoded Kafka message key and `data` is the extracted
`after` payload.

Syntax:
  KEY FORMAT JSON VALUE FORMAT JSON ENVELOPE DEBEZIUM
  KEY FORMAT JSON VALUE FORMAT JSON ENVELOPE DEBEZIUM (MODE = 'TICDC')

The optional MODE parameter selects dialect-specific handling (currently
TiCDC vs generic). Both modes support the standard Debezium operations:
create (c), update (u), read/snapshot (r), delete (d), and truncate (t).
Payload-wrapped format (schema+payload) is also handled transparently.

Tests added:
- test/testdrive/kafka-debezium-json-sources.td (generic Debezium JSON)
- test/testdrive/kafka-debezium-json-ticdc-sources.td (TiCDC mode)
- Updated kafka-upsert-debezium-sources.td error message test

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@github-actions
Copy link
Copy Markdown

github-actions bot commented Mar 6, 2026

Thanks for opening this PR! Here are a few tips to help make the review process smooth for everyone.

PR title guidelines

  • Use imperative mood: "Fix X" not "Fixed X" or "Fixes X"
  • Be specific: "Fix panic in catalog sync when controller restarts" not "Fix bug" or "Update catalog code"
  • Prefix with area if helpful: compute: , storage: , adapter: , sql:

Pre-merge checklist

  • The PR title is descriptive and will make sense in the git log.
  • This PR has adequate test coverage / QA involvement has been duly considered. (trigger-ci for additional test/nightly runs)
  • If this PR includes major user-facing behavior changes, I have pinged the relevant PM to schedule a changelog post.
  • This PR has an associated up-to-date design doc, is a design doc (template), or is sufficiently small to not require a design.
  • If this PR evolves an existing $T ⇔ Proto$T mapping (possibly in a backwards-incompatible way), then it is tagged with a T-proto label.
  • If this PR will require changes to cloud orchestration or tests, there is a companion cloud PR to account for those changes that is tagged with the release-blocker label (example).

bosconi and others added 4 commits March 5, 2026 22:29
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
The `envelope` jsonb column contains the full Debezium envelope,
making CDC metadata like `commit_ts`, `ts_ms`, and `source` fields
accessible for user queries. This is critical for TiCDC users who
need `commit_ts` to correlate CDC messages with Dumpling snapshots.

Output schema is now [key: jsonb, data: jsonb, envelope: jsonb, ...metadata].

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Replace the always-on `envelope` column with an opt-in mechanism
using `INCLUDE DEBEZIUM METADATA [AS alias]`, consistent with
existing INCLUDE OFFSET/PARTITION patterns.

Users who need CDC metadata (e.g., TiCDC commit_ts for Dumpling
snapshot correlation) opt in explicitly:

  CREATE TABLE ... FROM SOURCE ...
    KEY FORMAT JSON VALUE FORMAT JSON
    INCLUDE DEBEZIUM METADATA AS dbz_meta
    ENVELOPE DEBEZIUM (MODE = 'TICDC')

  SELECT dbz_meta->'source'->>'commit_ts' FROM ...

Default column name is `debezium_metadata` when no alias is given.
Validation ensures INCLUDE DEBEZIUM METADATA is only used with
ENVELOPE DEBEZIUM + VALUE FORMAT JSON.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Add DD-18 documenting the opt-in envelope metadata approach via
INCLUDE DEBEZIUM METADATA [AS alias]. Update Task 2b (parser),
Task 3 (UpsertStyle variant), Task 4 (descriptor), Task 5 (planner),
Task 6 (render), and acceptance criteria (AC-7, AC-18 through AC-20).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@bosconi
Copy link
Copy Markdown
Member Author

bosconi commented Mar 28, 2026

@claude what sayest thou?

@claude
Copy link
Copy Markdown

claude bot commented Mar 28, 2026

Claude finished @bosconi's task in 4m 33s —— View job


Code Review

  • Read CLAUDE.md for repo guidelines
  • Analyze changed files
  • Review parser changes
  • Review planning/DDL changes
  • Review storage-types and render changes
  • Review tests
  • Post findings

Overall this is a solid, well-structured PR. The layered approach (parser → planner → storage-types → render) is clean, and the test coverage is good. A few issues worth addressing before merge:


Issues

1. _mode is unused — TiCDC and Generic behave identically

src/storage/src/render/sources.rs:655

fn extract_debezium_json(
    row: &Row,
    _mode: &DebeziumJsonMode,   // <-- intentionally unused
    include_envelope: bool,
) -> Result<Option<(Row, Option<Row>)>, DecodeError> {

The underscore prefix is an explicit statement that TiCDC mode is not differentiated at extraction time. If that's intentional (TiCDC uses standard Debezium format with extra fields passable via INCLUDE DEBEZIUM METADATA), that's fine, but it deserves a comment explaining why no special handling is needed rather than just silently ignoring it. If TiCDC eventually needs different behavior (e.g., different op semantics, null-key tombstone patterns), removing the underscore silently is a footgun. Consider either removing the mode parameter entirely (and the DebeziumJsonMode storage enum) or adding a // TiCDC uses standard Debezium op/after semantics; mode is reserved for future dialect-specific handling comment.

Fix this →


2. INCLUDE KEY AS alias silently ignored for JSON Debezium

src/sql/src/plan/statement/ddl.rs:1390

The planner allows INCLUDE KEY AS mykey when using JSON Debezium:

(ast::SourceEnvelope::Debezium { .. }, _, Some(DataEncoding::Json)) => {
    // JSON Debezium: key is included as a separate column, INCLUDE KEY is fine
}

But UnplannedSourceEnvelope::desc() for DebeziumJson completely ignores key_envelope and always emits a hardcoded "key": Jsonb column (src/storage-types/src/sources/envelope.rs:256). Any user alias specified via INCLUDE KEY AS mykey is silently discarded — the column always ends up as "key". This is a silent mismatch that will confuse users.

Either reject INCLUDE KEY for JSON Debezium (matching the Avro behavior conceptually, since the key is already materialized), or actually honour the alias from key_envelope.

Fix this →


3. No feature flag for new user-facing syntax

New storage envelope modes in this repo are typically gated behind a scx.require_feature_flag(...) to allow staged rollout and testing in cloud before GA. The JSON Debezium path has no flag:

Some(DataEncoding::Json) => {
    // JSON path: skip typecheck_debezium, use DebeziumJson style
    let storage_mode = match mode { ... };
    ...
    UnplannedSourceEnvelope::Upsert { style: UpsertStyle::DebeziumJson { ... } }
}

Compare with e.g. ENVELOPE MATERIALIZE which calls scx.require_feature_flag(&vars::ENABLE_ENVELOPE_MATERIALIZE)?;. Is the intent to GA this immediately? If so, the PR checklist item "major user-facing behavior changes" needs a PM ping.


Nits / Minor

src/storage/src/render/sources.rs:671 — local variable named envelope shadows the Debezium concept of "envelope":

let envelope = match json_value.get("payload") {
    Some(payload) if payload.is_object() => payload,
    _ => &json_value,
};

This is the resolved message payload, not the envelope. Naming it message or payload would reduce confusion when reading error messages below that reference envelope.

src/sql-parser/src/ast/defs/ddl.rs:645DebeziumMode::None in the AST implies absence. The storage-types version is named DebeziumJsonMode::Generic which is more descriptive. For future consistency consider DebeziumMode::Generic in the AST layer too (minor, not blocking).

src/storage/src/render/sources.rs:691tracing::warn! on every truncate event (op='t') could be noisy at scale. Consider using tracing::debug! or a rate-limited warn.

src/storage-types/src/sources/envelope.rs:257key: Jsonb.nullable(false) — is this guaranteed? If Kafka key bytes are empty, the JSON decoder emits Datum::JsonNull. That violates the non-null constraint at runtime. This may be fine in practice (empty keys are unusual for CDC), but worth a comment or a check.


Tests

The testdrive coverage is solid: CRUD, snapshot, payload-wrapped, INCLUDE OFFSET, INCLUDE DEBEZIUM METADATA with/without alias, TiCDC metadata fields (commit_ts, ts_ms), delete + re-insert, and error cases. One gap: no test for INCLUDE KEY with JSON Debezium — given issue #2 above, this path is currently broken/misleading and a test would catch it.


Pre-merge checklist item

The T-proto checklist item: UpsertStyle and SourceEnvelope are Serialize/Deserialize types used for catalog persistence. Based on the code, they appear to use serde (not protobuf) for encoding — so a T-proto label is likely not required, but worth confirming with the storage team.

@bosconi
Copy link
Copy Markdown
Member Author

bosconi commented Mar 28, 2026

Experiment concluded.

@bosconi bosconi closed this Mar 28, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant