diff --git a/doc/developer/spec/20260305_debezium_json_envelope_sources.md b/doc/developer/spec/20260305_debezium_json_envelope_sources.md new file mode 100644 index 0000000000000..20ae5dee70764 --- /dev/null +++ b/doc/developer/spec/20260305_debezium_json_envelope_sources.md @@ -0,0 +1,492 @@ +Created using https://github.com/bmad-code-org/BMAD-METHOD +--- +title: 'Debezium JSON Envelope Support for Sources' +slug: 'debezium-json-envelope-sources' +created: '2026-03-05' +status: 'ready-for-dev' +stepsCompleted: [1, 2, 3, 4] +tech_stack: [rust, kafka, json, debezium, serde_json, mz_repr, mz_interchange, mz_storage, mz_sql, mz_sql_parser, mz_storage_types] +files_to_modify: + - src/sql-parser/src/ast/defs/ddl.rs + - src/sql-parser/src/parser.rs + - src/sql/src/plan/statement/ddl.rs + - src/storage-types/src/sources/envelope.rs + - src/storage/src/render/sources.rs + - test/testdrive/kafka-debezium-json-sources.td +code_patterns: + - UpsertStyle enum variant for envelope style selection + - typecheck_debezium for planning-time validation + - upsert_commands match arm for render-time extraction + - PreDelimitedFormat::Json for JSON byte decoding + - DecodeResult { key, value, metadata } pipeline +test_patterns: + - testdrive with kafka-ingest format=bytes for JSON payloads + - key-value Kafka messages with separate key and value + - SELECT verification of final materialized output + - Insert/Update/Delete operation coverage +--- + +# Tech-Spec: Debezium JSON Envelope Support for Sources + +**Created:** 2026-03-05 + +## Overview + +### Problem Statement + +Materialize only supports Debezium with Avro-encoded messages. Users ingesting change data from TiCDC (or other Debezium-compatible producers that emit JSON rather than Avro) cannot use `ENVELOPE DEBEZIUM`. The validation in `src/sql/src/plan/statement/ddl.rs` explicitly requires `VALUE FORMAT AVRO` for Debezium envelope, blocking JSON-based CDC pipelines. 
+ +### Solution + +Lift the Avro-only restriction on `ENVELOPE DEBEZIUM` to also accept `VALUE FORMAT JSON`. Add a JSON-based Debezium envelope decoder that extracts `before`/`after`/`op` fields from the Debezium JSON payload and maps operations to upsert semantics. Support both generic Debezium JSON and TiCDC's dialect via an extensible `MODE` parameter. + +### Scope + +**In Scope:** +- `VALUE FORMAT JSON, ENVELOPE DEBEZIUM` with `KEY FORMAT JSON` on Kafka sources +- JSON Debezium envelope parsing (extract `before`/`after`/`op` from `payload`) +- TiCDC dialect via `MODE` parameter (e.g., `ENVELOPE DEBEZIUM (MODE = 'TICDC')`) +- Insert (`c`), update (`u`), delete (`d`) operation handling +- Output columns: `key` (jsonb) and `data` (jsonb, the "after" payload) +- `INCLUDE DEBEZIUM METADATA [AS alias]` to expose the full Debezium envelope as an opt-in jsonb column for CDC metadata access (e.g., `commit_ts`, `ts_ms`, `source`) +- Support both `payload`-wrapped and flat envelope formats +- Error handling for malformed Debezium JSON messages + +**Out of Scope:** +- Schema-aware column projection from JSON payloads +- Keyless / upsert-from-payload sources +- `commit_ts`-based ordering from TiCDC source metadata (note: `commit_ts` is preserved via `INCLUDE DEBEZIUM METADATA` for user queries, but not used for ordering) +- Spatial type handling for TiCDC + +## Context for Development + +### Codebase Patterns + +**Avro Debezium Pipeline (existing):** +1. Avro decoder produces a Row with typed columns: `before` (Record, nullable), `after` (Record), `op` (String), `source` (Record) +2. `typecheck_debezium(&value_desc)` at planning time finds the column index of `after` in the RelationDesc → `after_idx` +3. Planning creates `UpsertStyle::Debezium { after_idx }` → wrapped in `UpsertEnvelope` +4. 
At render time, `upsert_commands()` in `sources.rs:549-560` does `row.iter().nth(after_idx)` to get `Datum::List(after)`, unpacks the record fields, appends metadata + +**JSON Decode Pipeline (existing, no Debezium):** +1. Raw bytes → `PreDelimitedFormat::Json` → `Jsonb::from_slice(bytes)` → `Row` with single `data: Jsonb` column +2. No envelope extraction — the entire JSON blob becomes the `data` column + +**Key Architectural Insight:** +For JSON + Debezium, the JSON decoder still produces a single Jsonb datum containing the ENTIRE Debezium envelope. The `typecheck_debezium()` path won't work because there's no "after" column in the RelationDesc — just a `data: Jsonb` column. Envelope extraction must happen in `upsert_commands()` via the new `UpsertStyle::DebeziumJson` variant, which parses the Jsonb value to extract `op` and `after` at render time. + +**Envelope Extraction Location:** `src/storage/src/render/sources.rs` in `upsert_commands()` — same function where Avro Debezium extraction happens. New match arm for `UpsertStyle::DebeziumJson`: +1. Take the single Jsonb datum from the decoded row +2. Parse JSON to navigate to `payload.after` (or `after` for flat format) and `payload.op` (or `op`) +3. For op `d` → return `None` (delete retraction) +4. For op `c`/`r`/`u` → re-serialize the `after` value as a Jsonb datum, return as the row + +**Error Handling Pattern:** Existing upsert sources surface decode errors through the source's error collection (see `DecodeResult` with `value: Option<Result<Row, DecodeError>>`). Malformed Debezium JSON should produce `Err(DecodeError)` values through the same mechanism.
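The extraction steps above can be pictured with stand-in types. This is an illustrative model only (in the spirit of this spec's pseudocode caveat): the `Json` enum and `Extracted` type are hypothetical stand-ins, not the real `mz_repr` Jsonb API.

```rust
use std::collections::BTreeMap;

/// Hypothetical stand-in for a parsed JSON value; the real code works on
/// `Jsonb`/`serde_json::Value`.
#[derive(Clone, Debug, PartialEq)]
enum Json {
    Null,
    Str(String),
    Obj(BTreeMap<String, Json>),
}

/// Result of envelope extraction for one message.
#[derive(Debug, PartialEq)]
enum Extracted {
    Upsert(Json), // op "c"/"r"/"u": the `after` value becomes the row
    Delete,       // op "d": retract the row for this key
    Skip,         // op "t": truncate is skipped with a warning (DD-13)
}

fn extract_debezium_json(envelope: &Json) -> Result<Extracted, String> {
    // Unwrap the optional `payload` wrapper so both formats work (DD-6).
    let obj = match envelope {
        Json::Obj(m) => match m.get("payload") {
            Some(Json::Obj(inner)) => inner,
            _ => m,
        },
        _ => return Err("Debezium envelope is not a JSON object".into()),
    };
    // Dispatch on `op`; a missing or unknown op is a decode error (DD-7).
    let op = match obj.get("op") {
        Some(Json::Str(s)) => s.as_str(),
        _ => return Err("missing or non-string `op` field".into()),
    };
    match op {
        // Inserts, snapshot reads (DD-12), and updates need a non-null `after`.
        "c" | "r" | "u" => match obj.get("after") {
            Some(after) if *after != Json::Null => Ok(Extracted::Upsert(after.clone())),
            _ => Err(format!("op `{op}` with missing or null `after`")),
        },
        "d" => Ok(Extracted::Delete),
        "t" => Ok(Extracted::Skip),
        other => Err(format!("unrecognized op `{other}`")),
    }
}
```

Under these assumptions, the `payload`-wrapped and flat envelopes take the same path after the unwrap step, and all error cases flow through one `Result`.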
+ +### Files to Reference + +| File | Purpose | +| ---- | ------- | +| `src/sql-parser/src/ast/defs/ddl.rs:636-644` | `SourceEnvelope` AST enum — `Debezium` variant needs MODE parameter | +| `src/sql-parser/src/parser.rs:2362-2398` | `parse_source_envelope()` — parse MODE option after DEBEZIUM keyword | +| `src/sql/src/plan/statement/ddl.rs:1394-1412` | Debezium planning — lift Avro-only gate, handle JSON path | +| `src/sql/src/plan/statement/ddl.rs:2212-2229` | `typecheck_debezium()` — Avro-specific, bypass for JSON | +| `src/sql/src/plan/statement/ddl.rs:2247-2255` | KEY FORMAT requirement validation — already covers Debezium | +| `src/storage-types/src/sources/envelope.rs:70-84` | `UpsertStyle` enum — add `DebeziumJson` variant | +| `src/storage-types/src/sources/envelope.rs:230-255` | `UnplannedSourceEnvelope` desc transform — add JSON path | +| `src/storage-types/src/sources/encoding.rs:92-100` | `DataEncoding` enum — `Json` variant already exists | +| `src/storage/src/decode.rs:208-227` | `PreDelimitedFormat::Json` — no changes needed, raw JSON decode stays as-is | +| `src/storage/src/render/sources.rs:549-560` | `upsert_commands()` Debezium match arm — add DebeziumJson arm | +| `src/interchange/src/envelopes.rs:141-165` | `dbz_envelope()` / `dbz_format()` — reference for envelope structure | +| `test/testdrive/kafka-upsert-debezium-sources.td` | Existing Avro Debezium test — pattern to follow for JSON tests | + +### Technical Decisions + +**DD-1: Single jsonb output column** +- **Decision:** The decoded "after" payload is emitted as a single `jsonb` column (consistent with how `VALUE FORMAT JSON` works today without Debezium). +- **Alternative considered:** Schema-aware column projection where users define typed columns in the CREATE SOURCE statement. This would require JSON-to-datum type coercion and schema validation. Deferred as a future enhancement. +- **Note:** This pushes schema validation entirely to query time. 
Users will use `data->>'field_name'` for column access with no type checking at ingestion. Acceptable for V1. + +**DD-2: Require KEY FORMAT JSON** +- **Decision:** `KEY FORMAT JSON` is required for Debezium JSON sources, consistent with how Debezium Avro requires a key schema. +- **Open question:** Whether keyless sources with upsert-from-payload should be supported. To be revisited. + +**DD-3: Kafka offset ordering (not commit_ts)** +- **Decision:** Use Kafka offset ordering for message sequencing, same as existing Debezium Avro sources. +- **Open question:** Whether TiCDC's `commit_ts` should be used for ordering to better reflect TiDB's distributed transaction semantics. To be revisited. + +**DD-4: Extensible MODE parameter for dialect selection** +- **Decision:** Support both generic Debezium JSON and TiCDC's flavor via an extensible `MODE` parameter on the `ENVELOPE DEBEZIUM` clause. Default mode is generic Debezium. `MODE = 'TICDC'` selects TiCDC-specific handling (e.g., base64 binary encoding, float64 decimals). +- **Rationale (Party Mode):** A `MODE` parameter is more extensible than a boolean toggle. Future dialects (Maxwell, Canal, etc.) can be added without breaking syntax. + +**DD-5: New UpsertStyle::DebeziumJson variant** +- **Decision:** Introduce a new `UpsertStyle::DebeziumJson` variant rather than reusing the existing `UpsertStyle::Debezium { after_idx }`. The Avro variant's `after_idx` is schema-index-based and not applicable to JSON, which extracts `payload.after` by key name from the Jsonb datum at render time. +- **Rationale (Party Mode):** Keeps Avro and JSON code paths cleanly separated, avoids overloading the existing variant with conditional logic. + +**DD-6: Support both payload-wrapped and flat envelope formats** +- **Decision:** The decoder handles both `{ "payload": { "before": ..., "after": ..., "op": ... } }` (standard Debezium with schema) and flat `{ "before": ..., "after": ..., "op": ... 
}` (Debezium with `debezium-disable-schema=true` or similar configurations). +- **Rationale (Party Mode):** Some Debezium configurations strip the `payload` wrapper. Supporting both avoids a common production gotcha. + +**DD-7: Error handling for malformed messages** +- **Decision:** Malformed Debezium JSON messages (missing `op`, unexpected `op` values, null `after` on insert, unparseable JSON) should be handled consistently with Materialize's existing error policy for upsert sources — surface errors through the source's error collection (`DecodeError`) rather than silently dropping records. +- **Rationale (Party Mode):** JSON is untyped, so malformed messages are a high-risk area. Explicit error handling must be in scope, not deferred. + +**DD-8: Envelope extraction in upsert_commands (same location as Avro)** +- **Decision:** JSON Debezium envelope extraction happens in `upsert_commands()` in `src/storage/src/render/sources.rs`, the same function where Avro Debezium extraction occurs. The new `UpsertStyle::DebeziumJson` match arm parses the Jsonb datum to extract `op` and `after`, rather than using schema-based column indexing. +- **Rationale:** Keeps all envelope extraction logic co-located. The JSON path parses at render time (slightly different from Avro's index-based approach) but the upsert semantics (key + value → insert/update/delete) remain identical. + +**DD-9: Include key as separate output column (resolves key_indices correctness) [F1, F3, F5, F6]** +- **Decision:** Unlike Avro Debezium (which embeds key fields inside the unpacked `after` Record), JSON Debezium includes the key as a separate `key: Jsonb` column prepended to the output row. The base output schema is `[key: Jsonb, data: Jsonb, ...metadata]`. `key_indices` points to position `[0]` (the key column). 
This is necessary because: (a) `upsert_core` uses `UpsertKey::from_value(value_ref, &key_indices)` to reconstruct keys from persisted values during rehydration — empty/wrong `key_indices` would cause data corruption after restart; (b) the `data: Jsonb` column is opaque so `match_key_indices` cannot find named key fields within it; (c) users need access to key fields since they can't be extracted from the opaque `after` payload without knowing the schema. +- **Consequence:** `INCLUDE KEY` rejection logic (which fires for Avro Debezium because "Debezium values include all keys") must NOT fire for JSON Debezium — the key is always included as a separate column. Update the validation at `ddl.rs:1372-1384` accordingly. +- **To revisit:** Whether the key column should be named `key` (single Jsonb blob) or flattened into multiple columns if the key JSON has multiple top-level fields. + +**DD-10: Parse-extract-reserialize performance cost [F12]** +- **Decision:** Every JSON Debezium message is parsed from `Jsonb` → `serde_json::Value`, the `after` field is extracted, and then re-serialized back to `Jsonb`. This is inherent to the approach of extracting envelope fields from an opaque JSON blob at render time. +- **Risk:** Additional CPU cost per message compared to Avro's index-based extraction. For V1 this is acceptable — JSON sources are already doing full JSON parsing in the decode layer. +- **Mitigation:** Profile under realistic throughput before GA. A future optimization could parse the raw bytes directly in the decode layer (before Jsonb conversion) to avoid the double-parse, but this would move envelope extraction out of `upsert_commands()` and diverge from DD-8. +- **To revisit:** Establish throughput benchmarks before shipping to production. 
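A minimal sketch of the layout DD-9 implies, using plain `Vec`s as hypothetical stand-ins for `Row` (the real code goes through `RelationDesc` and `UpsertKey::from_value`):

```rust
/// Stand-in for a datum; the real columns are Jsonb datums.
type Datum = String;

/// JSON Debezium output layout per DD-9: [key, data, ...metadata].
fn build_output_row(key: Datum, data: Datum, metadata: Vec<Datum>) -> Vec<Datum> {
    let mut row = vec![key, data];
    row.extend(metadata);
    row
}

/// Mirrors what the upsert operator does during rehydration: rebuild the
/// upsert key by projecting `key_indices` out of a persisted value row.
fn key_from_value(value: &[Datum], key_indices: &[usize]) -> Vec<Datum> {
    key_indices.iter().map(|&i| value[i].clone()).collect()
}
```

With `key_indices = [0]`, the key recovered from a persisted value row is exactly the `key` column, which is what makes restart-time rehydration safe.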
+ +**DD-11: No ingestion-time schema validation for JSON [F12]** +- **Decision:** Unlike Avro Debezium (which validates `before`/`after` column types at planning time via `typecheck_debezium()`), JSON Debezium performs no structural validation at planning time. The value is an opaque `Jsonb` blob. Malformed `after` payloads (e.g., unexpected nested structures, wrong types) are not caught until query time when users access fields via `->`/`->>` operators. +- **Risk:** Users may not discover data quality issues until downstream queries fail or return unexpected results. +- **Mitigation:** Envelope-level validation (missing `op`, null `after` on insert) is caught at render time per DD-7. Payload-content validation is deferred to future schema-aware column projection work (out of scope). + +**DD-12: Handle `"r"` (read/snapshot) operation as insert [F2]** +- **Decision:** Debezium emits `op: "r"` for snapshot records (initial table scan). TiCDC and standard Debezium both produce these during initial sync. Treat `"r"` identically to `"c"` (create) — extract the `after` field and upsert the row. +- **Rationale:** Rejecting `"r"` would make any source unusable on tables that weren't empty when snapshotting began. +- **To revisit:** Whether snapshot-specific metadata should be exposed or handled differently. + +**DD-13: Skip `"t"` (truncate) operations with a warning [F8]** +- **Decision:** Debezium can emit `op: "t"` for truncate events. These are skipped without surfacing an error, because Materialize has no mechanism to bulk-retract all rows for a key range from within the upsert pipeline. A warning-level log message is emitted when a truncate event is encountered. +- **To revisit:** Whether truncate support should be added in a future iteration (e.g., by retracting all known keys for the source). + +**DD-14: Null Kafka message values treated as deletes [F9]** +- **Decision:** On Kafka compacted topics, null-value records serve as tombstones.
When the entire Kafka message value is null (not a JSON object with `op: "d"`, but literally absent), treat it as a delete for the given key. This is consistent with how `ENVELOPE UPSERT` handles null values in the existing decode pipeline (`result.value: None` at the `upsert_commands` level already maps to retraction). +- **Rationale:** This requires no new code — the existing `result.value: None` path in `upsert_commands` already handles this before the `UpsertStyle` match arm is reached. + +**DD-15: TiCDC MODE is structural-only in V1 [F10]** +- **Decision:** The `MODE = 'TICDC'` parameter is parsed, stored, and round-trips through SQL display, but V1 runtime behavior is identical to generic mode. This establishes the syntax and plumbing for future TiCDC-specific handling (base64 binary, float64 decimals) without shipping dead behavioral code. +- **Rationale:** The parsing/storage cost is minimal and avoids a breaking syntax change later. The test for TiCDC mode (Task 8) verifies the plumbing works. +- **To revisit:** Define concrete V1 behavioral differences or defer MODE entirely if the plumbing cost is deemed not worth it during implementation. + +**DD-16: Multi-partition ordering is a known constraint [F11]** +- **Decision:** For multi-partition Kafka topics, updates for the same key on different partitions can arrive out of order. This is the same limitation as existing Avro Debezium sources and is inherent to Kafka's per-partition ordering guarantee. TiCDC distributes events across partitions, so this is particularly relevant for the TiCDC use case. +- **Known constraint:** Users must ensure same-key events are routed to the same partition (standard Kafka key-based partitioning) for correct ordering. +- **To revisit:** Whether `commit_ts`-based ordering (DD-3) could resolve cross-partition ordering for TiCDC. 
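The null-value path in DD-14 can be pictured with a small stand-in (hypothetical types; the real pipeline uses `DecodeResult` and `upsert_commands`):

```rust
/// Stand-in for what upsert_commands decides per message.
#[derive(Debug, PartialEq)]
enum UpsertCommand {
    /// A present value: still needs DebeziumJson envelope extraction.
    Extract(String),
    /// Kafka tombstone (null value): retract whatever is stored for the key.
    Delete,
}

fn classify_kafka_value(value: Option<String>) -> UpsertCommand {
    match value {
        // DD-14: the tombstone case is handled before any envelope parsing runs.
        None => UpsertCommand::Delete,
        Some(bytes) => UpsertCommand::Extract(bytes),
    }
}
```

This is why DD-14 requires no new code: the `None` branch already exists for `ENVELOPE UPSERT`, and the `UpsertStyle::DebeziumJson` arm only ever sees present values.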
+ +**DD-18: Opt-in envelope metadata via INCLUDE DEBEZIUM METADATA** +- **Decision:** The full Debezium envelope (including `op`, `source`, `ts_ms`, `before`, `after`, `transaction`, etc.) is exposed as an opt-in jsonb column via `INCLUDE DEBEZIUM METADATA [AS alias]`. Default column name is `debezium_metadata` when no alias is given. +- **Rationale:** Users need CDC metadata like `commit_ts` (for TiCDC/Dumpling snapshot correlation) and `ts_ms`. Rather than always including this data (which duplicates the `after` payload and wastes storage), the `INCLUDE` mechanism follows the existing pattern of `INCLUDE OFFSET`, `INCLUDE PARTITION`, etc. — opt-in, user-named columns for metadata. +- **Alternatives considered:** (A) Granular `INCLUDE DEBEZIUM SOURCE`, `INCLUDE DEBEZIUM TIMESTAMP` — more parser work for questionable benefit since users can use `->` on a single jsonb column. (B) Magic column names like `__debezium_source` — implicit and surprising. (C) Always-on `envelope` column — wastes storage when not needed, inconsistent with INCLUDE pattern. +- **Validation:** `INCLUDE DEBEZIUM METADATA` is only valid with `ENVELOPE DEBEZIUM` + `VALUE FORMAT JSON`. Using it with other envelopes produces a planning error. +- **Implementation:** The column name is stored in `UpsertStyle::DebeziumJson { envelope_column: Option<String> }`. At render time, `extract_debezium_json()` conditionally serializes the full envelope only when `envelope_column.is_some()`. + +**DD-17: Pseudocode is illustrative, not compilable [F7, F13]** +- **Decision:** All pseudocode in this spec (particularly `extract_debezium_json` and the `upsert_commands` match arm) is illustrative of the intended logic, not compilable Rust.
Implementation must handle Rust-specific concerns: (a) `Datum::Jsonb` wraps a borrowed `JsonbRef<'a>` — extracting and re-packing requires allocating into a `Row` via `RowPacker`; (b) the nested match structure in Task 5 should be simplified during implementation; (c) error types must use the concrete `DecodeErrorKind` variants available in the codebase. +- **Rationale:** Spec pseudocode communicates intent. The implementing developer is expected to adapt to Rust's ownership/lifetime model. + +## Implementation Plan + +### Tasks + +- [ ] **Task 1: Add MODE parameter to SourceEnvelope AST** + - File: `src/sql-parser/src/ast/defs/ddl.rs` + - Action: Extend `SourceEnvelope::Debezium` from a unit variant to a struct variant with an optional `mode` field. Define a `DebeziumMode` enum with variants `None` (generic) and `TiCdc`. + ```rust + pub enum DebeziumMode { None, TiCdc } + pub enum SourceEnvelope { + // ... + Debezium { mode: DebeziumMode }, + // ... + } + ``` + - Action: Update the `AstDisplay` impl for `SourceEnvelope::Debezium` to render `DEBEZIUM` or `DEBEZIUM (MODE = 'TICDC')`. + - Action: Update `requires_all_input()` match arm accordingly. + - Notes: All existing references to `SourceEnvelope::Debezium` across the codebase will need pattern updates (e.g., `SourceEnvelope::Debezium { .. }`). Search for all match arms. + +- [ ] **Task 2: Parse MODE option in parse_source_envelope** + - File: `src/sql-parser/src/parser.rs` + - Action: In `parse_source_envelope()` (line ~2365), after matching the `DEBEZIUM` keyword, optionally parse `(MODE = '<mode>')`. If present, map `'TICDC'` to `DebeziumMode::TiCdc`. If absent, default to `DebeziumMode::None`.
```rust + } else if self.parse_keyword(DEBEZIUM) { + let mode = if self.consume_token(&Token::LParen) { + self.expect_keyword(MODE)?; + self.expect_token(&Token::Eq)?; + let mode_str = self.parse_literal_string()?; + self.expect_token(&Token::RParen)?; + match mode_str.to_uppercase().as_str() { + "TICDC" => DebeziumMode::TiCdc, + other => return Err(/* unsupported mode error */), + } + } else { + DebeziumMode::None + }; + SourceEnvelope::Debezium { mode } + } + ``` + - Notes: The `MODE` keyword may need to be added to the keyword list if not already present. + +- [ ] **Task 2b: Add INCLUDE DEBEZIUM METADATA to parser** + - File: `src/sql-parser/src/parser.rs`, `src/sql-parser/src/ast/defs/ddl.rs` + - Action: Add `DebeziumMetadata { alias: Option<Ident> }` variant to `SourceIncludeMetadata` enum. In `parse_source_include_metadata()`, add `DEBEZIUM` to the keyword list; when matched, expect `METADATA` keyword, then parse optional alias. Update `AstDisplay` for the new variant. + - Notes: The two-keyword sequence `DEBEZIUM METADATA` avoids ambiguity with the existing `DEBEZIUM` keyword used in envelope parsing. + +- [ ] **Task 3: Add UpsertStyle::DebeziumJson variant** + - File: `src/storage-types/src/sources/envelope.rs` + - Action: Add a new variant to `UpsertStyle`: + ```rust + pub enum UpsertStyle { + Default(KeyEnvelope), + Debezium { after_idx: usize }, + DebeziumJson { mode: DebeziumJsonMode, envelope_column: Option<String> }, + ValueErrInline { key_envelope: KeyEnvelope, error_column: String }, + } + ``` + - Action: Define `DebeziumJsonMode` enum (or reuse from AST layer via a storage-types equivalent): + ```rust + pub enum DebeziumJsonMode { Generic, TiCdc } + ``` + - Notes: `envelope_column` is `Some(name)` when `INCLUDE DEBEZIUM METADATA [AS name]` is specified, `None` otherwise. See DD-18. + - Notes: Must derive `Clone, Debug, Serialize, Deserialize, Eq, PartialEq` to match existing variants.
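The mode plumbing in Tasks 1-3 boils down to a small normalization step. A sketch, with `String` errors standing in for the parser's real error type (the helper name is hypothetical):

```rust
#[derive(Debug, PartialEq)]
enum DebeziumMode {
    None,  // generic Debezium JSON (default)
    TiCdc, // ENVELOPE DEBEZIUM (MODE = 'TICDC')
}

fn parse_debezium_mode(mode_str: Option<&str>) -> Result<DebeziumMode, String> {
    match mode_str {
        // No (MODE = ...) clause parsed: default to generic.
        None => Ok(DebeziumMode::None),
        // The literal is uppercased, so 'ticdc' and 'TICDC' both work.
        Some(s) => match s.to_uppercase().as_str() {
            "TICDC" => Ok(DebeziumMode::TiCdc),
            other => Err(format!("unsupported ENVELOPE DEBEZIUM mode: {other}")),
        },
    }
}
```

Adding a future dialect (Maxwell, Canal, etc.) is then one new match arm rather than a syntax change, which is the extensibility argument behind DD-4.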
+ +- [ ] **Task 4: Add UnplannedSourceEnvelope descriptor transform for DebeziumJson** + - File: `src/storage-types/src/sources/envelope.rs` + - Action: In the `UnplannedSourceEnvelope` match that builds the output `RelationDesc` (line ~230), add a new arm for `UpsertStyle::DebeziumJson`. Per DD-9, the base output schema is `[key: Jsonb, data: Jsonb, ...metadata]`. If `INCLUDE DEBEZIUM METADATA` is specified, an additional jsonb column is added between `data` and metadata: + ```rust + UnplannedSourceEnvelope::Upsert { + style: UpsertStyle::DebeziumJson { envelope_column, .. }, + } => { + let key_desc = RelationDesc::builder() + .with_column("key", ScalarType::Jsonb.nullable(false)) + .finish(); + let mut value_builder = RelationDesc::builder() + .with_column("data", ScalarType::Jsonb.nullable(false)); + if let Some(col_name) = envelope_column { + value_builder = value_builder + .with_column(col_name, ScalarType::Jsonb.nullable(false)); + } + let value_desc = value_builder.finish(); + let key_indices = vec![0usize]; + let desc = key_desc.with_key(key_indices.clone()) + .concat(value_desc).concat(metadata_desc); + ( + self.into_source_envelope(Some(key_indices), None, Some(desc.arity())), + desc, + ) + } + ``` + - Notes: This approach ensures `upsert_core` can reconstruct keys from persisted values during rehydration via `UpsertKey::from_value(value_ref, &key_indices)`. The key column at index 0 contains the JSON key blob. Skip `match_key_indices` entirely for this path. 
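The column layout Task 4 builds can be sanity-checked with a hypothetical helper (plain strings instead of `RelationDesc`; the function name is illustrative):

```rust
/// Compute the DebeziumJson output columns and key_indices per DD-9/DD-18:
/// [key, data, <optional envelope column>, ...metadata], key at index 0.
fn debezium_json_columns(
    envelope_column: Option<&str>,
    metadata_columns: &[&str],
) -> (Vec<String>, Vec<usize>) {
    let mut cols = vec!["key".to_string(), "data".to_string()];
    if let Some(name) = envelope_column {
        // The INCLUDE DEBEZIUM METADATA column sits between data and metadata.
        cols.push(name.to_string());
    }
    cols.extend(metadata_columns.iter().map(|c| c.to_string()));
    (cols, vec![0]) // key_indices always points at the key column
}
```

Note that the optional envelope column shifts the metadata columns right but never the key, so `key_indices = [0]` holds in every configuration.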
+ +- [ ] **Task 5: Lift Avro-only gate in DDL planning** + - File: `src/sql/src/plan/statement/ddl.rs` + - Action: Modify the `ast::SourceEnvelope::Debezium` match arm (lines 1394-1412) to handle JSON encoding separately: + ```rust + ast::SourceEnvelope::Debezium { mode } => { + match encoding.as_ref().map(|e| &e.value) { + Some(DataEncoding::Json) => { + // JSON path: skip typecheck_debezium, use DebeziumJson style + let storage_mode = match mode { + DebeziumMode::TiCdc => DebeziumJsonMode::TiCdc, + DebeziumMode::None => DebeziumJsonMode::Generic, + }; + UnplannedSourceEnvelope::Upsert { + style: UpsertStyle::DebeziumJson { mode: storage_mode }, + } + } + _ => { + // Existing Avro path (unchanged) + let after_idx = match typecheck_debezium(&value_desc) { + Ok((_before_idx, after_idx)) => Ok(after_idx), + Err(type_err) => match encoding.as_ref().map(|e| &e.value) { + Some(DataEncoding::Avro(_)) => Err(type_err), + _ => Err(sql_err!( + "ENVELOPE DEBEZIUM requires that VALUE FORMAT is set to AVRO or JSON" + )), + }, + }?; + UnplannedSourceEnvelope::Upsert { + style: UpsertStyle::Debezium { after_idx }, + } + } + } + } + ``` + - Action: Update all other match arms referencing `ast::SourceEnvelope::Debezium` to use `ast::SourceEnvelope::Debezium { .. }` pattern (lines 810, 1372-1384, and any others found by the compiler). + - Action: Per DD-9, update `INCLUDE KEY` rejection logic at `ddl.rs:1372-1384` — the "Cannot use INCLUDE KEY with ENVELOPE DEBEZIUM" error must NOT fire when `DataEncoding::Json` is used. For JSON Debezium, key is always included as a separate column. + - Action: Extract `INCLUDE DEBEZIUM METADATA` from `include_metadata`, pass column name to `UpsertStyle::DebeziumJson { envelope_column }`. Filter `DebeziumMetadata` items out of Kafka metadata processing (they're not Kafka metadata). Add validation: `INCLUDE DEBEZIUM METADATA` requires `ENVELOPE DEBEZIUM` + `VALUE FORMAT JSON`. See DD-18. 
- Notes: The error message changes from "AVRO" to "AVRO or JSON". KEY FORMAT validation at line 2247 already covers Debezium and requires no changes. + +- [ ] **Task 6: Implement DebeziumJson extraction in upsert_commands** + - File: `src/storage/src/render/sources.rs` + - Action: Add a new match arm in `upsert_commands()` alongside the existing `UpsertStyle::Debezium { after_idx }` arm (line ~549): + ```rust + UpsertStyle::DebeziumJson { ref mode, ref envelope_column } => { + match extract_debezium_json(row, mode, envelope_column.is_some()) { + Ok(Some((after_row, envelope_row))) => { + let mut packer = row_buf.packer(); + packer.extend_by_row(&key_row); // key at position 0 + packer.extend_by_row(&after_row); // data at position 1 + if let Some(env_row) = envelope_row { // envelope (opt-in via DD-18) + packer.extend_by_row(&env_row); + } + packer.extend_by_row(&metadata); + Some(Ok(row_buf.clone())) + } + Ok(None) => None, // Delete operation + Err(e) => Some(Err(DecodeError { kind: e, raw: vec![] })), + } + } + ``` + - Action: Implement `extract_debezium_json()` helper function (in same file or in `src/interchange/src/json.rs`). Per DD-12/DD-13/DD-17/DD-18, pseudocode is illustrative: + ```rust + fn extract_debezium_json( + row: &Row, + mode: &DebeziumJsonMode, + include_envelope: bool, + ) -> Result<Option<(Row, Option<Row>)>, DecodeError> { + // 1. Convert Jsonb datum to serde_json::Value via JsonbRef::from_datum() + // 2. Check for "payload" wrapper; if present, unwrap + // 3. Extract "op" field: + // - "c" or "r" (per DD-12: snapshot reads treated as inserts) → extract after + // - "u" → extract after + // - "d" → return Ok(None) + // - "t" → log warning, return Ok(None) per DD-13 + // - missing/other → return Err(DecodeError) + // 4. Extract "after" field → must be non-null for c/r/u, else Err + // 5. Serialize "after" back into a Row containing a single Jsonb datum + // 6. If include_envelope, serialize full envelope into a separate Row (DD-18) + // 7.
Return Ok(Some((after_row, envelope_row))) + } + ``` + - Action: Also update the key_row match arm (line ~517) to include `UpsertStyle::DebeziumJson { .. }` alongside `UpsertStyle::Debezium { .. }` for key handling. + - Notes: Per DD-17, `Datum::Jsonb` wraps a borrowed `JsonbRef<'a>` — extracting and re-packing requires allocating into a new `Row` via `RowPacker`. Per DD-14, null Kafka values are already handled as deletes before this match arm is reached. + +- [ ] **Task 7: Create testdrive tests for Debezium JSON sources** + - File: `test/testdrive/kafka-debezium-json-sources.td` (new file) + - Action: Create comprehensive testdrive test covering: + - Source creation with `KEY FORMAT JSON, VALUE FORMAT JSON, ENVELOPE DEBEZIUM` + - Insert operation (op="c"): publish JSON with `after` set, `before` null → verify row appears + - Update operation (op="u"): publish JSON with both `before` and `after` → verify row updated + - Delete operation (op="d"): publish JSON with `before` set, `after` null → verify row retracted + - Payload-wrapped format: `{ "payload": { "op": "c", "after": {...} } }` + - Flat format: `{ "op": "c", "after": {...} }` + - Compound key: multiple fields in key JSON + - Error case: malformed message (missing `op` field) + - Notes: Use `kafka-ingest format=bytes` for raw JSON messages (not Avro). Follow patterns from `kafka-upsert-debezium-sources.td` for structure. Key and value are separate JSON blobs sent as raw bytes. + +- [ ] **Task 8: Create testdrive tests for TiCDC mode** + - File: `test/testdrive/kafka-debezium-json-ticdc-sources.td` (new file) + - Action: Create tests specifically for TiCDC dialect: + - Source creation with `ENVELOPE DEBEZIUM (MODE = 'TICDC')` + - TiCDC-formatted messages with `commit_ts` and `cluster_id` in source metadata + - Insert/Update/Delete operations with TiCDC envelope structure + - Notes: TiCDC messages have the same `payload.before/after/op` structure as generic Debezium. 
The MODE parameter is parsed and stored but V1 behavior differences are minimal — this test establishes the pattern for future TiCDC-specific handling. + +### Acceptance Criteria + +**Happy Path:** +- [ ] AC-1: Given a Kafka source with `KEY FORMAT JSON, VALUE FORMAT JSON, ENVELOPE DEBEZIUM`, when a JSON message with `"op": "c"` and a non-null `"after"` field is published, then a row appears in the source with `key` column containing the JSON key and `data` column containing the `after` JSON object. +- [ ] AC-2: Given an existing row from AC-1, when a JSON message with `"op": "u"`, matching key, and updated `"after"` is published, then the row's `data` column reflects the new `after` value. +- [ ] AC-3: Given an existing row from AC-1, when a JSON message with `"op": "d"` and null `"after"` is published, then the row is retracted from the source. +- [ ] AC-4: Given a Debezium JSON message wrapped in a `"payload"` object (e.g., `{ "payload": { "op": "c", "after": {...} } }`), when ingested, then envelope extraction succeeds identically to flat format. +- [ ] AC-5: Given a Debezium JSON message in flat format (e.g., `{ "op": "c", "after": {...} }`), when ingested, then envelope extraction succeeds. +- [ ] AC-6: Given a Debezium JSON message with `"op": "r"` (snapshot/read), when ingested, then it is treated as an insert (same as `"c"`). [DD-12] + +**TiCDC Mode:** +- [ ] AC-7: Given `ENVELOPE DEBEZIUM (MODE = 'TICDC')` with `INCLUDE DEBEZIUM METADATA AS dbz_meta` in the CREATE TABLE statement, when a TiCDC-formatted JSON message is published, then the source processes it correctly. The full Debezium envelope (including `commit_ts`, `cluster_id`, and other TiCDC metadata) is preserved in the `dbz_meta` column for user queries (e.g., `dbz_meta->'source'->>'commit_ts'`). + +**Error Handling:** +- [ ] AC-8: Given a JSON message with a missing `"op"` field, when ingested, then a decode error is surfaced through the source's error collection (not silently dropped). 
+- [ ] AC-9: Given a JSON message with an unrecognized `"op"` value (e.g., `"op": "x"`), when ingested, then a decode error is surfaced. +- [ ] AC-10: Given a JSON message with `"op": "c"` but null `"after"`, when ingested, then a decode error is surfaced. +- [ ] AC-11: Given a Debezium JSON message with `"op": "t"` (truncate), when ingested, then it is silently skipped (not errored). [DD-13] +- [ ] AC-12: Given a null Kafka message value (tombstone), when ingested, then it is treated as a delete for the given key. [DD-14] + +**INCLUDE DEBEZIUM METADATA:** +- [ ] AC-18: Given `INCLUDE DEBEZIUM METADATA AS dbz_envelope` with `KEY FORMAT JSON VALUE FORMAT JSON ENVELOPE DEBEZIUM`, when a message with `source`, `ts_ms`, and `op` fields is published, then the `dbz_envelope` column contains the full Debezium envelope as jsonb. +- [ ] AC-19: Given `INCLUDE DEBEZIUM METADATA` (no alias) with `ENVELOPE DEBEZIUM`, when the table is created, then the column is named `debezium_metadata`. +- [ ] AC-20: Given `INCLUDE DEBEZIUM METADATA` with `ENVELOPE UPSERT` (not DEBEZIUM), when CREATE TABLE is executed, then it fails with an error requiring ENVELOPE DEBEZIUM with VALUE FORMAT JSON. + +**SQL Validation:** +- [ ] AC-13: Given `VALUE FORMAT JSON, ENVELOPE DEBEZIUM` without `KEY FORMAT`, when CREATE SOURCE is executed, then it fails with an error requiring KEY FORMAT. +- [ ] AC-14: Given `VALUE FORMAT TEXT, ENVELOPE DEBEZIUM`, when CREATE SOURCE is executed, then it fails with an error requiring AVRO or JSON format. +- [ ] AC-15: Given `VALUE FORMAT JSON, ENVELOPE DEBEZIUM` with `KEY FORMAT JSON`, when CREATE SOURCE is executed, then it succeeds. + +**Restart/Rehydration [F4]:** +- [ ] AC-16: Given a running Debezium JSON source with ingested data, when the source is restarted (cluster restart or rehydration from persist), then all previously ingested rows are correctly restored with matching key and data values. 
+ +**Regression:** +- [ ] AC-17: Given existing `VALUE FORMAT AVRO, ENVELOPE DEBEZIUM` sources, when the same Avro Debezium tests are run, then behavior is unchanged (no regressions). + +## Additional Context + +### Dependencies + +- `serde_json` — already in the dependency tree, needed for parsing Debezium JSON envelope fields in `upsert_commands()` +- `mz_repr::adt::jsonb` — existing Jsonb type used for the output datum +- No new external crate dependencies required + +### Testing Strategy + +**Testdrive (integration):** +- `test/testdrive/kafka-debezium-json-sources.td` — primary test file covering generic Debezium JSON: insert, update, delete, payload-wrapped, flat, INCLUDE OFFSET, INCLUDE DEBEZIUM METADATA (with alias and default name), error cases +- `test/testdrive/kafka-debezium-json-ticdc-sources.td` — TiCDC dialect mode tests with INCLUDE DEBEZIUM METADATA to verify `commit_ts` access + +**Regression:** +- Run existing `test/testdrive/kafka-upsert-debezium-sources.td` and related Avro Debezium tests to ensure no breakage from the `SourceEnvelope::Debezium` AST change + +**Unit tests (if applicable):** +- `extract_debezium_json()` helper function: test payload-wrapped vs flat, all op types, malformed inputs +- Can be added inline in the implementing module with `#[cfg(test)]` block + +**Manual verification:** +- Create a Kafka topic, publish raw JSON Debezium messages via `kcat` or similar, create source in Materialize, verify `SELECT * FROM source_table` returns expected results + +### Notes + +- All identified risks are captured as design decisions DD-9 through DD-18. +- TiCDC MODE is parsed and stored but V1 behavior is identical to generic mode (DD-15). +- Multi-partition ordering is a known constraint documented in DD-16. +- CDC metadata (e.g., `commit_ts`, `ts_ms`) is accessible via opt-in `INCLUDE DEBEZIUM METADATA` (DD-18). + +**Future considerations (out of scope):** +- Schema-aware column projection (`CREATE SOURCE ... 
(id int, name text) FORMAT JSON ENVELOPE DEBEZIUM`) +- Keyless upsert where key is derived from the Debezium payload +- `commit_ts`-based ordering for TiCDC (the field is already preserved via `INCLUDE DEBEZIUM METADATA`; future work is using it for message ordering) +- Additional MODE values: Maxwell, Canal, etc. +- Truncate operation support (DD-13) + +--- + +## Party Mode Changelog + +Changes made during Party Mode review (Step 1): + +| # | Change | Source | Section | +|---|--------|--------|---------| +| 1 | Added `MODE` parameter syntax instead of boolean toggle for dialect selection | Winston (Architect) | DD-4, Scope | +| 2 | Added `UpsertStyle::DebeziumJson` as new variant (not reusing Avro's `after_idx`) | Amelia (Developer) | DD-5 (new) | +| 3 | Added support for both `payload`-wrapped and flat envelope formats | Winston (Architect) | DD-6 (new), Scope | +| 4 | Added error handling for malformed messages as in-scope requirement | John (PM) + Murat (Test Architect) | DD-7 (new), Scope | +| 5 | Added note about schema validation being deferred to query time | Winston (Architect) | DD-1 | +| 6 | Refined solution description to reference extensible MODE parameter | All | Solution | + +## Adversarial Review Changelog + +Findings from adversarial review, resolved as design decisions: + +| Finding | Severity | Resolution | Design Decision | +|---------|----------|------------|-----------------| +| F1: key_indices correctness unresolved | Critical | Include key as separate output column; key_indices=[0] | DD-9 | +| F2: `"r"` (snapshot) op type ignored | Critical | Treat "r" as insert, same as "c" | DD-12 | +| F3: INCLUDE KEY broken for JSON Debezium | Critical | Key always included as separate column; skip INCLUDE KEY rejection | DD-9 | +| F4: No restart/rehydration AC | High | Added AC-16 for restart correctness | AC-16 | +| F5: Task 4 pseudocode self-contradictory | High | Rewrote Task 4 with correct key_indices approach | DD-9, Task 4 | +| F6: DebeziumJson arm 
doesn't prepend key | High | Updated Task 6 pseudocode to prepend key_row | DD-9, Task 6 | +| F7: extract_debezium_json lifetime unclear | High | Noted pseudocode is illustrative; impl must handle Row allocation | DD-17 | +| F8: `"t"` (truncate) op not addressed | Medium | Silently skip with warning log | DD-13, AC-11 | +| F9: Null Kafka values (tombstones) not discussed | Medium | Treated as deletes; existing pipeline handles this | DD-14, AC-12 | +| F10: TiCDC MODE does nothing in V1 | Medium | Acknowledged as structural-only; establishes syntax for future | DD-15 | +| F11: Multi-partition ordering not discussed | Medium | Documented as known constraint | DD-16 | +| F12: No performance ACs | Low | Added "profile before GA" to DD-10 mitigation | DD-10 | +| F13: Nested match structure confusing | Low | Noted in DD-17; simplify during implementation | DD-17 | diff --git a/src/catalog/src/memory/objects.rs b/src/catalog/src/memory/objects.rs index 26183da326fe6..00269f1c31e83 100644 --- a/src/catalog/src/memory/objects.rs +++ b/src/catalog/src/memory/objects.rs @@ -1033,7 +1033,8 @@ impl DataSourceDesc { SourceEnvelope::None(_) => "none", SourceEnvelope::Upsert(upsert_envelope) => match upsert_envelope.style { mz_storage_types::sources::envelope::UpsertStyle::Default(_) => "upsert", - mz_storage_types::sources::envelope::UpsertStyle::Debezium { .. } => { + mz_storage_types::sources::envelope::UpsertStyle::Debezium { .. } + | mz_storage_types::sources::envelope::UpsertStyle::DebeziumJson { .. } => { // NOTE(aljoscha): Should we somehow mark that this is // using upsert internally? See note above about // DEBEZIUM. 
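The parser changes that follow accept an optional parenthesized option list after `DEBEZIUM`: `ENVELOPE DEBEZIUM [(MODE = '<value>')]`, where only `TICDC` is recognized, case-insensitively. A hedged Python sketch of that surface rule (the regex form is illustrative only; the real parser is token-based and produces its own error messages):

```python
import re

def parse_debezium_mode(tail):
    """Sketch of: ENVELOPE DEBEZIUM [(MODE = '<value>')] -> mode name."""
    m = re.match(r"\s*\(\s*MODE\s*=\s*'([^']*)'\s*\)\s*$", tail, re.IGNORECASE)
    if m is None:
        if tail.strip():
            # Anything else after DEBEZIUM (e.g. TRANSACTION METADATA) is rejected
            raise ValueError("expected (MODE = '...') or end of envelope")
        return "Generic"  # bare ENVELOPE DEBEZIUM keeps today's behavior
    if m.group(1).upper() == "TICDC":
        return "TiCdc"
    raise ValueError(f"expected TICDC, got {m.group(1)!r}")
```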
diff --git a/src/sql-parser/src/ast/defs/ddl.rs b/src/sql-parser/src/ast/defs/ddl.rs index a43f41bd1e98b..b220000aeb802 100644 --- a/src/sql-parser/src/ast/defs/ddl.rs +++ b/src/sql-parser/src/ast/defs/ddl.rs @@ -560,6 +560,10 @@ pub enum SourceIncludeMetadata { alias: Ident, use_bytes: bool, }, + /// `INCLUDE DEBEZIUM METADATA` — full Debezium envelope as jsonb + DebeziumMetadata { + alias: Option<Ident>, + }, } impl AstDisplay for SourceIncludeMetadata { @@ -605,6 +609,10 @@ impl AstDisplay for SourceIncludeMetadata { f.write_str(" BYTES"); } } + SourceIncludeMetadata::DebeziumMetadata { alias } => { + f.write_str("DEBEZIUM METADATA"); + print_alias(f, alias); + } } } } @@ -633,10 +641,20 @@ impl AstDisplay for SourceErrorPolicy { } impl_display!(SourceErrorPolicy); +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub enum DebeziumMode { + /// Generic Debezium JSON/Avro format. + None, + /// TiCDC dialect of Debezium format. + TiCdc, +} + #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub enum SourceEnvelope { None, - Debezium, + Debezium { + mode: DebeziumMode, + }, Upsert { value_decode_err_policy: Vec<SourceErrorPolicy>, }, @@ -649,7 +667,7 @@ impl SourceEnvelope { pub fn requires_all_input(&self) -> bool { match self { SourceEnvelope::None => false, - SourceEnvelope::Debezium => false, + SourceEnvelope::Debezium { .. } => false, SourceEnvelope::Upsert { .. 
} => false, SourceEnvelope::CdcV2 => true, } } @@ -663,8 +681,12 @@ impl AstDisplay for SourceEnvelope { // this is unreachable as long as the default is None, but include it in case we ever change that f.write_str("NONE"); } - Self::Debezium => { + Self::Debezium { mode } => { f.write_str("DEBEZIUM"); + match mode { + DebeziumMode::None => {} + DebeziumMode::TiCdc => f.write_str(" (MODE = 'TICDC')"), + } } Self::Upsert { value_decode_err_policy, diff --git a/src/sql-parser/src/parser.rs b/src/sql-parser/src/parser.rs index 1d5444d0b0c1d..0ff8ac0de6f31 100644 --- a/src/sql-parser/src/parser.rs +++ b/src/sql-parser/src/parser.rs @@ -2363,7 +2363,21 @@ impl<'a> Parser<'a> { let envelope = if self.parse_keyword(NONE) { SourceEnvelope::None } else if self.parse_keyword(DEBEZIUM) { - SourceEnvelope::Debezium + let mode = if self.consume_token(&Token::LParen) { + self.expect_keyword(MODE)?; + self.expect_token(&Token::Eq)?; + let mode_str = self.parse_literal_string()?; + self.expect_token(&Token::RParen)?; + match mode_str.to_uppercase().as_str() { + "TICDC" => DebeziumMode::TiCdc, + _ => { + return self.expected(self.peek_pos(), "TICDC", self.peek_token()); + } + } + } else { + DebeziumMode::None + }; + SourceEnvelope::Debezium { mode } } else if self.parse_keyword(UPSERT) { let value_decode_err_policy = if self.consume_token(&Token::LParen) { // We only support the `VALUE DECODING ERRORS` option for now, but if we add another @@ -4816,9 +4830,9 @@ impl<'a> Parser<'a> { fn parse_source_include_metadata(&mut self) -> Result<Vec<SourceIncludeMetadata>, ParserError> { if self.parse_keyword(INCLUDE) { self.parse_comma_separated(|parser| { - let metadata = match parser - .expect_one_of_keywords(&[KEY, TIMESTAMP, PARTITION, OFFSET, HEADERS, HEADER])? - { + let metadata = match parser.expect_one_of_keywords(&[ + KEY, TIMESTAMP, PARTITION, OFFSET, HEADERS, HEADER, DEBEZIUM, + ])? 
{ KEY => SourceIncludeMetadata::Key { alias: parser.parse_alias()?, }, @@ -4845,6 +4859,12 @@ impl<'a> Parser<'a> { use_bytes, } } + DEBEZIUM => { + parser.expect_keyword(METADATA)?; + SourceIncludeMetadata::DebeziumMetadata { + alias: parser.parse_alias()?, + } + } _ => unreachable!("only explicitly allowed items can be parsed"), }; Ok(metadata) diff --git a/src/sql-parser/tests/testdata/ddl b/src/sql-parser/tests/testdata/ddl index 088b8061b5a86..1b07b30e98b1d 100644 --- a/src/sql-parser/tests/testdata/ddl +++ b/src/sql-parser/tests/testdata/ddl @@ -2511,7 +2511,7 @@ CREATE SOURCE src1 FROM KAFKA CONNECTION conn1 (TOPIC 'baz') FORMAT AVRO USING C ---- CREATE SOURCE src1 FROM KAFKA CONNECTION conn1 (TOPIC = 'baz') FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION conn2 ENVELOPE DEBEZIUM => -CreateSource(CreateSourceStatement { name: UnresolvedItemName([Ident("src1")]), in_cluster: None, col_names: [], connection: Kafka { connection: Name(UnresolvedItemName([Ident("conn1")])), options: [KafkaSourceConfigOption { name: Topic, value: Some(Value(String("baz"))) }] }, include_metadata: [], format: Some(Bare(Avro(Csr { csr_connection: CsrConnectionAvro { connection: CsrConnection { connection: Name(UnresolvedItemName([Ident("conn2")])), options: [] }, key_strategy: None, value_strategy: None, seed: None } }))), envelope: Some(Debezium), if_not_exists: false, key_constraint: None, with_options: [], external_references: None, progress_subsource: None }) +CreateSource(CreateSourceStatement { name: UnresolvedItemName([Ident("src1")]), in_cluster: None, col_names: [], connection: Kafka { connection: Name(UnresolvedItemName([Ident("conn1")])), options: [KafkaSourceConfigOption { name: Topic, value: Some(Value(String("baz"))) }] }, include_metadata: [], format: Some(Bare(Avro(Csr { csr_connection: CsrConnectionAvro { connection: CsrConnection { connection: Name(UnresolvedItemName([Ident("conn2")])), options: [] }, key_strategy: None, value_strategy: None, seed: None } 
}))), envelope: Some(Debezium { mode: None }), if_not_exists: false, key_constraint: None, with_options: [], external_references: None, progress_subsource: None }) parse-statement @@ -2519,22 +2519,22 @@ CREATE SOURCE src1 FROM KAFKA CONNECTION conn1 (TOPIC 'baz') FORMAT PROTOBUF USI ---- CREATE SOURCE src1 FROM KAFKA CONNECTION conn1 (TOPIC = 'baz') FORMAT PROTOBUF USING CONFLUENT SCHEMA REGISTRY CONNECTION conn2 ENVELOPE DEBEZIUM => -CreateSource(CreateSourceStatement { name: UnresolvedItemName([Ident("src1")]), in_cluster: None, col_names: [], connection: Kafka { connection: Name(UnresolvedItemName([Ident("conn1")])), options: [KafkaSourceConfigOption { name: Topic, value: Some(Value(String("baz"))) }] }, include_metadata: [], format: Some(Bare(Protobuf(Csr { csr_connection: CsrConnectionProtobuf { connection: CsrConnection { connection: Name(UnresolvedItemName([Ident("conn2")])), options: [] }, seed: None } }))), envelope: Some(Debezium), if_not_exists: false, key_constraint: None, with_options: [], external_references: None, progress_subsource: None }) +CreateSource(CreateSourceStatement { name: UnresolvedItemName([Ident("src1")]), in_cluster: None, col_names: [], connection: Kafka { connection: Name(UnresolvedItemName([Ident("conn1")])), options: [KafkaSourceConfigOption { name: Topic, value: Some(Value(String("baz"))) }] }, include_metadata: [], format: Some(Bare(Protobuf(Csr { csr_connection: CsrConnectionProtobuf { connection: CsrConnection { connection: Name(UnresolvedItemName([Ident("conn2")])), options: [] }, seed: None } }))), envelope: Some(Debezium { mode: None }), if_not_exists: false, key_constraint: None, with_options: [], external_references: None, progress_subsource: None }) parse-statement CREATE SOURCE src1 FROM KAFKA CONNECTION conn1 (TOPIC 'baz') ENVELOPE DEBEZIUM (TRANSACTION METADATA (SOURCE a.b.c, COLLECTION 'foo')) ---- -error: Expected end of statement, found left parenthesis +error: Expected MODE, found TRANSACTION CREATE SOURCE src1 
FROM KAFKA CONNECTION conn1 (TOPIC 'baz') ENVELOPE DEBEZIUM (TRANSACTION METADATA (SOURCE a.b.c, COLLECTION 'foo')) - ^ + ^ parse-statement CREATE SOURCE src1 FROM KAFKA CONNECTION conn1 (TOPIC 'baz') ENVELOPE DEBEZIUM (TRANSACTION METADATA (COLLECTION 'foo', SOURCE a.b.c)) ---- -error: Expected end of statement, found left parenthesis +error: Expected MODE, found TRANSACTION CREATE SOURCE src1 FROM KAFKA CONNECTION conn1 (TOPIC 'baz') ENVELOPE DEBEZIUM (TRANSACTION METADATA (COLLECTION 'foo', SOURCE a.b.c)) - ^ + ^ # Note that this will error in planning, as you cannot specify START OFFSET and START TIMESTAMP at the same time parse-statement @@ -2542,7 +2542,7 @@ CREATE SOURCE src1 FROM KAFKA CONNECTION conn1 (START OFFSET=1, START TIMESTAMP= ---- CREATE SOURCE src1 FROM KAFKA CONNECTION conn1 (START OFFSET = 1, START TIMESTAMP = 2, TOPIC = 'baz') ENVELOPE DEBEZIUM => -CreateSource(CreateSourceStatement { name: UnresolvedItemName([Ident("src1")]), in_cluster: None, col_names: [], connection: Kafka { connection: Name(UnresolvedItemName([Ident("conn1")])), options: [KafkaSourceConfigOption { name: StartOffset, value: Some(Value(Number("1"))) }, KafkaSourceConfigOption { name: StartTimestamp, value: Some(Value(Number("2"))) }, KafkaSourceConfigOption { name: Topic, value: Some(Value(String("baz"))) }] }, include_metadata: [], format: None, envelope: Some(Debezium), if_not_exists: false, key_constraint: None, with_options: [], external_references: None, progress_subsource: None }) +CreateSource(CreateSourceStatement { name: UnresolvedItemName([Ident("src1")]), in_cluster: None, col_names: [], connection: Kafka { connection: Name(UnresolvedItemName([Ident("conn1")])), options: [KafkaSourceConfigOption { name: StartOffset, value: Some(Value(Number("1"))) }, KafkaSourceConfigOption { name: StartTimestamp, value: Some(Value(Number("2"))) }, KafkaSourceConfigOption { name: Topic, value: Some(Value(String("baz"))) }] }, include_metadata: [], format: None, envelope: 
Some(Debezium { mode: None }), if_not_exists: false, key_constraint: None, with_options: [], external_references: None, progress_subsource: None }) # Note that this will error in planning, as START OFFSET must be an array of nums parse-statement @@ -2585,7 +2585,7 @@ CREATE SOURCE src1 FROM KAFKA CONNECTION conn1 (TOPIC 'baz') FORMAT PROTOBUF USI ---- CREATE SOURCE src1 FROM KAFKA CONNECTION conn1 (TOPIC = 'baz') FORMAT PROTOBUF USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn SEED VALUE SCHEMA '{"some": "seed"}' MESSAGE 'Batch' ENVELOPE DEBEZIUM => -CreateSource(CreateSourceStatement { name: UnresolvedItemName([Ident("src1")]), in_cluster: None, col_names: [], connection: Kafka { connection: Name(UnresolvedItemName([Ident("conn1")])), options: [KafkaSourceConfigOption { name: Topic, value: Some(Value(String("baz"))) }] }, include_metadata: [], format: Some(Bare(Protobuf(Csr { csr_connection: CsrConnectionProtobuf { connection: CsrConnection { connection: Name(UnresolvedItemName([Ident("csr_conn")])), options: [] }, seed: Some(CsrSeedProtobuf { key: None, value: CsrSeedProtobufSchema { schema: "{\"some\": \"seed\"}", message_name: "Batch" } }) } }))), envelope: Some(Debezium), if_not_exists: false, key_constraint: None, with_options: [], external_references: None, progress_subsource: None }) +CreateSource(CreateSourceStatement { name: UnresolvedItemName([Ident("src1")]), in_cluster: None, col_names: [], connection: Kafka { connection: Name(UnresolvedItemName([Ident("conn1")])), options: [KafkaSourceConfigOption { name: Topic, value: Some(Value(String("baz"))) }] }, include_metadata: [], format: Some(Bare(Protobuf(Csr { csr_connection: CsrConnectionProtobuf { connection: CsrConnection { connection: Name(UnresolvedItemName([Ident("csr_conn")])), options: [] }, seed: Some(CsrSeedProtobuf { key: None, value: CsrSeedProtobufSchema { schema: "{\"some\": \"seed\"}", message_name: "Batch" } }) } }))), envelope: Some(Debezium { mode: None }), if_not_exists: false, 
key_constraint: None, with_options: [], external_references: None, progress_subsource: None }) parse-statement @@ -2593,7 +2593,7 @@ CREATE SOURCE src1 FROM KAFKA CONNECTION conn1 (TOPIC 'baz') FORMAT PROTOBUF USI ---- CREATE SOURCE src1 FROM KAFKA CONNECTION conn1 (TOPIC = 'baz') FORMAT PROTOBUF USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn SEED KEY SCHEMA '{"some": "seed"}' MESSAGE 'Batch' VALUE SCHEMA '123' MESSAGE 'M' ENVELOPE DEBEZIUM => -CreateSource(CreateSourceStatement { name: UnresolvedItemName([Ident("src1")]), in_cluster: None, col_names: [], connection: Kafka { connection: Name(UnresolvedItemName([Ident("conn1")])), options: [KafkaSourceConfigOption { name: Topic, value: Some(Value(String("baz"))) }] }, include_metadata: [], format: Some(Bare(Protobuf(Csr { csr_connection: CsrConnectionProtobuf { connection: CsrConnection { connection: Name(UnresolvedItemName([Ident("csr_conn")])), options: [] }, seed: Some(CsrSeedProtobuf { key: Some(CsrSeedProtobufSchema { schema: "{\"some\": \"seed\"}", message_name: "Batch" }), value: CsrSeedProtobufSchema { schema: "123", message_name: "M" } }) } }))), envelope: Some(Debezium), if_not_exists: false, key_constraint: None, with_options: [], external_references: None, progress_subsource: None }) +CreateSource(CreateSourceStatement { name: UnresolvedItemName([Ident("src1")]), in_cluster: None, col_names: [], connection: Kafka { connection: Name(UnresolvedItemName([Ident("conn1")])), options: [KafkaSourceConfigOption { name: Topic, value: Some(Value(String("baz"))) }] }, include_metadata: [], format: Some(Bare(Protobuf(Csr { csr_connection: CsrConnectionProtobuf { connection: CsrConnection { connection: Name(UnresolvedItemName([Ident("csr_conn")])), options: [] }, seed: Some(CsrSeedProtobuf { key: Some(CsrSeedProtobufSchema { schema: "{\"some\": \"seed\"}", message_name: "Batch" }), value: CsrSeedProtobufSchema { schema: "123", message_name: "M" } }) } }))), envelope: Some(Debezium { mode: None }), if_not_exists: 
false, key_constraint: None, with_options: [], external_references: None, progress_subsource: None }) parse-statement CREATE SOURCE src1 FROM KAFKA CONNECTION conn1 (TOPIC 'baz') FORMAT PROTOBUF USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn SEED KEY SCHEMA '{"some": "seed"}' MESSAGE 'Batch' VALUE SCHEMA '123' MESSAGE 'M' ENVELOPE UPSERT diff --git a/src/sql/src/plan/statement/ddl.rs b/src/sql/src/plan/statement/ddl.rs index 0343f1c676584..0c6525cfc3a46 100644 --- a/src/sql/src/plan/statement/ddl.rs +++ b/src/sql/src/plan/statement/ddl.rs @@ -95,7 +95,8 @@ use mz_storage_types::sources::encoding::{ SourceDataEncoding, included_column_desc, }; use mz_storage_types::sources::envelope::{ - KeyEnvelope, NoneEnvelope, SourceEnvelope, UnplannedSourceEnvelope, UpsertStyle, + DebeziumJsonMode, KeyEnvelope, NoneEnvelope, SourceEnvelope, UnplannedSourceEnvelope, + UpsertStyle, }; use mz_storage_types::sources::kafka::{ KafkaMetadataKind, KafkaSourceConnection, KafkaSourceExportDetails, kafka_metadata_columns_desc, @@ -812,7 +813,7 @@ pub fn plan_create_source( envelope, ast::SourceEnvelope::Upsert { .. } | ast::SourceEnvelope::None - | ast::SourceEnvelope::Debezium + | ast::SourceEnvelope::Debezium { .. } ) { sql_bail!("INCLUDE requires ENVELOPE (NONE|UPSERT|DEBEZIUM)"); @@ -1274,6 +1275,10 @@ fn plan_kafka_source_connection( // handled below None } + SourceIncludeMetadata::DebeziumMetadata { .. } => { + // handled in apply_source_envelope_encoding + None + } }) .collect(); Ok(KafkaSourceConnection { @@ -1375,9 +1380,17 @@ fn apply_source_envelope_encoding( key_envelope_no_encoding, )?; - match (&envelope, &key_envelope) { - (ast::SourceEnvelope::Debezium, KeyEnvelope::None) => {} - (ast::SourceEnvelope::Debezium, _) => sql_bail!( + match ( + &envelope, + &key_envelope, + encoding.as_ref().map(|e| &e.value), + ) { + // Avro Debezium: keys are in the unpacked `after` record, reject INCLUDE KEY + (ast::SourceEnvelope::Debezium { .. 
}, KeyEnvelope::None, _) => {} + (ast::SourceEnvelope::Debezium { .. }, _, Some(DataEncoding::Json)) => { + // JSON Debezium: key is included as a separate column, INCLUDE KEY is fine + } + (ast::SourceEnvelope::Debezium { .. }, _, _) => sql_bail!( "Cannot use INCLUDE KEY with ENVELOPE DEBEZIUM: Debezium values include all keys." ), _ => {} @@ -1394,20 +1407,49 @@ fn apply_source_envelope_encoding( let envelope = match &envelope { // TODO: fixup key envelope ast::SourceEnvelope::None => UnplannedSourceEnvelope::None(key_envelope), - ast::SourceEnvelope::Debezium => { - //TODO check that key envelope is not set - let after_idx = match typecheck_debezium(&value_desc) { - Ok((_before_idx, after_idx)) => Ok(after_idx), - Err(type_err) => match encoding.as_ref().map(|e| &e.value) { - Some(DataEncoding::Avro(_)) => Err(type_err), - _ => Err(sql_err!( - "ENVELOPE DEBEZIUM requires that VALUE FORMAT is set to AVRO" - )), - }, - }?; + ast::SourceEnvelope::Debezium { mode } => { + match encoding.as_ref().map(|e| &e.value) { + Some(DataEncoding::Json) => { + // JSON path: skip typecheck_debezium, use DebeziumJson style + let storage_mode = match mode { + ast::DebeziumMode::TiCdc => DebeziumJsonMode::TiCdc, + ast::DebeziumMode::None => DebeziumJsonMode::Generic, + }; + // Extract INCLUDE DEBEZIUM METADATA column name if specified + let envelope_column = include_metadata + .iter() + .find_map(|m| match m { + SourceIncludeMetadata::DebeziumMetadata { alias } => { + Some(alias.as_ref().map_or_else( + || "debezium_metadata".to_string(), + |a| a.to_string(), + )) + } + _ => None, + }); + UnplannedSourceEnvelope::Upsert { + style: UpsertStyle::DebeziumJson { + mode: storage_mode, + envelope_column, + }, + } + } + _ => { + // Existing Avro path + let after_idx = match typecheck_debezium(&value_desc) { + Ok((_before_idx, after_idx)) => Ok(after_idx), + Err(type_err) => match encoding.as_ref().map(|e| &e.value) { + Some(DataEncoding::Avro(_)) => Err(type_err), + _ => Err(sql_err!( + 
"ENVELOPE DEBEZIUM requires that VALUE FORMAT is set to AVRO or JSON" + )), + }, + }?; - UnplannedSourceEnvelope::Upsert { - style: UpsertStyle::Debezium { after_idx }, + UnplannedSourceEnvelope::Upsert { + style: UpsertStyle::Debezium { after_idx }, + } + } } } ast::SourceEnvelope::Upsert { @@ -1460,6 +1502,21 @@ fn apply_source_envelope_encoding( } }; + // Validate INCLUDE DEBEZIUM METADATA is only used with ENVELOPE DEBEZIUM + JSON + let has_dbz_metadata = include_metadata + .iter() + .any(|m| matches!(m, SourceIncludeMetadata::DebeziumMetadata { .. })); + if has_dbz_metadata { + match &envelope { + UnplannedSourceEnvelope::Upsert { + style: UpsertStyle::DebeziumJson { .. }, + } => {} // valid + _ => sql_bail!( + "INCLUDE DEBEZIUM METADATA requires ENVELOPE DEBEZIUM with VALUE FORMAT JSON" + ), + } + } + let metadata_desc = included_column_desc(metadata_columns_desc); let (envelope, desc) = envelope.desc(key_desc, value_desc, metadata_desc)?; @@ -1847,7 +1904,7 @@ pub fn plan_create_table_from_source( envelope, ast::SourceEnvelope::Upsert { .. } | ast::SourceEnvelope::None - | ast::SourceEnvelope::Debezium + | ast::SourceEnvelope::Debezium { .. } ) { // TODO(guswynn): should this be `bail_unsupported!`? @@ -1900,6 +1957,10 @@ pub fn plan_create_table_from_source( // handled below None } + SourceIncludeMetadata::DebeziumMetadata { .. } => { + // handled in apply_source_envelope_encoding + None + } }) .collect(); @@ -2247,7 +2308,7 @@ fn get_encoding( let requires_keyvalue = matches!( envelope, - ast::SourceEnvelope::Debezium | ast::SourceEnvelope::Upsert { .. } + ast::SourceEnvelope::Debezium { .. } | ast::SourceEnvelope::Upsert { .. 
} ); let is_keyvalue = encoding.key.is_some(); if requires_keyvalue && !is_keyvalue { diff --git a/src/sql/src/pure.rs b/src/sql/src/pure.rs index 44488fa966a89..078ab2e9d7139 100644 --- a/src/sql/src/pure.rs +++ b/src/sql/src/pure.rs @@ -2413,7 +2413,7 @@ async fn purify_csr_connection_proto( .await .ok(); - if matches!(envelope, Some(SourceEnvelope::Debezium)) && key.is_none() { + if matches!(envelope, Some(SourceEnvelope::Debezium { .. })) && key.is_none() { sql_bail!("Key schema is required for ENVELOPE DEBEZIUM"); } @@ -2465,7 +2465,7 @@ async fn purify_csr_connection_avro( topic, ) .await?; - if matches!(envelope, Some(SourceEnvelope::Debezium)) && key_schema.is_none() { + if matches!(envelope, Some(SourceEnvelope::Debezium { .. })) && key_schema.is_none() { sql_bail!("Key schema is required for ENVELOPE DEBEZIUM"); } diff --git a/src/storage-types/src/sources/envelope.rs b/src/storage-types/src/sources/envelope.rs index 3eabec1ce7d10..99c3b2e95030e 100644 --- a/src/storage-types/src/sources/envelope.rs +++ b/src/storage-types/src/sources/envelope.rs @@ -67,13 +67,30 @@ pub struct UpsertEnvelope { pub key_indices: Vec<usize>, } +/// Mode for JSON Debezium envelope processing. +#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)] +pub enum DebeziumJsonMode { + /// Generic Debezium JSON format. + Generic, + /// TiCDC dialect of Debezium format. + TiCdc, +} + #[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)] pub enum UpsertStyle { /// `ENVELOPE UPSERT`, where the key shape depends on the independent /// `KeyEnvelope` Default(KeyEnvelope), - /// `ENVELOPE DEBEZIUM UPSERT` + /// `ENVELOPE DEBEZIUM UPSERT` with Avro encoding Debezium { after_idx: usize }, + /// `ENVELOPE DEBEZIUM` with JSON encoding. Envelope extraction happens at + /// render time by parsing the Jsonb datum to extract `op` and `after` fields. 
+ /// If `envelope_column` is set (via `INCLUDE DEBEZIUM METADATA`), the full + /// Debezium envelope is exposed as an additional jsonb column. + DebeziumJson { + mode: DebeziumJsonMode, + envelope_column: Option, + }, /// `ENVELOPE UPSERT` where any decode errors will get serialized into a /// SqlScalarType::Record column named `error_column`, and all value columns are /// nullable. The key shape depends on the independent `KeyEnvelope`. @@ -227,6 +244,36 @@ impl UnplannedSourceEnvelope { desc, ) } + UnplannedSourceEnvelope::Upsert { + style: UpsertStyle::DebeziumJson { envelope_column, .. }, + .. + } => { + // JSON Debezium: output is [key: Jsonb, data: Jsonb, ...metadata] + // Key is included as a separate column (unlike Avro Debezium). + // `data` contains the `after` field (the row payload). + // If INCLUDE DEBEZIUM METADATA is specified, an additional jsonb column + // contains the full Debezium envelope for CDC metadata access. + let key_desc = RelationDesc::builder() + .with_column("key", SqlScalarType::Jsonb.nullable(false)) + .finish(); + let mut value_builder = RelationDesc::builder() + .with_column("data", SqlScalarType::Jsonb.nullable(false)); + if let Some(col_name) = envelope_column { + value_builder = + value_builder.with_column(col_name.as_str(), SqlScalarType::Jsonb.nullable(false)); + } + let value_desc = value_builder.finish(); + // key_indices = [0]: points to the key column for UpsertKey::from_value rehydration + let key_indices = vec![0usize]; + let desc = key_desc + .with_key(key_indices.clone()) + .concat(value_desc) + .concat(metadata_desc); + ( + self.into_source_envelope(Some(key_indices), None, Some(desc.arity())), + desc, + ) + } UnplannedSourceEnvelope::Upsert { style: UpsertStyle::Debezium { after_idx }, .. 
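The `desc` construction above fixes the output layout: the JSON key first (so `key_indices = [0]` keeps `UpsertKey::from_value` rehydration working), then the `after` payload, then the optional envelope column, then any source metadata columns. A small Python sketch of that column layout (the helper name and tuple shape are hypothetical, for illustration only):

```python
def debezium_json_desc(envelope_column=None, metadata=()):
    """Hypothetical sketch of the relation produced for UpsertStyle::DebeziumJson."""
    cols = [("key", "jsonb"), ("data", "jsonb")]  # key is column 0, so key_indices = [0]
    if envelope_column is not None:               # INCLUDE DEBEZIUM METADATA [AS alias]
        cols.append((envelope_column, "jsonb"))
    cols.extend(metadata)                         # e.g. INCLUDE OFFSET columns
    key_indices = [0]
    return cols, key_indices
```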
diff --git a/src/storage/src/render/sources.rs b/src/storage/src/render/sources.rs index f2be72edab714..8685cdc94a900 100644 --- a/src/storage/src/render/sources.rs +++ b/src/storage/src/render/sources.rs @@ -24,10 +24,13 @@ use mz_storage_operators::persist_source::Subtime; use mz_storage_types::controller::CollectionMetadata; use mz_storage_types::dyncfgs; use mz_storage_types::errors::{ - DataflowError, DecodeError, EnvelopeError, UpsertError, UpsertNullKeyError, UpsertValueError, + DataflowError, DecodeError, DecodeErrorKind, EnvelopeError, UpsertError, UpsertNullKeyError, + UpsertValueError, }; use mz_storage_types::parameters::StorageMaxInflightBytesConfig; -use mz_storage_types::sources::envelope::{KeyEnvelope, NoneEnvelope, UpsertEnvelope, UpsertStyle}; +use mz_storage_types::sources::envelope::{ + DebeziumJsonMode, KeyEnvelope, NoneEnvelope, UpsertEnvelope, UpsertStyle, +}; use mz_storage_types::sources::*; use mz_timely_util::builder_async::PressOnDropButton; use mz_timely_util::operator::CollectionExt; @@ -515,8 +518,9 @@ fn upsert_commands( // We can now apply the key envelope let key_row = match upsert_envelope.style { - // flattened or debezium + // flattened, debezium (avro or json) UpsertStyle::Debezium { .. } + | UpsertStyle::DebeziumJson { .. 
} | UpsertStyle::Default(KeyEnvelope::Flattened) | UpsertStyle::ValueErrInline { key_envelope: KeyEnvelope::Flattened, @@ -548,6 +552,29 @@ fn upsert_commands( let value = match result.value { Some(Ok(ref row)) => match upsert_envelope.style { + UpsertStyle::DebeziumJson { + ref mode, + ref envelope_column, + } => { + match extract_debezium_json(row, mode, envelope_column.is_some()) { + Ok(Some((after_row, envelope_row))) => { + let mut packer = row_buf.packer(); + // key, data (after), [envelope if requested], then metadata + packer.extend_by_row(&key_row); + packer.extend_by_row(&after_row); + if let Some(env_row) = envelope_row { + packer.extend_by_row(&env_row); + } + packer.extend_by_row(&metadata); + Some(Ok(row_buf.clone())) + } + Ok(None) => None, // Delete or truncate + Err(err) => Some(Err(Box::new(UpsertError::Value(UpsertValueError { + for_key: key_row.clone(), + inner: err, + })))), + } + } UpsertStyle::Debezium { after_idx } => match row.iter().nth(after_idx).unwrap() { Datum::List(after) => { let mut packer = row_buf.packer(); @@ -611,6 +638,120 @@ fn upsert_commands( }) } +/// Extract the `after` payload (and, optionally, the full envelope) from a +/// Debezium JSON message. +/// +/// Handles both `payload`-wrapped (`{ "payload": { "op": ..., "after": ... } }`) +/// and flat (`{ "op": ..., "after": ... }`) formats.
+/// +/// Returns: +/// - `Ok(Some((after_row, Some(envelope_row))))` when `include_envelope` is true +/// - `Ok(Some((after_row, None)))` when `include_envelope` is false +/// - `Ok(None)` for delete/truncate operations (op = d/t) +/// - `Err(DecodeError)` for malformed messages +fn extract_debezium_json( + row: &Row, + _mode: &DebeziumJsonMode, + include_envelope: bool, +) -> Result<Option<(Row, Option<Row>)>, DecodeError> { + use mz_repr::adt::jsonb::JsonbRef; + + let datum = row.iter().next().unwrap(); + if datum == Datum::JsonNull { + // Null value treated as delete (tombstone) + return Ok(None); + } + + // Wrap the datum as a JsonbRef and convert to serde_json::Value for field access + let jsonb = JsonbRef::from_datum(datum); + let json_value = jsonb.to_serde_json(); + + // Handle payload-wrapped vs flat format + let envelope = match json_value.get("payload") { + Some(payload) if payload.is_object() => payload, + _ => &json_value, + }; + + // Extract "op" field + let op = match envelope.get("op").and_then(|v| v.as_str()) { + Some(op) => op, + None => { + return Err(DecodeError { + kind: DecodeErrorKind::Text("Debezium JSON message missing 'op' field".into()), + raw: vec![], + }); + } + }; + + match op { + "d" => Ok(None), + "t" => { + // Truncate: silently skip per DD-13 + tracing::warn!("Debezium JSON: ignoring truncate event (op='t')"); + Ok(None) + } + "c" | "u" | "r" => { + // Create, update, or read (snapshot) — extract "after" field + let after = match envelope.get("after") { + Some(serde_json::Value::Null) | None => { + return Err(DecodeError { + kind: DecodeErrorKind::Text( + format!( + "Debezium JSON message with op='{}' has null or missing 'after' field", + op + ) + .into(), + ), + raw: vec![], + }); + } + Some(after) => after, + }; + + // Re-serialize "after" into a Row with a single Jsonb datum + let after_json = + mz_repr::adt::jsonb::Jsonb::from_serde_json(after.clone()).map_err(|e| { + DecodeError { + kind: DecodeErrorKind::Text( + format!("Failed to convert Debezium 
'after' to Jsonb: {}", e).into(), + ), + raw: vec![], + } + })?; + + // Optionally preserve the full envelope as Jsonb for CDC metadata access + let envelope_row = if include_envelope { + let envelope_json = + mz_repr::adt::jsonb::Jsonb::from_serde_json(envelope.clone()).map_err(|e| { + DecodeError { + kind: DecodeErrorKind::Text( + format!("Failed to convert Debezium envelope to Jsonb: {}", e) + .into(), + ), + raw: vec![], + } + })?; + Some(envelope_json.into_row()) + } else { + None + }; + + Ok(Some((after_json.into_row(), envelope_row))) + } + other => Err(DecodeError { + kind: DecodeErrorKind::Text( + format!( + "Debezium JSON message has unrecognized 'op' value: '{}'", + other + ) + .into(), + ), + raw: vec![], + }), + } +} + /// Convert from streams of [`DecodeResult`] to Rows, inserting the Key according to [`KeyEnvelope`] fn flatten_results_prepend_keys( none_envelope: &NoneEnvelope, diff --git a/test/testdrive/kafka-debezium-json-sources.td b/test/testdrive/kafka-debezium-json-sources.td new file mode 100644 index 0000000000000..a8040d1f1e30e --- /dev/null +++ b/test/testdrive/kafka-debezium-json-sources.td @@ -0,0 +1,201 @@ +# Copyright Materialize, Inc. and contributors. All rights reserved. +# +# Use of this software is governed by the Business Source License +# included in the LICENSE file at the root of this repository. +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0. 
+ +$ set-sql-timeout duration=60s + +$ set-arg-default default-storage-size=scale=1,workers=1 + +# ============================================================================= +# Test Debezium JSON envelope support (generic mode) +# ============================================================================= + +> CREATE CONNECTION kafka_conn + TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT); + +# --- Basic CRUD operations --- + +$ kafka-create-topic topic=dbz-json partitions=1 + +# Use simple integer keys to avoid key-terminator conflicts with JSON colons +$ kafka-ingest format=bytes key-format=bytes key-terminator=: topic=dbz-json +1:{"before":null,"after":{"id":1,"creature":"mudskipper"},"op":"c"} +1:{"before":{"id":1,"creature":"mudskipper"},"after":{"id":1,"creature":"salamander"},"op":"u"} +1:{"before":{"id":1,"creature":"salamander"},"after":{"id":1,"creature":"lizard"},"op":"u"} + +> CREATE CLUSTER dbz_json_cluster SIZE '${arg.default-storage-size}'; +> BEGIN +> CREATE SOURCE dbz_json_src + IN CLUSTER dbz_json_cluster + FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-dbz-json-${testdrive.seed}'); + +> CREATE TABLE dbz_json_tbl FROM SOURCE dbz_json_src (REFERENCE "testdrive-dbz-json-${testdrive.seed}") + KEY FORMAT JSON VALUE FORMAT JSON + ENVELOPE DEBEZIUM +> COMMIT + +# Output schema is [key: jsonb, data: jsonb] +# The last update for key=1 should win (lizard) +# Use ->> to extract fields since JSON key order is non-deterministic +> SELECT key, data->>'id', data->>'creature' FROM dbz_json_tbl +1 1 lizard + +# --- Insert a second row --- + +$ kafka-ingest format=bytes key-format=bytes key-terminator=: topic=dbz-json +2:{"before":null,"after":{"id":2,"creature":"archeopteryx"},"op":"c"} +2:{"before":{"id":2,"creature":"archeopteryx"},"after":{"id":2,"creature":"velociraptor"},"op":"u"} + +> SELECT key, data->>'creature' FROM dbz_json_tbl ORDER BY key +1 lizard +2 velociraptor + +# --- Test delete (op=d) --- + +$ kafka-ingest 
format=bytes key-format=bytes key-terminator=: topic=dbz-json +1:{"before":{"id":1,"creature":"lizard"},"after":null,"op":"d"} + +> SELECT key, data->>'creature' FROM dbz_json_tbl +2 velociraptor + +# --- Test reinsertion after delete --- + +$ kafka-ingest format=bytes key-format=bytes key-terminator=: topic=dbz-json +1:{"before":null,"after":{"id":1,"creature":"chicken"},"op":"c"} + +> SELECT key, data->>'creature' FROM dbz_json_tbl ORDER BY key +1 chicken +2 velociraptor + +# --- Test snapshot read (op=r) --- + +$ kafka-create-topic topic=dbz-json-snapshot partitions=1 + +$ kafka-ingest format=bytes key-format=bytes key-terminator=: topic=dbz-json-snapshot +10:{"before":null,"after":{"id":10,"name":"snapshot_row"},"op":"r"} + +> BEGIN +> CREATE SOURCE dbz_json_snap_src + IN CLUSTER dbz_json_cluster + FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-dbz-json-snapshot-${testdrive.seed}'); + +> CREATE TABLE dbz_json_snap_tbl FROM SOURCE dbz_json_snap_src (REFERENCE "testdrive-dbz-json-snapshot-${testdrive.seed}") + KEY FORMAT JSON VALUE FORMAT JSON + ENVELOPE DEBEZIUM +> COMMIT + +> SELECT key, data->>'name' FROM dbz_json_snap_tbl +10 snapshot_row + +# --- Test payload-wrapped format (Debezium default with schema+payload) --- + +$ kafka-create-topic topic=dbz-json-wrapped partitions=1 + +$ kafka-ingest format=bytes key-format=bytes key-terminator=: topic=dbz-json-wrapped +100:{"schema":{"type":"struct","fields":[]},"payload":{"before":null,"after":{"id":100,"val":"wrapped"},"op":"c"}} + +> BEGIN +> CREATE SOURCE dbz_json_wrapped_src + IN CLUSTER dbz_json_cluster + FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-dbz-json-wrapped-${testdrive.seed}'); + +> CREATE TABLE dbz_json_wrapped_tbl FROM SOURCE dbz_json_wrapped_src (REFERENCE "testdrive-dbz-json-wrapped-${testdrive.seed}") + KEY FORMAT JSON VALUE FORMAT JSON + ENVELOPE DEBEZIUM +> COMMIT + +> SELECT key, data->>'val' FROM dbz_json_wrapped_tbl +100 wrapped + +# --- Test INCLUDE OFFSET metadata --- + +$ 
kafka-create-topic topic=dbz-json-meta partitions=1 + +$ kafka-ingest format=bytes key-format=bytes key-terminator=: topic=dbz-json-meta +1:{"before":null,"after":{"id":1,"val":"first"},"op":"c"} +2:{"before":null,"after":{"id":2,"val":"second"},"op":"c"} + +> BEGIN +> CREATE SOURCE dbz_json_meta_src + IN CLUSTER dbz_json_cluster + FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-dbz-json-meta-${testdrive.seed}'); + +> CREATE TABLE dbz_json_meta_tbl FROM SOURCE dbz_json_meta_src (REFERENCE "testdrive-dbz-json-meta-${testdrive.seed}") + KEY FORMAT JSON VALUE FORMAT JSON + INCLUDE OFFSET + ENVELOPE DEBEZIUM +> COMMIT + +> SELECT key, data->>'val', "offset" FROM dbz_json_meta_tbl ORDER BY key +1 first 0 +2 second 1 + +# --- Error case: missing KEY FORMAT --- + +$ kafka-create-topic topic=dbz-json-nokey partitions=1 + +> CREATE SOURCE dbz_json_nokey_src + IN CLUSTER dbz_json_cluster + FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-dbz-json-nokey-${testdrive.seed}'); + +! CREATE TABLE dbz_json_nokey_tbl FROM SOURCE dbz_json_nokey_src (REFERENCE "testdrive-dbz-json-nokey-${testdrive.seed}") + FORMAT JSON + ENVELOPE DEBEZIUM +contains:ENVELOPE [DEBEZIUM] UPSERT requires that KEY FORMAT be specified + +# --- Error case: non-JSON/non-Avro format --- + +! 
CREATE TABLE dbz_json_nokey_tbl FROM SOURCE dbz_json_nokey_src (REFERENCE "testdrive-dbz-json-nokey-${testdrive.seed}") + KEY FORMAT TEXT VALUE FORMAT TEXT + ENVELOPE DEBEZIUM +contains:ENVELOPE DEBEZIUM requires that VALUE FORMAT is set to AVRO or JSON + +# --- Test INCLUDE DEBEZIUM METADATA --- + +$ kafka-create-topic topic=dbz-json-dbzmeta partitions=1 + +$ kafka-ingest format=bytes key-format=bytes key-terminator=: topic=dbz-json-dbzmeta +1:{"before":null,"after":{"id":1,"val":"hello"},"op":"c","ts_ms":1000,"source":{"connector":"test"}} + +> BEGIN +> CREATE SOURCE dbz_json_dbzmeta_src + IN CLUSTER dbz_json_cluster + FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-dbz-json-dbzmeta-${testdrive.seed}'); + +> CREATE TABLE dbz_json_dbzmeta_tbl FROM SOURCE dbz_json_dbzmeta_src (REFERENCE "testdrive-dbz-json-dbzmeta-${testdrive.seed}") + KEY FORMAT JSON VALUE FORMAT JSON + INCLUDE DEBEZIUM METADATA AS dbz_envelope + ENVELOPE DEBEZIUM +> COMMIT + +> SELECT key, data->>'val', dbz_envelope->>'op', dbz_envelope->>'ts_ms' FROM dbz_json_dbzmeta_tbl +1 hello c 1000 + +# --- Test INCLUDE DEBEZIUM METADATA with default column name --- + +> BEGIN +> CREATE SOURCE dbz_json_dbzmeta_default_src + IN CLUSTER dbz_json_cluster + FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-dbz-json-dbzmeta-${testdrive.seed}'); + +> CREATE TABLE dbz_json_dbzmeta_default_tbl FROM SOURCE dbz_json_dbzmeta_default_src (REFERENCE "testdrive-dbz-json-dbzmeta-${testdrive.seed}") + KEY FORMAT JSON VALUE FORMAT JSON + INCLUDE DEBEZIUM METADATA + ENVELOPE DEBEZIUM +> COMMIT + +> SELECT key, debezium_metadata->>'op' FROM dbz_json_dbzmeta_default_tbl +1 c + +# --- Error case: INCLUDE DEBEZIUM METADATA without ENVELOPE DEBEZIUM --- + +! 
CREATE TABLE dbz_json_nokey_tbl FROM SOURCE dbz_json_nokey_src (REFERENCE "testdrive-dbz-json-nokey-${testdrive.seed}") + KEY FORMAT JSON VALUE FORMAT JSON + INCLUDE DEBEZIUM METADATA + ENVELOPE UPSERT +contains:INCLUDE DEBEZIUM METADATA requires ENVELOPE DEBEZIUM with VALUE FORMAT JSON diff --git a/test/testdrive/kafka-debezium-json-ticdc-sources.td b/test/testdrive/kafka-debezium-json-ticdc-sources.td new file mode 100644 index 0000000000000..b870bdab963f9 --- /dev/null +++ b/test/testdrive/kafka-debezium-json-ticdc-sources.td @@ -0,0 +1,105 @@ +# Copyright Materialize, Inc. and contributors. All rights reserved. +# +# Use of this software is governed by the Business Source License +# included in the LICENSE file at the root of this repository. +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0. + +$ set-sql-timeout duration=60s + +$ set-arg-default default-storage-size=scale=1,workers=1 + +# ============================================================================= +# Test Debezium JSON envelope support with TiCDC mode +# TiCDC uses the same Debezium format but may include extra fields like +# commit_ts, cluster_id, etc. 
+# ============================================================================= + +> CREATE CONNECTION kafka_conn + TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT); + +# --- Basic TiCDC CRUD operations --- + +$ kafka-create-topic topic=dbz-ticdc partitions=1 + +# TiCDC messages include additional TiDB-specific fields +# Use simple integer keys to avoid key-terminator conflicts with JSON colons +$ kafka-ingest format=bytes key-format=bytes key-terminator=: topic=dbz-ticdc +1:{"before":null,"after":{"id":1,"name":"alice"},"op":"c","source":{"version":"1.0","connector":"tidb","name":"test","commit_ts":1000},"ts_ms":1609459200000} +2:{"before":null,"after":{"id":2,"name":"bob"},"op":"c","source":{"version":"1.0","connector":"tidb","name":"test","commit_ts":1001},"ts_ms":1609459200001} +1:{"before":{"id":1,"name":"alice"},"after":{"id":1,"name":"alice_updated"},"op":"u","source":{"version":"1.0","connector":"tidb","name":"test","commit_ts":1002},"ts_ms":1609459200002} + +> CREATE CLUSTER dbz_ticdc_cluster SIZE '${arg.default-storage-size}'; +> BEGIN +> CREATE SOURCE dbz_ticdc_src + IN CLUSTER dbz_ticdc_cluster + FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-dbz-ticdc-${testdrive.seed}'); + +> CREATE TABLE dbz_ticdc_tbl FROM SOURCE dbz_ticdc_src (REFERENCE "testdrive-dbz-ticdc-${testdrive.seed}") + KEY FORMAT JSON VALUE FORMAT JSON + INCLUDE DEBEZIUM METADATA AS dbz_meta + ENVELOPE DEBEZIUM (MODE = 'TICDC') +> COMMIT + +> SELECT key, data->>'name' FROM dbz_ticdc_tbl ORDER BY key +1 alice_updated +2 bob + +# --- Verify INCLUDE DEBEZIUM METADATA preserves CDC metadata --- +# The dbz_meta column contains the full Debezium envelope including source metadata. +# This is critical for TiCDC users who need commit_ts to correlate with Dumpling snapshots. 
+> SELECT key, dbz_meta->'source'->>'commit_ts' FROM dbz_ticdc_tbl ORDER BY key +1 1002 +2 1001 + +# ts_ms is also available +> SELECT key, dbz_meta->>'ts_ms' FROM dbz_ticdc_tbl ORDER BY key +1 1609459200002 +2 1609459200001 + +# --- Test delete --- + +$ kafka-ingest format=bytes key-format=bytes key-terminator=: topic=dbz-ticdc +2:{"before":{"id":2,"name":"bob"},"after":null,"op":"d","source":{"version":"1.0","connector":"tidb","name":"test","commit_ts":1003},"ts_ms":1609459200003} + +> SELECT key, data->>'name' FROM dbz_ticdc_tbl +1 alice_updated + +# --- Test reinsertion --- + +$ kafka-ingest format=bytes key-format=bytes key-terminator=: topic=dbz-ticdc +2:{"before":null,"after":{"id":2,"name":"bob_v2"},"op":"c","source":{"version":"1.0","connector":"tidb","name":"test","commit_ts":1004},"ts_ms":1609459200004} + +> SELECT key, data->>'name' FROM dbz_ticdc_tbl ORDER BY key +1 alice_updated +2 bob_v2 + +# --- Verify MODE syntax parsing --- + +# MODE parameter is case-insensitive: 'TICDC' was used above, lowercase 'ticdc' here +> BEGIN +> CREATE SOURCE dbz_ticdc_lower_src + IN CLUSTER dbz_ticdc_cluster + FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-dbz-ticdc-${testdrive.seed}'); + +> CREATE TABLE dbz_ticdc_lower_tbl FROM SOURCE dbz_ticdc_lower_src (REFERENCE "testdrive-dbz-ticdc-${testdrive.seed}") + KEY FORMAT JSON VALUE FORMAT JSON + ENVELOPE DEBEZIUM (MODE = 'ticdc') +> COMMIT + +> SELECT key, data->>'name' FROM dbz_ticdc_lower_tbl ORDER BY key +1 alice_updated +2 bob_v2 + +# --- Error case: invalid MODE value --- + +> CREATE SOURCE dbz_ticdc_bad_src + IN CLUSTER dbz_ticdc_cluster + FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-dbz-ticdc-${testdrive.seed}'); + +! 
CREATE TABLE dbz_ticdc_bad_tbl FROM SOURCE dbz_ticdc_bad_src (REFERENCE "testdrive-dbz-ticdc-${testdrive.seed}") + KEY FORMAT JSON VALUE FORMAT JSON + ENVELOPE DEBEZIUM (MODE = 'INVALID') +contains:Expected TICDC diff --git a/test/testdrive/kafka-upsert-debezium-sources.td b/test/testdrive/kafka-upsert-debezium-sources.td index 578738840abfd..44780202c4efc 100644 --- a/test/testdrive/kafka-upsert-debezium-sources.td +++ b/test/testdrive/kafka-upsert-debezium-sources.td @@ -108,9 +108,9 @@ $ kafka-ingest format=avro topic=dbzupsert key-format=avro key-schema=${keyschem contains:ENVELOPE [DEBEZIUM] UPSERT requires that KEY FORMAT be specified ! CREATE TABLE doin_upsert_tbl FROM SOURCE doin_upsert (REFERENCE "testdrive-dbzupsert-${testdrive.seed}") - KEY FORMAT JSON VALUE FORMAT JSON + KEY FORMAT TEXT VALUE FORMAT TEXT ENVELOPE DEBEZIUM -contains:ENVELOPE DEBEZIUM requires that VALUE FORMAT is set to AVRO +contains:ENVELOPE DEBEZIUM requires that VALUE FORMAT is set to AVRO or JSON > CREATE TABLE doin_upsert_tbl FROM SOURCE doin_upsert (REFERENCE "testdrive-dbzupsert-${testdrive.seed}") FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn