From 5d0a59d1df1c2ebdf377ad77e2ae7dec4155b708 Mon Sep 17 00:00:00 2001 From: Jon Currey Date: Thu, 5 Mar 2026 19:59:41 -0500 Subject: [PATCH 1/5] storage: Add Debezium JSON envelope support for Kafka sources Previously, the Debezium envelope only supported Avro encoding. This adds support for JSON-encoded Debezium messages, enabling ingestion from systems like TiCDC that emit Debezium-format JSON. The new `UpsertStyle::DebeziumJson` variant handles envelope extraction at render time by parsing the Jsonb datum to extract `op` and `after` fields. The output schema is `[key: jsonb, data: jsonb, ...metadata]`, where `key` is the decoded Kafka message key and `data` is the extracted `after` payload. Syntax: KEY FORMAT JSON VALUE FORMAT JSON ENVELOPE DEBEZIUM KEY FORMAT JSON VALUE FORMAT JSON ENVELOPE DEBEZIUM (MODE = 'TICDC') The optional MODE parameter selects dialect-specific handling (currently TiCDC vs generic). Both modes handle the standard Debezium operations: create (c), update (u), read/snapshot (r), and delete (d); truncate (t) events are recognized but ignored (a warning is logged rather than clearing existing rows). Payload-wrapped format (schema+payload) is also handled transparently. 
Tests added: - test/testdrive/kafka-debezium-json-sources.td (generic Debezium JSON) - test/testdrive/kafka-debezium-json-ticdc-sources.td (TiCDC mode) - Updated kafka-upsert-debezium-sources.td error message test Co-Authored-By: Claude Opus 4.6 --- src/catalog/src/memory/objects.rs | 3 +- src/sql-parser/src/ast/defs/ddl.rs | 20 ++- src/sql-parser/src/parser.rs | 16 +- src/sql-parser/tests/testdata/ddl | 18 +- src/sql/src/plan/statement/ddl.rs | 63 ++++--- src/sql/src/pure.rs | 4 +- src/storage-types/src/sources/envelope.rs | 37 ++++- src/storage/src/render/sources.rs | 116 ++++++++++++- test/testdrive/kafka-debezium-json-sources.td | 156 ++++++++++++++++++ .../kafka-debezium-json-ticdc-sources.td | 92 +++++++++++ .../kafka-upsert-debezium-sources.td | 4 +- 11 files changed, 487 insertions(+), 42 deletions(-) create mode 100644 test/testdrive/kafka-debezium-json-sources.td create mode 100644 test/testdrive/kafka-debezium-json-ticdc-sources.td diff --git a/src/catalog/src/memory/objects.rs b/src/catalog/src/memory/objects.rs index 26183da326fe6..00269f1c31e83 100644 --- a/src/catalog/src/memory/objects.rs +++ b/src/catalog/src/memory/objects.rs @@ -1033,7 +1033,8 @@ impl DataSourceDesc { SourceEnvelope::None(_) => "none", SourceEnvelope::Upsert(upsert_envelope) => match upsert_envelope.style { mz_storage_types::sources::envelope::UpsertStyle::Default(_) => "upsert", - mz_storage_types::sources::envelope::UpsertStyle::Debezium { .. } => { + mz_storage_types::sources::envelope::UpsertStyle::Debezium { .. } + | mz_storage_types::sources::envelope::UpsertStyle::DebeziumJson { .. } => { // NOTE(aljoscha): Should we somehow mark that this is // using upsert internally? See note above about // DEBEZIUM. 
diff --git a/src/sql-parser/src/ast/defs/ddl.rs b/src/sql-parser/src/ast/defs/ddl.rs index a43f41bd1e98b..1e8055a5096f1 100644 --- a/src/sql-parser/src/ast/defs/ddl.rs +++ b/src/sql-parser/src/ast/defs/ddl.rs @@ -633,10 +633,20 @@ impl AstDisplay for SourceErrorPolicy { } impl_display!(SourceErrorPolicy); +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub enum DebeziumMode { + /// Generic Debezium JSON/Avro format. + None, + /// TiCDC dialect of Debezium format. + TiCdc, +} + #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub enum SourceEnvelope { None, - Debezium, + Debezium { + mode: DebeziumMode, + }, Upsert { value_decode_err_policy: Vec, }, @@ -649,7 +659,7 @@ impl SourceEnvelope { pub fn requires_all_input(&self) -> bool { match self { SourceEnvelope::None => false, - SourceEnvelope::Debezium => false, + SourceEnvelope::Debezium { .. } => false, SourceEnvelope::Upsert { .. } => false, SourceEnvelope::CdcV2 => true, } @@ -663,8 +673,12 @@ impl AstDisplay for SourceEnvelope { // this is unreachable as long as the default is None, but include it in case we ever change that f.write_str("NONE"); } - Self::Debezium => { + Self::Debezium { mode } => { f.write_str("DEBEZIUM"); + match mode { + DebeziumMode::None => {} + DebeziumMode::TiCdc => f.write_str(" (MODE = 'TICDC')"), + } } Self::Upsert { value_decode_err_policy, diff --git a/src/sql-parser/src/parser.rs b/src/sql-parser/src/parser.rs index 1d5444d0b0c1d..852d0ea91622d 100644 --- a/src/sql-parser/src/parser.rs +++ b/src/sql-parser/src/parser.rs @@ -2363,7 +2363,21 @@ impl<'a> Parser<'a> { let envelope = if self.parse_keyword(NONE) { SourceEnvelope::None } else if self.parse_keyword(DEBEZIUM) { - SourceEnvelope::Debezium + let mode = if self.consume_token(&Token::LParen) { + self.expect_keyword(MODE)?; + self.expect_token(&Token::Eq)?; + let mode_str = self.parse_literal_string()?; + self.expect_token(&Token::RParen)?; + match mode_str.to_uppercase().as_str() { + "TICDC" => DebeziumMode::TiCdc, + _ => { + return 
self.expected(self.peek_pos(), "TICDC", self.peek_token()); + } + } + } else { + DebeziumMode::None + }; + SourceEnvelope::Debezium { mode } } else if self.parse_keyword(UPSERT) { let value_decode_err_policy = if self.consume_token(&Token::LParen) { // We only support the `VALUE DECODING ERRORS` option for now, but if we add another diff --git a/src/sql-parser/tests/testdata/ddl b/src/sql-parser/tests/testdata/ddl index 088b8061b5a86..1b07b30e98b1d 100644 --- a/src/sql-parser/tests/testdata/ddl +++ b/src/sql-parser/tests/testdata/ddl @@ -2511,7 +2511,7 @@ CREATE SOURCE src1 FROM KAFKA CONNECTION conn1 (TOPIC 'baz') FORMAT AVRO USING C ---- CREATE SOURCE src1 FROM KAFKA CONNECTION conn1 (TOPIC = 'baz') FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION conn2 ENVELOPE DEBEZIUM => -CreateSource(CreateSourceStatement { name: UnresolvedItemName([Ident("src1")]), in_cluster: None, col_names: [], connection: Kafka { connection: Name(UnresolvedItemName([Ident("conn1")])), options: [KafkaSourceConfigOption { name: Topic, value: Some(Value(String("baz"))) }] }, include_metadata: [], format: Some(Bare(Avro(Csr { csr_connection: CsrConnectionAvro { connection: CsrConnection { connection: Name(UnresolvedItemName([Ident("conn2")])), options: [] }, key_strategy: None, value_strategy: None, seed: None } }))), envelope: Some(Debezium), if_not_exists: false, key_constraint: None, with_options: [], external_references: None, progress_subsource: None }) +CreateSource(CreateSourceStatement { name: UnresolvedItemName([Ident("src1")]), in_cluster: None, col_names: [], connection: Kafka { connection: Name(UnresolvedItemName([Ident("conn1")])), options: [KafkaSourceConfigOption { name: Topic, value: Some(Value(String("baz"))) }] }, include_metadata: [], format: Some(Bare(Avro(Csr { csr_connection: CsrConnectionAvro { connection: CsrConnection { connection: Name(UnresolvedItemName([Ident("conn2")])), options: [] }, key_strategy: None, value_strategy: None, seed: None } }))), envelope: 
Some(Debezium { mode: None }), if_not_exists: false, key_constraint: None, with_options: [], external_references: None, progress_subsource: None }) parse-statement @@ -2519,22 +2519,22 @@ CREATE SOURCE src1 FROM KAFKA CONNECTION conn1 (TOPIC 'baz') FORMAT PROTOBUF USI ---- CREATE SOURCE src1 FROM KAFKA CONNECTION conn1 (TOPIC = 'baz') FORMAT PROTOBUF USING CONFLUENT SCHEMA REGISTRY CONNECTION conn2 ENVELOPE DEBEZIUM => -CreateSource(CreateSourceStatement { name: UnresolvedItemName([Ident("src1")]), in_cluster: None, col_names: [], connection: Kafka { connection: Name(UnresolvedItemName([Ident("conn1")])), options: [KafkaSourceConfigOption { name: Topic, value: Some(Value(String("baz"))) }] }, include_metadata: [], format: Some(Bare(Protobuf(Csr { csr_connection: CsrConnectionProtobuf { connection: CsrConnection { connection: Name(UnresolvedItemName([Ident("conn2")])), options: [] }, seed: None } }))), envelope: Some(Debezium), if_not_exists: false, key_constraint: None, with_options: [], external_references: None, progress_subsource: None }) +CreateSource(CreateSourceStatement { name: UnresolvedItemName([Ident("src1")]), in_cluster: None, col_names: [], connection: Kafka { connection: Name(UnresolvedItemName([Ident("conn1")])), options: [KafkaSourceConfigOption { name: Topic, value: Some(Value(String("baz"))) }] }, include_metadata: [], format: Some(Bare(Protobuf(Csr { csr_connection: CsrConnectionProtobuf { connection: CsrConnection { connection: Name(UnresolvedItemName([Ident("conn2")])), options: [] }, seed: None } }))), envelope: Some(Debezium { mode: None }), if_not_exists: false, key_constraint: None, with_options: [], external_references: None, progress_subsource: None }) parse-statement CREATE SOURCE src1 FROM KAFKA CONNECTION conn1 (TOPIC 'baz') ENVELOPE DEBEZIUM (TRANSACTION METADATA (SOURCE a.b.c, COLLECTION 'foo')) ---- -error: Expected end of statement, found left parenthesis +error: Expected MODE, found TRANSACTION CREATE SOURCE src1 FROM KAFKA 
CONNECTION conn1 (TOPIC 'baz') ENVELOPE DEBEZIUM (TRANSACTION METADATA (SOURCE a.b.c, COLLECTION 'foo')) - ^ + ^ parse-statement CREATE SOURCE src1 FROM KAFKA CONNECTION conn1 (TOPIC 'baz') ENVELOPE DEBEZIUM (TRANSACTION METADATA (COLLECTION 'foo', SOURCE a.b.c)) ---- -error: Expected end of statement, found left parenthesis +error: Expected MODE, found TRANSACTION CREATE SOURCE src1 FROM KAFKA CONNECTION conn1 (TOPIC 'baz') ENVELOPE DEBEZIUM (TRANSACTION METADATA (COLLECTION 'foo', SOURCE a.b.c)) - ^ + ^ # Note that this will error in planning, as you cannot specify START OFFSET and START TIMESTAMP at the same time parse-statement @@ -2542,7 +2542,7 @@ CREATE SOURCE src1 FROM KAFKA CONNECTION conn1 (START OFFSET=1, START TIMESTAMP= ---- CREATE SOURCE src1 FROM KAFKA CONNECTION conn1 (START OFFSET = 1, START TIMESTAMP = 2, TOPIC = 'baz') ENVELOPE DEBEZIUM => -CreateSource(CreateSourceStatement { name: UnresolvedItemName([Ident("src1")]), in_cluster: None, col_names: [], connection: Kafka { connection: Name(UnresolvedItemName([Ident("conn1")])), options: [KafkaSourceConfigOption { name: StartOffset, value: Some(Value(Number("1"))) }, KafkaSourceConfigOption { name: StartTimestamp, value: Some(Value(Number("2"))) }, KafkaSourceConfigOption { name: Topic, value: Some(Value(String("baz"))) }] }, include_metadata: [], format: None, envelope: Some(Debezium), if_not_exists: false, key_constraint: None, with_options: [], external_references: None, progress_subsource: None }) +CreateSource(CreateSourceStatement { name: UnresolvedItemName([Ident("src1")]), in_cluster: None, col_names: [], connection: Kafka { connection: Name(UnresolvedItemName([Ident("conn1")])), options: [KafkaSourceConfigOption { name: StartOffset, value: Some(Value(Number("1"))) }, KafkaSourceConfigOption { name: StartTimestamp, value: Some(Value(Number("2"))) }, KafkaSourceConfigOption { name: Topic, value: Some(Value(String("baz"))) }] }, include_metadata: [], format: None, envelope: Some(Debezium { 
mode: None }), if_not_exists: false, key_constraint: None, with_options: [], external_references: None, progress_subsource: None }) # Note that this will error in planning, as START OFFSET must be an array of nums parse-statement @@ -2585,7 +2585,7 @@ CREATE SOURCE src1 FROM KAFKA CONNECTION conn1 (TOPIC 'baz') FORMAT PROTOBUF USI ---- CREATE SOURCE src1 FROM KAFKA CONNECTION conn1 (TOPIC = 'baz') FORMAT PROTOBUF USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn SEED VALUE SCHEMA '{"some": "seed"}' MESSAGE 'Batch' ENVELOPE DEBEZIUM => -CreateSource(CreateSourceStatement { name: UnresolvedItemName([Ident("src1")]), in_cluster: None, col_names: [], connection: Kafka { connection: Name(UnresolvedItemName([Ident("conn1")])), options: [KafkaSourceConfigOption { name: Topic, value: Some(Value(String("baz"))) }] }, include_metadata: [], format: Some(Bare(Protobuf(Csr { csr_connection: CsrConnectionProtobuf { connection: CsrConnection { connection: Name(UnresolvedItemName([Ident("csr_conn")])), options: [] }, seed: Some(CsrSeedProtobuf { key: None, value: CsrSeedProtobufSchema { schema: "{\"some\": \"seed\"}", message_name: "Batch" } }) } }))), envelope: Some(Debezium), if_not_exists: false, key_constraint: None, with_options: [], external_references: None, progress_subsource: None }) +CreateSource(CreateSourceStatement { name: UnresolvedItemName([Ident("src1")]), in_cluster: None, col_names: [], connection: Kafka { connection: Name(UnresolvedItemName([Ident("conn1")])), options: [KafkaSourceConfigOption { name: Topic, value: Some(Value(String("baz"))) }] }, include_metadata: [], format: Some(Bare(Protobuf(Csr { csr_connection: CsrConnectionProtobuf { connection: CsrConnection { connection: Name(UnresolvedItemName([Ident("csr_conn")])), options: [] }, seed: Some(CsrSeedProtobuf { key: None, value: CsrSeedProtobufSchema { schema: "{\"some\": \"seed\"}", message_name: "Batch" } }) } }))), envelope: Some(Debezium { mode: None }), if_not_exists: false, key_constraint: None, 
with_options: [], external_references: None, progress_subsource: None }) parse-statement @@ -2593,7 +2593,7 @@ CREATE SOURCE src1 FROM KAFKA CONNECTION conn1 (TOPIC 'baz') FORMAT PROTOBUF USI ---- CREATE SOURCE src1 FROM KAFKA CONNECTION conn1 (TOPIC = 'baz') FORMAT PROTOBUF USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn SEED KEY SCHEMA '{"some": "seed"}' MESSAGE 'Batch' VALUE SCHEMA '123' MESSAGE 'M' ENVELOPE DEBEZIUM => -CreateSource(CreateSourceStatement { name: UnresolvedItemName([Ident("src1")]), in_cluster: None, col_names: [], connection: Kafka { connection: Name(UnresolvedItemName([Ident("conn1")])), options: [KafkaSourceConfigOption { name: Topic, value: Some(Value(String("baz"))) }] }, include_metadata: [], format: Some(Bare(Protobuf(Csr { csr_connection: CsrConnectionProtobuf { connection: CsrConnection { connection: Name(UnresolvedItemName([Ident("csr_conn")])), options: [] }, seed: Some(CsrSeedProtobuf { key: Some(CsrSeedProtobufSchema { schema: "{\"some\": \"seed\"}", message_name: "Batch" }), value: CsrSeedProtobufSchema { schema: "123", message_name: "M" } }) } }))), envelope: Some(Debezium), if_not_exists: false, key_constraint: None, with_options: [], external_references: None, progress_subsource: None }) +CreateSource(CreateSourceStatement { name: UnresolvedItemName([Ident("src1")]), in_cluster: None, col_names: [], connection: Kafka { connection: Name(UnresolvedItemName([Ident("conn1")])), options: [KafkaSourceConfigOption { name: Topic, value: Some(Value(String("baz"))) }] }, include_metadata: [], format: Some(Bare(Protobuf(Csr { csr_connection: CsrConnectionProtobuf { connection: CsrConnection { connection: Name(UnresolvedItemName([Ident("csr_conn")])), options: [] }, seed: Some(CsrSeedProtobuf { key: Some(CsrSeedProtobufSchema { schema: "{\"some\": \"seed\"}", message_name: "Batch" }), value: CsrSeedProtobufSchema { schema: "123", message_name: "M" } }) } }))), envelope: Some(Debezium { mode: None }), if_not_exists: false, 
key_constraint: None, with_options: [], external_references: None, progress_subsource: None }) parse-statement CREATE SOURCE src1 FROM KAFKA CONNECTION conn1 (TOPIC 'baz') FORMAT PROTOBUF USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn SEED KEY SCHEMA '{"some": "seed"}' MESSAGE 'Batch' VALUE SCHEMA '123' MESSAGE 'M' ENVELOPE UPSERT diff --git a/src/sql/src/plan/statement/ddl.rs b/src/sql/src/plan/statement/ddl.rs index 0343f1c676584..516d607f45a42 100644 --- a/src/sql/src/plan/statement/ddl.rs +++ b/src/sql/src/plan/statement/ddl.rs @@ -95,7 +95,8 @@ use mz_storage_types::sources::encoding::{ SourceDataEncoding, included_column_desc, }; use mz_storage_types::sources::envelope::{ - KeyEnvelope, NoneEnvelope, SourceEnvelope, UnplannedSourceEnvelope, UpsertStyle, + DebeziumJsonMode, KeyEnvelope, NoneEnvelope, SourceEnvelope, UnplannedSourceEnvelope, + UpsertStyle, }; use mz_storage_types::sources::kafka::{ KafkaMetadataKind, KafkaSourceConnection, KafkaSourceExportDetails, kafka_metadata_columns_desc, @@ -812,7 +813,7 @@ pub fn plan_create_source( envelope, ast::SourceEnvelope::Upsert { .. } | ast::SourceEnvelope::None - | ast::SourceEnvelope::Debezium + | ast::SourceEnvelope::Debezium { .. } ) { sql_bail!("INCLUDE requires ENVELOPE (NONE|UPSERT|DEBEZIUM)"); @@ -1375,9 +1376,17 @@ fn apply_source_envelope_encoding( key_envelope_no_encoding, )?; - match (&envelope, &key_envelope) { - (ast::SourceEnvelope::Debezium, KeyEnvelope::None) => {} - (ast::SourceEnvelope::Debezium, _) => sql_bail!( + match ( + &envelope, + &key_envelope, + encoding.as_ref().map(|e| &e.value), + ) { + // Avro Debezium: keys are in the unpacked `after` record, reject INCLUDE KEY + (ast::SourceEnvelope::Debezium { .. }, KeyEnvelope::None, _) => {} + (ast::SourceEnvelope::Debezium { .. }, _, Some(DataEncoding::Json)) => { + // JSON Debezium: key is included as a separate column, INCLUDE KEY is fine + } + (ast::SourceEnvelope::Debezium { .. 
}, _, _) => sql_bail!( "Cannot use INCLUDE KEY with ENVELOPE DEBEZIUM: Debezium values include all keys." ), _ => {} @@ -1394,20 +1403,34 @@ fn apply_source_envelope_encoding( let envelope = match &envelope { // TODO: fixup key envelope ast::SourceEnvelope::None => UnplannedSourceEnvelope::None(key_envelope), - ast::SourceEnvelope::Debezium => { - //TODO check that key envelope is not set - let after_idx = match typecheck_debezium(&value_desc) { - Ok((_before_idx, after_idx)) => Ok(after_idx), - Err(type_err) => match encoding.as_ref().map(|e| &e.value) { - Some(DataEncoding::Avro(_)) => Err(type_err), - _ => Err(sql_err!( - "ENVELOPE DEBEZIUM requires that VALUE FORMAT is set to AVRO" - )), - }, - }?; + ast::SourceEnvelope::Debezium { mode } => { + match encoding.as_ref().map(|e| &e.value) { + Some(DataEncoding::Json) => { + // JSON path: skip typecheck_debezium, use DebeziumJson style + let storage_mode = match mode { + ast::DebeziumMode::TiCdc => DebeziumJsonMode::TiCdc, + ast::DebeziumMode::None => DebeziumJsonMode::Generic, + }; + UnplannedSourceEnvelope::Upsert { + style: UpsertStyle::DebeziumJson { mode: storage_mode }, + } + } + _ => { + // Existing Avro path + let after_idx = match typecheck_debezium(&value_desc) { + Ok((_before_idx, after_idx)) => Ok(after_idx), + Err(type_err) => match encoding.as_ref().map(|e| &e.value) { + Some(DataEncoding::Avro(_)) => Err(type_err), + _ => Err(sql_err!( + "ENVELOPE DEBEZIUM requires that VALUE FORMAT is set to AVRO or JSON" + )), + }, + }?; - UnplannedSourceEnvelope::Upsert { - style: UpsertStyle::Debezium { after_idx }, + UnplannedSourceEnvelope::Upsert { + style: UpsertStyle::Debezium { after_idx }, + } + } } } ast::SourceEnvelope::Upsert { @@ -1847,7 +1870,7 @@ pub fn plan_create_table_from_source( envelope, ast::SourceEnvelope::Upsert { .. } | ast::SourceEnvelope::None - | ast::SourceEnvelope::Debezium + | ast::SourceEnvelope::Debezium { .. } ) { // TODO(guswynn): should this be `bail_unsupported!`? 
@@ -2247,7 +2270,7 @@ fn get_encoding( let requires_keyvalue = matches!( envelope, - ast::SourceEnvelope::Debezium | ast::SourceEnvelope::Upsert { .. } + ast::SourceEnvelope::Debezium { .. } | ast::SourceEnvelope::Upsert { .. } ); let is_keyvalue = encoding.key.is_some(); if requires_keyvalue && !is_keyvalue { diff --git a/src/sql/src/pure.rs b/src/sql/src/pure.rs index 44488fa966a89..078ab2e9d7139 100644 --- a/src/sql/src/pure.rs +++ b/src/sql/src/pure.rs @@ -2413,7 +2413,7 @@ async fn purify_csr_connection_proto( .await .ok(); - if matches!(envelope, Some(SourceEnvelope::Debezium)) && key.is_none() { + if matches!(envelope, Some(SourceEnvelope::Debezium { .. })) && key.is_none() { sql_bail!("Key schema is required for ENVELOPE DEBEZIUM"); } @@ -2465,7 +2465,7 @@ async fn purify_csr_connection_avro( topic, ) .await?; - if matches!(envelope, Some(SourceEnvelope::Debezium)) && key_schema.is_none() { + if matches!(envelope, Some(SourceEnvelope::Debezium { .. })) && key_schema.is_none() { sql_bail!("Key schema is required for ENVELOPE DEBEZIUM"); } diff --git a/src/storage-types/src/sources/envelope.rs b/src/storage-types/src/sources/envelope.rs index 3eabec1ce7d10..66acf6b835ddb 100644 --- a/src/storage-types/src/sources/envelope.rs +++ b/src/storage-types/src/sources/envelope.rs @@ -67,13 +67,25 @@ pub struct UpsertEnvelope { pub key_indices: Vec, } +/// Mode for JSON Debezium envelope processing. +#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)] +pub enum DebeziumJsonMode { + /// Generic Debezium JSON format. + Generic, + /// TiCDC dialect of Debezium format. + TiCdc, +} + #[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)] pub enum UpsertStyle { /// `ENVELOPE UPSERT`, where the key shape depends on the independent /// `KeyEnvelope` Default(KeyEnvelope), - /// `ENVELOPE DEBEZIUM UPSERT` + /// `ENVELOPE DEBEZIUM UPSERT` with Avro encoding Debezium { after_idx: usize }, + /// `ENVELOPE DEBEZIUM` with JSON encoding. 
Envelope extraction happens at + /// render time by parsing the Jsonb datum to extract `op` and `after` fields. + DebeziumJson { mode: DebeziumJsonMode }, /// `ENVELOPE UPSERT` where any decode errors will get serialized into a /// SqlScalarType::Record column named `error_column`, and all value columns are /// nullable. The key shape depends on the independent `KeyEnvelope`. @@ -227,6 +239,29 @@ impl UnplannedSourceEnvelope { desc, ) } + UnplannedSourceEnvelope::Upsert { + style: UpsertStyle::DebeziumJson { .. }, + .. + } => { + // JSON Debezium: output is [key: Jsonb, data: Jsonb, ...metadata] + // Key is included as a separate column (unlike Avro Debezium). + let key_desc = RelationDesc::builder() + .with_column("key", SqlScalarType::Jsonb.nullable(false)) + .finish(); + let value_desc = RelationDesc::builder() + .with_column("data", SqlScalarType::Jsonb.nullable(false)) + .finish(); + // key_indices = [0]: points to the key column for UpsertKey::from_value rehydration + let key_indices = vec![0usize]; + let desc = key_desc + .with_key(key_indices.clone()) + .concat(value_desc) + .concat(metadata_desc); + ( + self.into_source_envelope(Some(key_indices), None, Some(desc.arity())), + desc, + ) + } UnplannedSourceEnvelope::Upsert { style: UpsertStyle::Debezium { after_idx }, .. 
diff --git a/src/storage/src/render/sources.rs b/src/storage/src/render/sources.rs index f2be72edab714..9b27eb93bd633 100644 --- a/src/storage/src/render/sources.rs +++ b/src/storage/src/render/sources.rs @@ -24,10 +24,13 @@ use mz_storage_operators::persist_source::Subtime; use mz_storage_types::controller::CollectionMetadata; use mz_storage_types::dyncfgs; use mz_storage_types::errors::{ - DataflowError, DecodeError, EnvelopeError, UpsertError, UpsertNullKeyError, UpsertValueError, + DataflowError, DecodeError, DecodeErrorKind, EnvelopeError, UpsertError, UpsertNullKeyError, + UpsertValueError, }; use mz_storage_types::parameters::StorageMaxInflightBytesConfig; -use mz_storage_types::sources::envelope::{KeyEnvelope, NoneEnvelope, UpsertEnvelope, UpsertStyle}; +use mz_storage_types::sources::envelope::{ + DebeziumJsonMode, KeyEnvelope, NoneEnvelope, UpsertEnvelope, UpsertStyle, +}; use mz_storage_types::sources::*; use mz_timely_util::builder_async::PressOnDropButton; use mz_timely_util::operator::CollectionExt; @@ -515,8 +518,9 @@ fn upsert_commands( // We can now apply the key envelope let key_row = match upsert_envelope.style { - // flattened or debezium + // flattened, debezium (avro or json) UpsertStyle::Debezium { .. } + | UpsertStyle::DebeziumJson { .. 
} | UpsertStyle::Default(KeyEnvelope::Flattened) | UpsertStyle::ValueErrInline { key_envelope: KeyEnvelope::Flattened, @@ -548,6 +552,23 @@ fn upsert_commands( let value = match result.value { Some(Ok(ref row)) => match upsert_envelope.style { + UpsertStyle::DebeziumJson { ref mode } => { + match extract_debezium_json(row, mode) { + Ok(Some(after_row)) => { + let mut packer = row_buf.packer(); + // Per DD-9: prepend key, then after payload, then metadata + packer.extend_by_row(&key_row); + packer.extend_by_row(&after_row); + packer.extend_by_row(&metadata); + Some(Ok(row_buf.clone())) + } + Ok(None) => None, // Delete or truncate + Err(err) => Some(Err(Box::new(UpsertError::Value(UpsertValueError { + for_key: key_row.clone(), + inner: err, + })))), + } + } UpsertStyle::Debezium { after_idx } => match row.iter().nth(after_idx).unwrap() { Datum::List(after) => { let mut packer = row_buf.packer(); @@ -611,6 +632,95 @@ fn upsert_commands( }) } +/// Extract the `after` payload from a Debezium JSON envelope message. +/// +/// Handles both `payload`-wrapped (`{ "payload": { "op": ..., "after": ... } }`) +/// and flat (`{ "op": ..., "after": ... }`) formats. 
+/// +/// Returns: +/// - `Ok(Some(row))` for insert/update/snapshot operations (op = c/u/r) +/// - `Ok(None)` for delete/truncate operations (op = d/t) +/// - `Err(DecodeError)` for malformed messages +fn extract_debezium_json(row: &Row, _mode: &DebeziumJsonMode) -> Result, DecodeError> { + use mz_repr::adt::jsonb::JsonbRef; + + let datum = row.iter().next().unwrap(); + if datum == Datum::JsonNull { + // Null value treated as delete (tombstone) + return Ok(None); + } + + // Wrap the datum as a JsonbRef and convert to serde_json::Value for field access + let jsonb = JsonbRef::from_datum(datum); + let json_value = jsonb.to_serde_json(); + + // Handle payload-wrapped vs flat format + let envelope = match json_value.get("payload") { + Some(payload) if payload.is_object() => payload, + _ => &json_value, + }; + + // Extract "op" field + let op = match envelope.get("op").and_then(|v| v.as_str()) { + Some(op) => op, + None => { + return Err(DecodeError { + kind: DecodeErrorKind::Text("Debezium JSON message missing 'op' field".into()), + raw: vec![], + }); + } + }; + + match op { + "d" => Ok(None), + "t" => { + // Truncate: silently skip per DD-13 + tracing::warn!("Debezium JSON: ignoring truncate event (op='t')"); + Ok(None) + } + "c" | "u" | "r" => { + // Create, update, or read (snapshot) — extract "after" field + let after = match envelope.get("after") { + Some(serde_json::Value::Null) | None => { + return Err(DecodeError { + kind: DecodeErrorKind::Text( + format!( + "Debezium JSON message with op='{}' has null or missing 'after' field", + op + ) + .into(), + ), + raw: vec![], + }); + } + Some(after) => after, + }; + + // Re-serialize "after" into a Row with a single Jsonb datum + let after_json = + mz_repr::adt::jsonb::Jsonb::from_serde_json(after.clone()).map_err(|e| { + DecodeError { + kind: DecodeErrorKind::Text( + format!("Failed to convert Debezium 'after' to Jsonb: {}", e).into(), + ), + raw: vec![], + } + })?; + Ok(Some(after_json.into_row())) + } + other => 
Err(DecodeError { + kind: DecodeErrorKind::Text( + format!( + "Debezium JSON message has unrecognized 'op' value: '{}'", + other + ) + .into(), + ), + raw: vec![], + }), + } +} + /// Convert from streams of [`DecodeResult`] to Rows, inserting the Key according to [`KeyEnvelope`] fn flatten_results_prepend_keys( none_envelope: &NoneEnvelope, diff --git a/test/testdrive/kafka-debezium-json-sources.td b/test/testdrive/kafka-debezium-json-sources.td new file mode 100644 index 0000000000000..3e177bac3d4d0 --- /dev/null +++ b/test/testdrive/kafka-debezium-json-sources.td @@ -0,0 +1,156 @@ +# Copyright Materialize, Inc. and contributors. All rights reserved. +# +# Use of this software is governed by the Business Source License +# included in the LICENSE file at the root of this repository. +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0. + +$ set-sql-timeout duration=60s + +$ set-arg-default default-storage-size=scale=1,workers=1 + +# ============================================================================= +# Test Debezium JSON envelope support (generic mode) +# ============================================================================= + +> CREATE CONNECTION kafka_conn + TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT); + +# --- Basic CRUD operations --- + +$ kafka-create-topic topic=dbz-json partitions=1 + +# Use simple integer keys to avoid key-terminator conflicts with JSON colons +$ kafka-ingest format=bytes key-format=bytes key-terminator=: topic=dbz-json +1:{"before":null,"after":{"id":1,"creature":"mudskipper"},"op":"c"} +1:{"before":{"id":1,"creature":"mudskipper"},"after":{"id":1,"creature":"salamander"},"op":"u"} +1:{"before":{"id":1,"creature":"salamander"},"after":{"id":1,"creature":"lizard"},"op":"u"} + +> CREATE CLUSTER dbz_json_cluster SIZE '${arg.default-storage-size}'; +> BEGIN +> CREATE SOURCE 
dbz_json_src + IN CLUSTER dbz_json_cluster + FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-dbz-json-${testdrive.seed}'); + +> CREATE TABLE dbz_json_tbl FROM SOURCE dbz_json_src (REFERENCE "testdrive-dbz-json-${testdrive.seed}") + KEY FORMAT JSON VALUE FORMAT JSON + ENVELOPE DEBEZIUM +> COMMIT + +# Output schema is [key: jsonb, data: jsonb] +# The last update for key=1 should win (lizard) +# Use ->> to extract fields since JSON key order is non-deterministic +> SELECT key, data->>'id', data->>'creature' FROM dbz_json_tbl +1 1 lizard + +# --- Insert a second row --- + +$ kafka-ingest format=bytes key-format=bytes key-terminator=: topic=dbz-json +2:{"before":null,"after":{"id":2,"creature":"archeopteryx"},"op":"c"} +2:{"before":{"id":2,"creature":"archeopteryx"},"after":{"id":2,"creature":"velociraptor"},"op":"u"} + +> SELECT key, data->>'creature' FROM dbz_json_tbl ORDER BY key +1 lizard +2 velociraptor + +# --- Test delete (op=d) --- + +$ kafka-ingest format=bytes key-format=bytes key-terminator=: topic=dbz-json +1:{"before":{"id":1,"creature":"lizard"},"after":null,"op":"d"} + +> SELECT key, data->>'creature' FROM dbz_json_tbl +2 velociraptor + +# --- Test reinsertion after delete --- + +$ kafka-ingest format=bytes key-format=bytes key-terminator=: topic=dbz-json +1:{"before":null,"after":{"id":1,"creature":"chicken"},"op":"c"} + +> SELECT key, data->>'creature' FROM dbz_json_tbl ORDER BY key +1 chicken +2 velociraptor + +# --- Test snapshot read (op=r) --- + +$ kafka-create-topic topic=dbz-json-snapshot partitions=1 + +$ kafka-ingest format=bytes key-format=bytes key-terminator=: topic=dbz-json-snapshot +10:{"before":null,"after":{"id":10,"name":"snapshot_row"},"op":"r"} + +> BEGIN +> CREATE SOURCE dbz_json_snap_src + IN CLUSTER dbz_json_cluster + FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-dbz-json-snapshot-${testdrive.seed}'); + +> CREATE TABLE dbz_json_snap_tbl FROM SOURCE dbz_json_snap_src (REFERENCE "testdrive-dbz-json-snapshot-${testdrive.seed}") 
+ KEY FORMAT JSON VALUE FORMAT JSON + ENVELOPE DEBEZIUM +> COMMIT + +> SELECT key, data->>'name' FROM dbz_json_snap_tbl +10 snapshot_row + +# --- Test payload-wrapped format (Debezium default with schema+payload) --- + +$ kafka-create-topic topic=dbz-json-wrapped partitions=1 + +$ kafka-ingest format=bytes key-format=bytes key-terminator=: topic=dbz-json-wrapped +100:{"schema":{"type":"struct","fields":[]},"payload":{"before":null,"after":{"id":100,"val":"wrapped"},"op":"c"}} + +> BEGIN +> CREATE SOURCE dbz_json_wrapped_src + IN CLUSTER dbz_json_cluster + FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-dbz-json-wrapped-${testdrive.seed}'); + +> CREATE TABLE dbz_json_wrapped_tbl FROM SOURCE dbz_json_wrapped_src (REFERENCE "testdrive-dbz-json-wrapped-${testdrive.seed}") + KEY FORMAT JSON VALUE FORMAT JSON + ENVELOPE DEBEZIUM +> COMMIT + +> SELECT key, data->>'val' FROM dbz_json_wrapped_tbl +100 wrapped + +# --- Test INCLUDE OFFSET metadata --- + +$ kafka-create-topic topic=dbz-json-meta partitions=1 + +$ kafka-ingest format=bytes key-format=bytes key-terminator=: topic=dbz-json-meta +1:{"before":null,"after":{"id":1,"val":"first"},"op":"c"} +2:{"before":null,"after":{"id":2,"val":"second"},"op":"c"} + +> BEGIN +> CREATE SOURCE dbz_json_meta_src + IN CLUSTER dbz_json_cluster + FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-dbz-json-meta-${testdrive.seed}'); + +> CREATE TABLE dbz_json_meta_tbl FROM SOURCE dbz_json_meta_src (REFERENCE "testdrive-dbz-json-meta-${testdrive.seed}") + KEY FORMAT JSON VALUE FORMAT JSON + INCLUDE OFFSET + ENVELOPE DEBEZIUM +> COMMIT + +> SELECT key, data->>'val', "offset" FROM dbz_json_meta_tbl ORDER BY key +1 first 0 +2 second 1 + +# --- Error case: missing KEY FORMAT --- + +$ kafka-create-topic topic=dbz-json-nokey partitions=1 + +> CREATE SOURCE dbz_json_nokey_src + IN CLUSTER dbz_json_cluster + FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-dbz-json-nokey-${testdrive.seed}'); + +! 
CREATE TABLE dbz_json_nokey_tbl FROM SOURCE dbz_json_nokey_src (REFERENCE "testdrive-dbz-json-nokey-${testdrive.seed}") + FORMAT JSON + ENVELOPE DEBEZIUM +contains:ENVELOPE [DEBEZIUM] UPSERT requires that KEY FORMAT be specified + +# --- Error case: non-JSON/non-Avro format --- + +! CREATE TABLE dbz_json_nokey_tbl FROM SOURCE dbz_json_nokey_src (REFERENCE "testdrive-dbz-json-nokey-${testdrive.seed}") + KEY FORMAT TEXT VALUE FORMAT TEXT + ENVELOPE DEBEZIUM +contains:ENVELOPE DEBEZIUM requires that VALUE FORMAT is set to AVRO or JSON diff --git a/test/testdrive/kafka-debezium-json-ticdc-sources.td b/test/testdrive/kafka-debezium-json-ticdc-sources.td new file mode 100644 index 0000000000000..5c4d08396f1e3 --- /dev/null +++ b/test/testdrive/kafka-debezium-json-ticdc-sources.td @@ -0,0 +1,92 @@ +# Copyright Materialize, Inc. and contributors. All rights reserved. +# +# Use of this software is governed by the Business Source License +# included in the LICENSE file at the root of this repository. +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0. + +$ set-sql-timeout duration=60s + +$ set-arg-default default-storage-size=scale=1,workers=1 + +# ============================================================================= +# Test Debezium JSON envelope support with TiCDC mode +# TiCDC uses the same Debezium format but may include extra fields like +# commit_ts, cluster_id, etc. 
+# ============================================================================= + +> CREATE CONNECTION kafka_conn + TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT); + +# --- Basic TiCDC CRUD operations --- + +$ kafka-create-topic topic=dbz-ticdc partitions=1 + +# TiCDC messages include additional TiDB-specific fields +# Use simple integer keys to avoid key-terminator conflicts with JSON colons +$ kafka-ingest format=bytes key-format=bytes key-terminator=: topic=dbz-ticdc +1:{"before":null,"after":{"id":1,"name":"alice"},"op":"c","source":{"version":"1.0","connector":"tidb","name":"test","commit_ts":1000},"ts_ms":1609459200000} +2:{"before":null,"after":{"id":2,"name":"bob"},"op":"c","source":{"version":"1.0","connector":"tidb","name":"test","commit_ts":1001},"ts_ms":1609459200001} +1:{"before":{"id":1,"name":"alice"},"after":{"id":1,"name":"alice_updated"},"op":"u","source":{"version":"1.0","connector":"tidb","name":"test","commit_ts":1002},"ts_ms":1609459200002} + +> CREATE CLUSTER dbz_ticdc_cluster SIZE '${arg.default-storage-size}'; +> BEGIN +> CREATE SOURCE dbz_ticdc_src + IN CLUSTER dbz_ticdc_cluster + FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-dbz-ticdc-${testdrive.seed}'); + +> CREATE TABLE dbz_ticdc_tbl FROM SOURCE dbz_ticdc_src (REFERENCE "testdrive-dbz-ticdc-${testdrive.seed}") + KEY FORMAT JSON VALUE FORMAT JSON + ENVELOPE DEBEZIUM (MODE = 'TICDC') +> COMMIT + +> SELECT key, data->>'name' FROM dbz_ticdc_tbl ORDER BY key +1 alice_updated +2 bob + +# --- Test delete --- + +$ kafka-ingest format=bytes key-format=bytes key-terminator=: topic=dbz-ticdc +2:{"before":{"id":2,"name":"bob"},"after":null,"op":"d","source":{"version":"1.0","connector":"tidb","name":"test","commit_ts":1003},"ts_ms":1609459200003} + +> SELECT key, data->>'name' FROM dbz_ticdc_tbl +1 alice_updated + +# --- Test reinsertion --- + +$ kafka-ingest format=bytes key-format=bytes key-terminator=: topic=dbz-ticdc 
+2:{"before":null,"after":{"id":2,"name":"bob_v2"},"op":"c","source":{"version":"1.0","connector":"tidb","name":"test","commit_ts":1004},"ts_ms":1609459200004} + +> SELECT key, data->>'name' FROM dbz_ticdc_tbl ORDER BY key +1 alice_updated +2 bob_v2 + +# --- Verify MODE syntax parsing --- + +# MODE parameter is case-insensitive +> BEGIN +> CREATE SOURCE dbz_ticdc_upper_src + IN CLUSTER dbz_ticdc_cluster + FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-dbz-ticdc-${testdrive.seed}'); + +> CREATE TABLE dbz_ticdc_upper_tbl FROM SOURCE dbz_ticdc_upper_src (REFERENCE "testdrive-dbz-ticdc-${testdrive.seed}") + KEY FORMAT JSON VALUE FORMAT JSON + ENVELOPE DEBEZIUM (MODE = 'ticdc') +> COMMIT + +> SELECT key, data->>'name' FROM dbz_ticdc_upper_tbl ORDER BY key +1 alice_updated +2 bob_v2 + +# --- Error case: invalid MODE value --- + +> CREATE SOURCE dbz_ticdc_bad_src + IN CLUSTER dbz_ticdc_cluster + FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-dbz-ticdc-${testdrive.seed}'); + +! CREATE TABLE dbz_ticdc_bad_tbl FROM SOURCE dbz_ticdc_bad_src (REFERENCE "testdrive-dbz-ticdc-${testdrive.seed}") + KEY FORMAT JSON VALUE FORMAT JSON + ENVELOPE DEBEZIUM (MODE = 'INVALID') +contains:Expected TICDC diff --git a/test/testdrive/kafka-upsert-debezium-sources.td b/test/testdrive/kafka-upsert-debezium-sources.td index 578738840abfd..44780202c4efc 100644 --- a/test/testdrive/kafka-upsert-debezium-sources.td +++ b/test/testdrive/kafka-upsert-debezium-sources.td @@ -108,9 +108,9 @@ $ kafka-ingest format=avro topic=dbzupsert key-format=avro key-schema=${keyschem contains:ENVELOPE [DEBEZIUM] UPSERT requires that KEY FORMAT be specified ! 
CREATE TABLE doin_upsert_tbl FROM SOURCE doin_upsert (REFERENCE "testdrive-dbzupsert-${testdrive.seed}") - KEY FORMAT JSON VALUE FORMAT JSON + KEY FORMAT TEXT VALUE FORMAT TEXT ENVELOPE DEBEZIUM -contains:ENVELOPE DEBEZIUM requires that VALUE FORMAT is set to AVRO +contains:ENVELOPE DEBEZIUM requires that VALUE FORMAT is set to AVRO or JSON > CREATE TABLE doin_upsert_tbl FROM SOURCE doin_upsert (REFERENCE "testdrive-dbzupsert-${testdrive.seed}") FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn From b4e5c39dcf59f747258345e43e3db97f965dbcf0 Mon Sep 17 00:00:00 2001 From: Jon Currey Date: Thu, 5 Mar 2026 22:29:52 -0500 Subject: [PATCH 2/5] doc: Add tech spec for Debezium JSON envelope sources Co-Authored-By: Claude Opus 4.6 --- ...20260305_debezium_json_envelope_sources.md | 465 ++++++++++++++++++ 1 file changed, 465 insertions(+) create mode 100644 doc/developer/spec/20260305_debezium_json_envelope_sources.md diff --git a/doc/developer/spec/20260305_debezium_json_envelope_sources.md b/doc/developer/spec/20260305_debezium_json_envelope_sources.md new file mode 100644 index 0000000000000..76d74fda85650 --- /dev/null +++ b/doc/developer/spec/20260305_debezium_json_envelope_sources.md @@ -0,0 +1,465 @@ +Created using https://github.com/bmad-code-org/BMAD-METHOD +--- +title: 'Debezium JSON Envelope Support for Sources' +slug: 'debezium-json-envelope-sources' +created: '2026-03-05' +status: 'ready-for-dev' +stepsCompleted: [1, 2, 3, 4] +tech_stack: [rust, kafka, json, debezium, serde_json, mz_repr, mz_interchange, mz_storage, mz_sql, mz_sql_parser, mz_storage_types] +files_to_modify: + - src/sql-parser/src/ast/defs/ddl.rs + - src/sql-parser/src/parser.rs + - src/sql/src/plan/statement/ddl.rs + - src/storage-types/src/sources/envelope.rs + - src/storage/src/render/sources.rs + - test/testdrive/kafka-debezium-json-sources.td +code_patterns: + - UpsertStyle enum variant for envelope style selection + - typecheck_debezium for planning-time validation + - 
upsert_commands match arm for render-time extraction + - PreDelimitedFormat::Json for JSON byte decoding + - DecodeResult { key, value, metadata } pipeline +test_patterns: + - testdrive with kafka-ingest format=bytes for JSON payloads + - key-value Kafka messages with separate key and value + - SELECT verification of final materialized output + - Insert/Update/Delete operation coverage +--- + +# Tech-Spec: Debezium JSON Envelope Support for Sources + +**Created:** 2026-03-05 + +## Overview + +### Problem Statement + +Materialize only supports Debezium with Avro-encoded messages. Users ingesting change data from TiCDC (or other Debezium-compatible producers that emit JSON rather than Avro) cannot use `ENVELOPE DEBEZIUM`. The validation in `src/sql/src/plan/statement/ddl.rs` explicitly requires `VALUE FORMAT AVRO` for Debezium envelope, blocking JSON-based CDC pipelines. + +### Solution + +Lift the Avro-only restriction on `ENVELOPE DEBEZIUM` to also accept `VALUE FORMAT JSON`. Add a JSON-based Debezium envelope decoder that extracts `before`/`after`/`op` fields from the Debezium JSON payload and maps operations to upsert semantics. Support both generic Debezium JSON and TiCDC's dialect via an extensible `MODE` parameter. 
+ +### Scope + +**In Scope:** +- `VALUE FORMAT JSON, ENVELOPE DEBEZIUM` with `KEY FORMAT JSON` on Kafka sources +- JSON Debezium envelope parsing (extract `before`/`after`/`op` from `payload`) +- TiCDC dialect via `MODE` parameter (e.g., `ENVELOPE DEBEZIUM (MODE = 'TICDC')`) +- Insert (`c`), update (`u`), delete (`d`) operation handling +- Single `jsonb` output column for the "after" payload +- Support both `payload`-wrapped and flat envelope formats +- Error handling for malformed Debezium JSON messages + +**Out of Scope:** +- Schema-aware column projection from JSON payloads +- Keyless / upsert-from-payload sources +- `commit_ts`-based ordering from TiCDC source metadata +- Spatial type handling for TiCDC + +## Context for Development + +### Codebase Patterns + +**Avro Debezium Pipeline (existing):** +1. Avro decoder produces a Row with typed columns: `before` (Record, nullable), `after` (Record), `op` (String), `source` (Record) +2. `typecheck_debezium(&value_desc)` at planning time finds the column index of `after` in the RelationDesc → `after_idx` +3. Planning creates `UpsertStyle::Debezium { after_idx }` → wrapped in `UpsertEnvelope` +4. At render time, `upsert_commands()` in `sources.rs:549-560` does `row.iter().nth(after_idx)` to get `Datum::List(after)`, unpacks the record fields, appends metadata + +**JSON Decode Pipeline (existing, no Debezium):** +1. Raw bytes → `PreDelimitedFormat::Json` → `Jsonb::from_slice(bytes)` → `Row` with single `data: Jsonb` column +2. No envelope extraction — the entire JSON blob becomes the `data` column + +**Key Architectural Insight:** +For JSON + Debezium, the JSON decoder still produces a single Jsonb datum containing the ENTIRE Debezium envelope. The `typecheck_debezium()` path won't work because there's no "after" column in the RelationDesc — just a `data: Jsonb` column. 
Envelope extraction must happen in `upsert_commands()` via the new `UpsertStyle::DebeziumJson` variant, which parses the Jsonb value to extract `op` and `after` at render time. + +**Envelope Extraction Location:** `src/storage/src/render/sources.rs` in `upsert_commands()` — same function where Avro Debezium extraction happens. New match arm for `UpsertStyle::DebeziumJson`: +1. Take the single Jsonb datum from the decoded row +2. Parse JSON to navigate to `payload.after` (or `after` for flat format) and `payload.op` (or `op`) +3. For op `d` → return `None` (delete retraction) +4. For op `c`/`u` → re-serialize `after` value as Jsonb datum, return as the row + +**Error Handling Pattern:** Existing upsert sources surface decode errors through the source's error collection (see `DecodeResult` with `value: Option<Result<Row, DecodeError>>`). Malformed Debezium JSON should produce `Err(DecodeError)` values through the same mechanism. + +### Files to Reference + +| File | Purpose | +| ---- | ------- | +| `src/sql-parser/src/ast/defs/ddl.rs:636-644` | `SourceEnvelope` AST enum — `Debezium` variant needs MODE parameter | +| `src/sql-parser/src/parser.rs:2362-2398` | `parse_source_envelope()` — parse MODE option after DEBEZIUM keyword | +| `src/sql/src/plan/statement/ddl.rs:1394-1412` | Debezium planning — lift Avro-only gate, handle JSON path | +| `src/sql/src/plan/statement/ddl.rs:2212-2229` | `typecheck_debezium()` — Avro-specific, bypass for JSON | +| `src/sql/src/plan/statement/ddl.rs:2247-2255` | KEY FORMAT requirement validation — already covers Debezium | +| `src/storage-types/src/sources/envelope.rs:70-84` | `UpsertStyle` enum — add `DebeziumJson` variant | +| `src/storage-types/src/sources/envelope.rs:230-255` | `UnplannedSourceEnvelope` desc transform — add JSON path | +| `src/storage-types/src/sources/encoding.rs:92-100` | `DataEncoding` enum — `Json` variant already exists | +| `src/storage/src/decode.rs:208-227` | `PreDelimitedFormat::Json` — no changes needed, raw JSON decode stays 
as-is | +| `src/storage/src/render/sources.rs:549-560` | `upsert_commands()` Debezium match arm — add DebeziumJson arm | +| `src/interchange/src/envelopes.rs:141-165` | `dbz_envelope()` / `dbz_format()` — reference for envelope structure | +| `test/testdrive/kafka-upsert-debezium-sources.td` | Existing Avro Debezium test — pattern to follow for JSON tests | + +### Technical Decisions + +**DD-1: Single jsonb output column** +- **Decision:** The decoded "after" payload is emitted as a single `jsonb` column (consistent with how `VALUE FORMAT JSON` works today without Debezium). +- **Alternative considered:** Schema-aware column projection where users define typed columns in the CREATE SOURCE statement. This would require JSON-to-datum type coercion and schema validation. Deferred as a future enhancement. +- **Note:** This pushes schema validation entirely to query time. Users will use `data->>'field_name'` for column access with no type checking at ingestion. Acceptable for V1. + +**DD-2: Require KEY FORMAT JSON** +- **Decision:** `KEY FORMAT JSON` is required for Debezium JSON sources, consistent with how Debezium Avro requires a key schema. +- **Open question:** Whether keyless sources with upsert-from-payload should be supported. To be revisited. + +**DD-3: Kafka offset ordering (not commit_ts)** +- **Decision:** Use Kafka offset ordering for message sequencing, same as existing Debezium Avro sources. +- **Open question:** Whether TiCDC's `commit_ts` should be used for ordering to better reflect TiDB's distributed transaction semantics. To be revisited. + +**DD-4: Extensible MODE parameter for dialect selection** +- **Decision:** Support both generic Debezium JSON and TiCDC's flavor via an extensible `MODE` parameter on the `ENVELOPE DEBEZIUM` clause. Default mode is generic Debezium. `MODE = 'TICDC'` selects TiCDC-specific handling (e.g., base64 binary encoding, float64 decimals). 
+- **Rationale (Party Mode):** A `MODE` parameter is more extensible than a boolean toggle. Future dialects (Maxwell, Canal, etc.) can be added without breaking syntax. + +**DD-5: New UpsertStyle::DebeziumJson variant** +- **Decision:** Introduce a new `UpsertStyle::DebeziumJson` variant rather than reusing the existing `UpsertStyle::Debezium { after_idx }`. The Avro variant's `after_idx` is schema-index-based and not applicable to JSON, which extracts `payload.after` by key name from the Jsonb datum at render time. +- **Rationale (Party Mode):** Keeps Avro and JSON code paths cleanly separated, avoids overloading the existing variant with conditional logic. + +**DD-6: Support both payload-wrapped and flat envelope formats** +- **Decision:** The decoder handles both `{ "payload": { "before": ..., "after": ..., "op": ... } }` (standard Debezium with schema) and flat `{ "before": ..., "after": ..., "op": ... }` (Debezium with `debezium-disable-schema=true` or similar configurations). +- **Rationale (Party Mode):** Some Debezium configurations strip the `payload` wrapper. Supporting both avoids a common production gotcha. + +**DD-7: Error handling for malformed messages** +- **Decision:** Malformed Debezium JSON messages (missing `op`, unexpected `op` values, null `after` on insert, unparseable JSON) should be handled consistently with Materialize's existing error policy for upsert sources — surface errors through the source's error collection (`DecodeError`) rather than silently dropping records. +- **Rationale (Party Mode):** JSON is untyped, so malformed messages are a high-risk area. Explicit error handling must be in scope, not deferred. + +**DD-8: Envelope extraction in upsert_commands (same location as Avro)** +- **Decision:** JSON Debezium envelope extraction happens in `upsert_commands()` in `src/storage/src/render/sources.rs`, the same function where Avro Debezium extraction occurs. 
The new `UpsertStyle::DebeziumJson` match arm parses the Jsonb datum to extract `op` and `after`, rather than using schema-based column indexing. +- **Rationale:** Keeps all envelope extraction logic co-located. The JSON path parses at render time (slightly different from Avro's index-based approach) but the upsert semantics (key + value → insert/update/delete) remain identical. + +**DD-9: Include key as separate output column (resolves key_indices correctness) [F1, F3, F5, F6]** +- **Decision:** Unlike Avro Debezium (which embeds key fields inside the unpacked `after` Record), JSON Debezium includes the key as a separate `key: Jsonb` column prepended to the output row. The output schema is `[key: Jsonb, data: Jsonb, ...metadata]`. `key_indices` points to position `[0]` (the key column). This is necessary because: (a) `upsert_core` uses `UpsertKey::from_value(value_ref, &key_indices)` to reconstruct keys from persisted values during rehydration — empty/wrong `key_indices` would cause data corruption after restart; (b) the `data: Jsonb` column is opaque so `match_key_indices` cannot find named key fields within it; (c) users need access to key fields since they can't be extracted from the opaque `after` payload without knowing the schema. +- **Consequence:** `INCLUDE KEY` rejection logic (which fires for Avro Debezium because "Debezium values include all keys") must NOT fire for JSON Debezium — the key is always included as a separate column. Update the validation at `ddl.rs:1372-1384` accordingly. +- **To revisit:** Whether the key column should be named `key` (single Jsonb blob) or flattened into multiple columns if the key JSON has multiple top-level fields. + +**DD-10: Parse-extract-reserialize performance cost [F12]** +- **Decision:** Every JSON Debezium message is parsed from `Jsonb` → `serde_json::Value`, the `after` field is extracted, and then re-serialized back to `Jsonb`. 
This is inherent to the approach of extracting envelope fields from an opaque JSON blob at render time. +- **Risk:** Additional CPU cost per message compared to Avro's index-based extraction. For V1 this is acceptable — JSON sources are already doing full JSON parsing in the decode layer. +- **Mitigation:** Profile under realistic throughput before GA. A future optimization could parse the raw bytes directly in the decode layer (before Jsonb conversion) to avoid the double-parse, but this would move envelope extraction out of `upsert_commands()` and diverge from DD-8. +- **To revisit:** Establish throughput benchmarks before shipping to production. + +**DD-11: No ingestion-time schema validation for JSON [F12]** +- **Decision:** Unlike Avro Debezium (which validates `before`/`after` column types at planning time via `typecheck_debezium()`), JSON Debezium performs no structural validation at planning time. The value is an opaque `Jsonb` blob. Malformed `after` payloads (e.g., unexpected nested structures, wrong types) are not caught until query time when users access fields via `->`/`->>` operators. +- **Risk:** Users may not discover data quality issues until downstream queries fail or return unexpected results. +- **Mitigation:** Envelope-level validation (missing `op`, null `after` on insert) is caught at render time per DD-7. Payload-content validation is deferred to future schema-aware column projection work (out of scope). + +**DD-12: Handle `"r"` (read/snapshot) operation as insert [F2]** +- **Decision:** Debezium emits `op: "r"` for snapshot records (initial table scan). TiCDC and standard Debezium both produce these during initial sync. Treat `"r"` identically to `"c"` (create) — extract the `after` field and upsert the row. +- **Rationale:** Rejecting `"r"` would make any source unusable on tables that weren't empty when snapshotting began. +- **To revisit:** Whether snapshot-specific metadata should be exposed or handled differently. 
+ +**DD-13: Silently skip `"t"` (truncate) operations [F8]** +- **Decision:** Debezium can emit `op: "t"` for truncate events. These are silently skipped (not errored) because Materialize has no mechanism to bulk-retract all rows for a key range from within the upsert pipeline. A warning-level log message is emitted when a truncate event is encountered. +- **To revisit:** Whether truncate support should be added in a future iteration (e.g., by retracting all known keys for the source). + +**DD-14: Null Kafka message values treated as deletes [F9]** +- **Decision:** On Kafka compacted topics, null-value records serve as tombstones. When the entire Kafka message value is null (not a JSON object with `op: "d"`, but literally absent), treat it as a delete for the given key. This is consistent with how `ENVELOPE UPSERT` handles null values in the existing decode pipeline (`result.value: None` at the `upsert_commands` level already maps to retraction). +- **Rationale:** This requires no new code — the existing `result.value: None` path in `upsert_commands` already handles this before the `UpsertStyle` match arm is reached. + +**DD-15: TiCDC MODE is structural-only in V1 [F10]** +- **Decision:** The `MODE = 'TICDC'` parameter is parsed, stored, and round-trips through SQL display, but V1 runtime behavior is identical to generic mode. This establishes the syntax and plumbing for future TiCDC-specific handling (base64 binary, float64 decimals) without shipping dead behavioral code. +- **Rationale:** The parsing/storage cost is minimal and avoids a breaking syntax change later. The test for TiCDC mode (Task 8) verifies the plumbing works. +- **To revisit:** Define concrete V1 behavioral differences or defer MODE entirely if the plumbing cost is deemed not worth it during implementation. + +**DD-16: Multi-partition ordering is a known constraint [F11]** +- **Decision:** For multi-partition Kafka topics, updates for the same key on different partitions can arrive out of order. 
This is the same limitation as existing Avro Debezium sources and is inherent to Kafka's per-partition ordering guarantee. TiCDC distributes events across partitions, so this is particularly relevant for the TiCDC use case. +- **Known constraint:** Users must ensure same-key events are routed to the same partition (standard Kafka key-based partitioning) for correct ordering. +- **To revisit:** Whether `commit_ts`-based ordering (DD-3) could resolve cross-partition ordering for TiCDC. + +**DD-17: Pseudocode is illustrative, not compilable [F7, F13]** +- **Decision:** All pseudocode in this spec (particularly `extract_debezium_json` and the `upsert_commands` match arm) is illustrative of the intended logic, not compilable Rust. Implementation must handle Rust-specific concerns: (a) `Datum::Jsonb` wraps a borrowed `JsonbRef<'a>` — extracting and re-packing requires allocating into a `Row` via `RowPacker`; (b) the nested match structure in Task 5 should be simplified during implementation; (c) error types must use the concrete `DecodeErrorKind` variants available in the codebase. +- **Rationale:** Spec pseudocode communicates intent. The implementing developer is expected to adapt to Rust's ownership/lifetime model. + +## Implementation Plan + +### Tasks + +- [ ] **Task 1: Add MODE parameter to SourceEnvelope AST** + - File: `src/sql-parser/src/ast/defs/ddl.rs` + - Action: Extend `SourceEnvelope::Debezium` from a unit variant to a struct variant with an optional `mode` field. Define a `DebeziumMode` enum with variants `None` (generic) and `TiCdc`. + ```rust + pub enum DebeziumMode { None, TiCdc } + pub enum SourceEnvelope { + // ... + Debezium { mode: DebeziumMode }, + // ... + } + ``` + - Action: Update the `AstDisplay` impl for `SourceEnvelope::Debezium` to render `DEBEZIUM` or `DEBEZIUM (MODE = 'TICDC')`. + - Action: Update `requires_all_input()` match arm accordingly. 
- Notes: All existing references to `SourceEnvelope::Debezium` across the codebase will need pattern updates (e.g., `SourceEnvelope::Debezium { .. }`). Search for all match arms. + +- [ ] **Task 2: Parse MODE option in parse_source_envelope** + - File: `src/sql-parser/src/parser.rs` + - Action: In `parse_source_envelope()` (line ~2365), after matching the `DEBEZIUM` keyword, optionally parse `(MODE = '<string>')`. If present, map `'TICDC'` to `DebeziumMode::TiCdc`. If absent, default to `DebeziumMode::None`. + ```rust + } else if self.parse_keyword(DEBEZIUM) { + let mode = if self.consume_token(&Token::LParen) { + self.expect_keyword(MODE)?; + self.expect_token(&Token::Eq)?; + let mode_str = self.parse_literal_string()?; + self.expect_token(&Token::RParen)?; + match mode_str.to_uppercase().as_str() { + "TICDC" => DebeziumMode::TiCdc, + other => return Err(/* unsupported mode error */), + } + } else { + DebeziumMode::None + }; + SourceEnvelope::Debezium { mode } + } + ``` + - Notes: The `MODE` keyword may need to be added to the keyword list if not already present. + +- [ ] **Task 3: Add UpsertStyle::DebeziumJson variant** + - File: `src/storage-types/src/sources/envelope.rs` + - Action: Add a new variant to `UpsertStyle`: + ```rust + pub enum UpsertStyle { + Default(KeyEnvelope), + Debezium { after_idx: usize }, + DebeziumJson { mode: DebeziumJsonMode }, + ValueErrInline { key_envelope: KeyEnvelope, error_column: String }, + } + ``` + - Action: Define `DebeziumJsonMode` enum (or reuse from AST layer via a storage-types equivalent): + ```rust + pub enum DebeziumJsonMode { Generic, TiCdc } + ``` + - Notes: Must derive `Clone, Debug, Serialize, Deserialize, Eq, PartialEq` to match existing variants. 
+ +- [ ] **Task 4: Add UnplannedSourceEnvelope descriptor transform for DebeziumJson** + - File: `src/storage-types/src/sources/envelope.rs` + - Action: In the `UnplannedSourceEnvelope` match that builds the output `RelationDesc` (line ~230), add a new arm for `UpsertStyle::DebeziumJson`. Per DD-9, the output schema is `[key: Jsonb, data: Jsonb, ...metadata]`: + ```rust + UnplannedSourceEnvelope::Upsert { + style: UpsertStyle::DebeziumJson { .. }, + } => { + // Key is included as a separate column (unlike Avro Debezium where keys are in the after record) + let mut desc = RelationDesc::builder() + .with_column("key", ScalarType::Jsonb.nullable(false)) + .with_column("data", ScalarType::Jsonb.nullable(false)) + .finish(); + // key_indices = [0] — points to the key column for UpsertKey::from_value rehydration + let key_indices = vec![0usize]; + desc = desc.with_key(key_indices.clone()); + let desc = desc.concat(metadata_desc); + ( + self.into_source_envelope(Some(key_indices), None, Some(desc.arity())), + desc, + ) + } + ``` + - Notes: This approach ensures `upsert_core` can reconstruct keys from persisted values during rehydration via `UpsertKey::from_value(value_ref, &key_indices)`. The key column at index 0 contains the JSON key blob. Skip `match_key_indices` entirely for this path. 
+ +- [ ] **Task 5: Lift Avro-only gate in DDL planning** + - File: `src/sql/src/plan/statement/ddl.rs` + - Action: Modify the `ast::SourceEnvelope::Debezium` match arm (lines 1394-1412) to handle JSON encoding separately: + ```rust + ast::SourceEnvelope::Debezium { mode } => { + match encoding.as_ref().map(|e| &e.value) { + Some(DataEncoding::Json) => { + // JSON path: skip typecheck_debezium, use DebeziumJson style + let storage_mode = match mode { + DebeziumMode::TiCdc => DebeziumJsonMode::TiCdc, + DebeziumMode::None => DebeziumJsonMode::Generic, + }; + UnplannedSourceEnvelope::Upsert { + style: UpsertStyle::DebeziumJson { mode: storage_mode }, + } + } + _ => { + // Existing Avro path (unchanged) + let after_idx = match typecheck_debezium(&value_desc) { + Ok((_before_idx, after_idx)) => Ok(after_idx), + Err(type_err) => match encoding.as_ref().map(|e| &e.value) { + Some(DataEncoding::Avro(_)) => Err(type_err), + _ => Err(sql_err!( + "ENVELOPE DEBEZIUM requires that VALUE FORMAT is set to AVRO or JSON" + )), + }, + }?; + UnplannedSourceEnvelope::Upsert { + style: UpsertStyle::Debezium { after_idx }, + } + } + } + } + ``` + - Action: Update all other match arms referencing `ast::SourceEnvelope::Debezium` to use `ast::SourceEnvelope::Debezium { .. }` pattern (lines 810, 1372-1384, and any others found by the compiler). + - Action: Per DD-9, update `INCLUDE KEY` rejection logic at `ddl.rs:1372-1384` — the "Cannot use INCLUDE KEY with ENVELOPE DEBEZIUM" error must NOT fire when `DataEncoding::Json` is used. For JSON Debezium, key is always included as a separate column. + - Notes: The error message changes from "AVRO" to "AVRO or JSON". KEY FORMAT validation at line 2247 already covers Debezium and requires no changes. 
+ +- [ ] **Task 6: Implement DebeziumJson extraction in upsert_commands** + - File: `src/storage/src/render/sources.rs` + - Action: Add a new match arm in `upsert_commands()` alongside the existing `UpsertStyle::Debezium { after_idx }` arm (line ~549): + ```rust + UpsertStyle::DebeziumJson { mode } => { + // The row contains a single Jsonb datum with the full Debezium envelope + let jsonb_datum = row.iter().next().unwrap(); + match extract_debezium_json(jsonb_datum, mode) { + Ok(Some(after_jsonb_row)) => { + // Per DD-9: prepend key, then after payload, then metadata + let mut packer = row_buf.packer(); + packer.extend(key_row.iter()); // key at position 0 + packer.extend(after_jsonb_row.iter()); // data at position 1 + packer.extend_by_row(&metadata); + Some(Ok(row_buf.clone())) + } + Ok(None) => None, // Delete operation + Err(e) => Some(Err(DecodeError { kind: e, raw: vec![] })), + } + } + ``` + - Action: Implement `extract_debezium_json()` helper function (in same file or in `src/interchange/src/json.rs`). Per DD-12/DD-13/DD-17, pseudocode is illustrative: + ```rust + fn extract_debezium_json( + datum: Datum<'_>, + mode: &DebeziumJsonMode, + ) -> Result<Option<Row>, DecodeErrorKind> { + // 1. Convert Jsonb datum to serde_json::Value (requires Row allocation per DD-17) + // 2. Check for "payload" wrapper; if present, unwrap + // 3. Extract "op" field: + // - "c" or "r" (per DD-12: snapshot reads treated as inserts) → extract after + // - "u" → extract after + // - "d" → return Ok(None) + // - "t" → log warning, return Ok(None) per DD-13 + // - missing/other → return Err(DecodeErrorKind) + // 4. Extract "after" field → must be non-null for c/r/u, else Err + // 5. Serialize "after" back into a Row containing a single Jsonb datum + // 6. Return Ok(Some(row)) + } + ``` + - Action: Also update the key_row match arm (line ~517) to include `UpsertStyle::DebeziumJson { .. }` alongside `UpsertStyle::Debezium { .. }` for key handling. 
+ - Notes: Per DD-17, `Datum::Jsonb` wraps a borrowed `JsonbRef<'a>` — extracting and re-packing requires allocating into a new `Row` via `RowPacker`. Per DD-14, null Kafka values are already handled as deletes before this match arm is reached. + +- [ ] **Task 7: Create testdrive tests for Debezium JSON sources** + - File: `test/testdrive/kafka-debezium-json-sources.td` (new file) + - Action: Create comprehensive testdrive test covering: + - Source creation with `KEY FORMAT JSON, VALUE FORMAT JSON, ENVELOPE DEBEZIUM` + - Insert operation (op="c"): publish JSON with `after` set, `before` null → verify row appears + - Update operation (op="u"): publish JSON with both `before` and `after` → verify row updated + - Delete operation (op="d"): publish JSON with `before` set, `after` null → verify row retracted + - Payload-wrapped format: `{ "payload": { "op": "c", "after": {...} } }` + - Flat format: `{ "op": "c", "after": {...} }` + - Compound key: multiple fields in key JSON + - Error case: malformed message (missing `op` field) + - Notes: Use `kafka-ingest format=bytes` for raw JSON messages (not Avro). Follow patterns from `kafka-upsert-debezium-sources.td` for structure. Key and value are separate JSON blobs sent as raw bytes. + +- [ ] **Task 8: Create testdrive tests for TiCDC mode** + - File: `test/testdrive/kafka-debezium-json-ticdc-sources.td` (new file) + - Action: Create tests specifically for TiCDC dialect: + - Source creation with `ENVELOPE DEBEZIUM (MODE = 'TICDC')` + - TiCDC-formatted messages with `commit_ts` and `cluster_id` in source metadata + - Insert/Update/Delete operations with TiCDC envelope structure + - Notes: TiCDC messages have the same `payload.before/after/op` structure as generic Debezium. The MODE parameter is parsed and stored but V1 behavior differences are minimal — this test establishes the pattern for future TiCDC-specific handling. 
+ +### Acceptance Criteria + +**Happy Path:** +- [ ] AC-1: Given a Kafka source with `KEY FORMAT JSON, VALUE FORMAT JSON, ENVELOPE DEBEZIUM`, when a JSON message with `"op": "c"` and a non-null `"after"` field is published, then a row appears in the source with `key` column containing the JSON key and `data` column containing the `after` JSON object. +- [ ] AC-2: Given an existing row from AC-1, when a JSON message with `"op": "u"`, matching key, and updated `"after"` is published, then the row's `data` column reflects the new `after` value. +- [ ] AC-3: Given an existing row from AC-1, when a JSON message with `"op": "d"` and null `"after"` is published, then the row is retracted from the source. +- [ ] AC-4: Given a Debezium JSON message wrapped in a `"payload"` object (e.g., `{ "payload": { "op": "c", "after": {...} } }`), when ingested, then envelope extraction succeeds identically to flat format. +- [ ] AC-5: Given a Debezium JSON message in flat format (e.g., `{ "op": "c", "after": {...} }`), when ingested, then envelope extraction succeeds. +- [ ] AC-6: Given a Debezium JSON message with `"op": "r"` (snapshot/read), when ingested, then it is treated as an insert (same as `"c"`). [DD-12] + +**TiCDC Mode:** +- [ ] AC-7: Given `ENVELOPE DEBEZIUM (MODE = 'TICDC')` in the CREATE SOURCE statement, when a TiCDC-formatted JSON message is published, then the source processes it correctly (same behavior as generic mode for V1, with `commit_ts`/`cluster_id` in source metadata ignored). + +**Error Handling:** +- [ ] AC-8: Given a JSON message with a missing `"op"` field, when ingested, then a decode error is surfaced through the source's error collection (not silently dropped). +- [ ] AC-9: Given a JSON message with an unrecognized `"op"` value (e.g., `"op": "x"`), when ingested, then a decode error is surfaced. +- [ ] AC-10: Given a JSON message with `"op": "c"` but null `"after"`, when ingested, then a decode error is surfaced. 
+- [ ] AC-11: Given a Debezium JSON message with `"op": "t"` (truncate), when ingested, then it is silently skipped (not errored). [DD-13] +- [ ] AC-12: Given a null Kafka message value (tombstone), when ingested, then it is treated as a delete for the given key. [DD-14] + +**SQL Validation:** +- [ ] AC-13: Given `VALUE FORMAT JSON, ENVELOPE DEBEZIUM` without `KEY FORMAT`, when CREATE SOURCE is executed, then it fails with an error requiring KEY FORMAT. +- [ ] AC-14: Given `VALUE FORMAT TEXT, ENVELOPE DEBEZIUM`, when CREATE SOURCE is executed, then it fails with an error requiring AVRO or JSON format. +- [ ] AC-15: Given `VALUE FORMAT JSON, ENVELOPE DEBEZIUM` with `KEY FORMAT JSON`, when CREATE SOURCE is executed, then it succeeds. + +**Restart/Rehydration [F4]:** +- [ ] AC-16: Given a running Debezium JSON source with ingested data, when the source is restarted (cluster restart or rehydration from persist), then all previously ingested rows are correctly restored with matching key and data values. + +**Regression:** +- [ ] AC-17: Given existing `VALUE FORMAT AVRO, ENVELOPE DEBEZIUM` sources, when the same Avro Debezium tests are run, then behavior is unchanged (no regressions). 
+ +## Additional Context + +### Dependencies + +- `serde_json` — already in the dependency tree, needed for parsing Debezium JSON envelope fields in `upsert_commands()` +- `mz_repr::adt::jsonb` — existing Jsonb type used for the output datum +- No new external crate dependencies required + +### Testing Strategy + +**Testdrive (integration):** +- `test/testdrive/kafka-debezium-json-sources.td` — primary test file covering generic Debezium JSON: insert, update, delete, payload-wrapped, flat, compound key, error cases +- `test/testdrive/kafka-debezium-json-ticdc-sources.td` — TiCDC dialect mode tests + +**Regression:** +- Run existing `test/testdrive/kafka-upsert-debezium-sources.td` and related Avro Debezium tests to ensure no breakage from the `SourceEnvelope::Debezium` AST change + +**Unit tests (if applicable):** +- `extract_debezium_json()` helper function: test payload-wrapped vs flat, all op types, malformed inputs +- Can be added inline in the implementing module with `#[cfg(test)]` block + +**Manual verification:** +- Create a Kafka topic, publish raw JSON Debezium messages via `kcat` or similar, create source in Materialize, verify `SELECT * FROM source_table` returns expected results + +### Notes + +- All identified risks are captured as design decisions DD-9 through DD-17. +- TiCDC MODE is parsed and stored but V1 behavior is identical to generic mode (DD-15). +- Multi-partition ordering is a known constraint documented in DD-16. + +**Future considerations (out of scope):** +- Schema-aware column projection (`CREATE SOURCE ... (id int, name text) FORMAT JSON ENVELOPE DEBEZIUM`) +- Keyless upsert where key is derived from the Debezium payload +- `commit_ts`-based ordering for TiCDC +- Additional MODE values: Maxwell, Canal, etc. 
+- Truncate operation support (DD-13) + +--- + +## Party Mode Changelog + +Changes made during Party Mode review (Step 1): + +| # | Change | Source | Section | +|---|--------|--------|---------| +| 1 | Added `MODE` parameter syntax instead of boolean toggle for dialect selection | Winston (Architect) | DD-4, Scope | +| 2 | Added `UpsertStyle::DebeziumJson` as new variant (not reusing Avro's `after_idx`) | Amelia (Developer) | DD-5 (new) | +| 3 | Added support for both `payload`-wrapped and flat envelope formats | Winston (Architect) | DD-6 (new), Scope | +| 4 | Added error handling for malformed messages as in-scope requirement | John (PM) + Murat (Test Architect) | DD-7 (new), Scope | +| 5 | Added note about schema validation being deferred to query time | Winston (Architect) | DD-1 | +| 6 | Refined solution description to reference extensible MODE parameter | All | Solution | + +## Adversarial Review Changelog + +Findings from adversarial review, resolved as design decisions: + +| Finding | Severity | Resolution | Design Decision | +|---------|----------|------------|-----------------| +| F1: key_indices correctness unresolved | Critical | Include key as separate output column; key_indices=[0] | DD-9 | +| F2: `"r"` (snapshot) op type ignored | Critical | Treat "r" as insert, same as "c" | DD-12 | +| F3: INCLUDE KEY broken for JSON Debezium | Critical | Key always included as separate column; skip INCLUDE KEY rejection | DD-9 | +| F4: No restart/rehydration AC | High | Added AC-16 for restart correctness | AC-16 | +| F5: Task 4 pseudocode self-contradictory | High | Rewrote Task 4 with correct key_indices approach | DD-9, Task 4 | +| F6: DebeziumJson arm doesn't prepend key | High | Updated Task 6 pseudocode to prepend key_row | DD-9, Task 6 | +| F7: extract_debezium_json lifetime unclear | High | Noted pseudocode is illustrative; impl must handle Row allocation | DD-17 | +| F8: `"t"` (truncate) op not addressed | Medium | Silently skip with warning log | DD-13, 
AC-11 | +| F9: Null Kafka values (tombstones) not discussed | Medium | Treated as deletes; existing pipeline handles this | DD-14, AC-12 | +| F10: TiCDC MODE does nothing in V1 | Medium | Acknowledged as structural-only; establishes syntax for future | DD-15 | +| F11: Multi-partition ordering not discussed | Medium | Documented as known constraint | DD-16 | +| F12: No performance ACs | Low | Added "profile before GA" to DD-10 mitigation | DD-10 | +| F13: Nested match structure confusing | Low | Noted in DD-17; simplify during implementation | DD-17 | From 55d64c24552183f83824b79e22fea7b25af47a1d Mon Sep 17 00:00:00 2001 From: Jon Currey Date: Thu, 5 Mar 2026 23:53:39 -0500 Subject: [PATCH 3/5] storage: Add envelope column to preserve Debezium CDC metadata The `envelope` jsonb column contains the full Debezium envelope, making CDC metadata like `commit_ts`, `ts_ms`, and `source` fields accessible for user queries. This is critical for TiCDC users who need `commit_ts` to correlate CDC messages with Dumpling snapshots. Output schema is now [key: jsonb, data: jsonb, envelope: jsonb, ...metadata]. 
Co-Authored-By: Claude Opus 4.6 --- ...20260305_debezium_json_envelope_sources.md | 10 +++---- src/storage-types/src/sources/envelope.rs | 6 +++- src/storage/src/render/sources.rs | 28 +++++++++++++++---- test/testdrive/kafka-debezium-json-sources.td | 6 +++- .../kafka-debezium-json-ticdc-sources.td | 12 ++++++++ 5 files changed, 50 insertions(+), 12 deletions(-) diff --git a/doc/developer/spec/20260305_debezium_json_envelope_sources.md b/doc/developer/spec/20260305_debezium_json_envelope_sources.md index 76d74fda85650..10099b55d2165 100644 --- a/doc/developer/spec/20260305_debezium_json_envelope_sources.md +++ b/doc/developer/spec/20260305_debezium_json_envelope_sources.md @@ -47,14 +47,14 @@ Lift the Avro-only restriction on `ENVELOPE DEBEZIUM` to also accept `VALUE FORM - JSON Debezium envelope parsing (extract `before`/`after`/`op` from `payload`) - TiCDC dialect via `MODE` parameter (e.g., `ENVELOPE DEBEZIUM (MODE = 'TICDC')`) - Insert (`c`), update (`u`), delete (`d`) operation handling -- Single `jsonb` output column for the "after" payload +- Output columns: `data` (jsonb, the "after" payload) and `envelope` (jsonb, the full Debezium envelope for CDC metadata access) - Support both `payload`-wrapped and flat envelope formats - Error handling for malformed Debezium JSON messages **Out of Scope:** - Schema-aware column projection from JSON payloads - Keyless / upsert-from-payload sources -- `commit_ts`-based ordering from TiCDC source metadata +- `commit_ts`-based ordering from TiCDC source metadata (note: `commit_ts` is preserved in the `envelope` column for user queries, but not used for ordering) - Spatial type handling for TiCDC ## Context for Development @@ -237,7 +237,7 @@ For JSON + Debezium, the JSON decoder still produces a single Jsonb datum contai - [ ] **Task 4: Add UnplannedSourceEnvelope descriptor transform for DebeziumJson** - File: `src/storage-types/src/sources/envelope.rs` - - Action: In the `UnplannedSourceEnvelope` match that builds the 
output `RelationDesc` (line ~230), add a new arm for `UpsertStyle::DebeziumJson`. Per DD-9, the output schema is `[key: Jsonb, data: Jsonb, ...metadata]`: + - Action: In the `UnplannedSourceEnvelope` match that builds the output `RelationDesc` (line ~230), add a new arm for `UpsertStyle::DebeziumJson`. Per DD-9, the output schema is `[key: Jsonb, data: Jsonb, envelope: Jsonb, ...metadata]`: ```rust UnplannedSourceEnvelope::Upsert { style: UpsertStyle::DebeziumJson { .. }, @@ -372,7 +372,7 @@ For JSON + Debezium, the JSON decoder still produces a single Jsonb datum contai - [ ] AC-6: Given a Debezium JSON message with `"op": "r"` (snapshot/read), when ingested, then it is treated as an insert (same as `"c"`). [DD-12] **TiCDC Mode:** -- [ ] AC-7: Given `ENVELOPE DEBEZIUM (MODE = 'TICDC')` in the CREATE SOURCE statement, when a TiCDC-formatted JSON message is published, then the source processes it correctly (same behavior as generic mode for V1, with `commit_ts`/`cluster_id` in source metadata ignored). +- [ ] AC-7: Given `ENVELOPE DEBEZIUM (MODE = 'TICDC')` in the CREATE SOURCE statement, when a TiCDC-formatted JSON message is published, then the source processes it correctly. The full Debezium envelope (including `commit_ts`, `cluster_id`, and other TiCDC metadata) is preserved in the `envelope` column for user queries (e.g., `envelope->'source'->>'commit_ts'`). **Error Handling:** - [ ] AC-8: Given a JSON message with a missing `"op"` field, when ingested, then a decode error is surfaced through the source's error collection (not silently dropped). @@ -425,7 +425,7 @@ For JSON + Debezium, the JSON decoder still produces a single Jsonb datum contai **Future considerations (out of scope):** - Schema-aware column projection (`CREATE SOURCE ... 
(id int, name text) FORMAT JSON ENVELOPE DEBEZIUM`) - Keyless upsert where key is derived from the Debezium payload -- `commit_ts`-based ordering for TiCDC +- `commit_ts`-based ordering for TiCDC (the field is already preserved in the `envelope` column; future work is using it for message ordering) - Additional MODE values: Maxwell, Canal, etc. - Truncate operation support (DD-13) diff --git a/src/storage-types/src/sources/envelope.rs b/src/storage-types/src/sources/envelope.rs index 66acf6b835ddb..ba07b605ab0e5 100644 --- a/src/storage-types/src/sources/envelope.rs +++ b/src/storage-types/src/sources/envelope.rs @@ -243,13 +243,17 @@ impl UnplannedSourceEnvelope { style: UpsertStyle::DebeziumJson { .. }, .. } => { - // JSON Debezium: output is [key: Jsonb, data: Jsonb, ...metadata] + // JSON Debezium: output is [key: Jsonb, data: Jsonb, envelope: Jsonb, ...metadata] // Key is included as a separate column (unlike Avro Debezium). + // `data` contains the `after` field (the row payload). + // `envelope` contains the full Debezium envelope for CDC metadata + // access (e.g., source.commit_ts for TiCDC correlation). 
let key_desc = RelationDesc::builder() .with_column("key", SqlScalarType::Jsonb.nullable(false)) .finish(); let value_desc = RelationDesc::builder() .with_column("data", SqlScalarType::Jsonb.nullable(false)) + .with_column("envelope", SqlScalarType::Jsonb.nullable(false)) .finish(); // key_indices = [0]: points to the key column for UpsertKey::from_value rehydration let key_indices = vec![0usize]; diff --git a/src/storage/src/render/sources.rs b/src/storage/src/render/sources.rs index 9b27eb93bd633..b9bc14e556625 100644 --- a/src/storage/src/render/sources.rs +++ b/src/storage/src/render/sources.rs @@ -554,11 +554,12 @@ fn upsert_commands( Some(Ok(ref row)) => match upsert_envelope.style { UpsertStyle::DebeziumJson { ref mode } => { match extract_debezium_json(row, mode) { - Ok(Some(after_row)) => { + Ok(Some((after_row, envelope_row))) => { let mut packer = row_buf.packer(); - // Per DD-9: prepend key, then after payload, then metadata + // key, data (after), envelope (full), then metadata packer.extend_by_row(&key_row); packer.extend_by_row(&after_row); + packer.extend_by_row(&envelope_row); packer.extend_by_row(&metadata); Some(Ok(row_buf.clone())) } @@ -637,11 +638,16 @@ fn upsert_commands( /// Handles both `payload`-wrapped (`{ "payload": { "op": ..., "after": ... } }`) /// and flat (`{ "op": ..., "after": ... }`) formats. /// +/// Extracts the `after` payload and full envelope from a Debezium JSON message. 
+///
 /// Returns:
-/// - `Ok(Some(row))` for insert/update/snapshot operations (op = c/u/r)
+/// - `Ok(Some((after_row, envelope_row)))` for insert/update/snapshot operations (op = c/u/r)
 /// - `Ok(None)` for delete/truncate operations (op = d/t)
 /// - `Err(DecodeError)` for malformed messages
-fn extract_debezium_json(row: &Row, _mode: &DebeziumJsonMode) -> Result<Option<Row>, DecodeError> {
+fn extract_debezium_json(
+    row: &Row,
+    _mode: &DebeziumJsonMode,
+) -> Result<Option<(Row, Row)>, DecodeError> {
     use mz_repr::adt::jsonb::JsonbRef;
 
     let datum = row.iter().next().unwrap();
@@ -706,7 +712,19 @@ fn extract_debezium_json(
             }
         })?;
+    // Preserve the full envelope as Jsonb for CDC metadata access
+    let envelope_json =
+        mz_repr::adt::jsonb::Jsonb::from_serde_json(envelope.clone()).map_err(|e| {
+            DecodeError {
+                kind: DecodeErrorKind::Text(
+                    format!("Failed to convert Debezium envelope to Jsonb: {}", e).into(),
+                ),
+                raw: vec![],
+            }
+        })?;
-    Ok(Some(after_json.into_row()))
+
+    Ok(Some((after_json.into_row(), envelope_json.into_row())))
 }
 other => Err(DecodeError {
     kind: DecodeErrorKind::Text(
diff --git a/test/testdrive/kafka-debezium-json-sources.td b/test/testdrive/kafka-debezium-json-sources.td
index 3e177bac3d4d0..28dd0f84c9f5f 100644
--- a/test/testdrive/kafka-debezium-json-sources.td
+++ b/test/testdrive/kafka-debezium-json-sources.td
@@ -39,12 +39,16 @@ $ kafka-ingest format=bytes key-format=bytes key-terminator=: topic=dbz-json
   ENVELOPE DEBEZIUM
 > COMMIT
 
-# Output schema is [key: jsonb, data: jsonb]
+# Output schema is [key: jsonb, data: jsonb, envelope: jsonb]
 # The last update for key=1 should win (lizard)
 # Use ->> to extract fields since JSON key order is non-deterministic
 > SELECT key, data->>'id', data->>'creature' FROM dbz_json_tbl
 1 1 lizard
 
+# The envelope column contains the full Debezium envelope
+> SELECT envelope->>'op' FROM dbz_json_tbl
+u
+
 # --- Insert a second row ---
 
 $ kafka-ingest format=bytes key-format=bytes key-terminator=: topic=dbz-json
diff --git a/test/testdrive/kafka-debezium-json-ticdc-sources.td b/test/testdrive/kafka-debezium-json-ticdc-sources.td
index 5c4d08396f1e3..1b5565b7799b6 100644
--- a/test/testdrive/kafka-debezium-json-ticdc-sources.td
+++ b/test/testdrive/kafka-debezium-json-ticdc-sources.td
@@ -46,6 +46,18 @@ $ kafka-ingest format=bytes key-format=bytes key-terminator=: topic=dbz-ticdc
 1 alice_updated
 2 bob
 
+# --- Verify
envelope column preserves CDC metadata --- +# The envelope column contains the full Debezium envelope including source metadata. +# This is critical for TiCDC users who need commit_ts to correlate with Dumpling snapshots. +> SELECT key, envelope->'source'->>'commit_ts' FROM dbz_ticdc_tbl ORDER BY key +1 1002 +2 1001 + +# ts_ms is also available +> SELECT key, envelope->>'ts_ms' FROM dbz_ticdc_tbl ORDER BY key +1 1609459200002 +2 1609459200001 + # --- Test delete --- $ kafka-ingest format=bytes key-format=bytes key-terminator=: topic=dbz-ticdc From 07de1d11a0cb29e5244e551a8202982cc5843113 Mon Sep 17 00:00:00 2001 From: Jon Currey Date: Fri, 6 Mar 2026 00:23:28 -0500 Subject: [PATCH 4/5] storage: Switch envelope column to opt-in INCLUDE DEBEZIUM METADATA Replace the always-on `envelope` column with an opt-in mechanism using `INCLUDE DEBEZIUM METADATA [AS alias]`, consistent with existing INCLUDE OFFSET/PARTITION patterns. Users who need CDC metadata (e.g., TiCDC commit_ts for Dumpling snapshot correlation) opt in explicitly: CREATE TABLE ... FROM SOURCE ... KEY FORMAT JSON VALUE FORMAT JSON INCLUDE DEBEZIUM METADATA AS dbz_meta ENVELOPE DEBEZIUM (MODE = 'TICDC') SELECT dbz_meta->'source'->>'commit_ts' FROM ... Default column name is `debezium_metadata` when no alias is given. Validation ensures INCLUDE DEBEZIUM METADATA is only used with ENVELOPE DEBEZIUM + VALUE FORMAT JSON. 
Co-Authored-By: Claude Opus 4.6 --- ...20260305_debezium_json_envelope_sources.md | 5 +- src/sql-parser/src/ast/defs/ddl.rs | 8 +++ src/sql-parser/src/parser.rs | 12 +++-- src/sql/src/plan/statement/ddl.rs | 40 ++++++++++++++- src/storage-types/src/sources/envelope.rs | 26 ++++++---- src/storage/src/render/sources.rs | 49 +++++++++++------- test/testdrive/kafka-debezium-json-sources.td | 51 +++++++++++++++++-- .../kafka-debezium-json-ticdc-sources.td | 9 ++-- 8 files changed, 158 insertions(+), 42 deletions(-) diff --git a/doc/developer/spec/20260305_debezium_json_envelope_sources.md b/doc/developer/spec/20260305_debezium_json_envelope_sources.md index 10099b55d2165..aef8ad4f745b2 100644 --- a/doc/developer/spec/20260305_debezium_json_envelope_sources.md +++ b/doc/developer/spec/20260305_debezium_json_envelope_sources.md @@ -47,14 +47,15 @@ Lift the Avro-only restriction on `ENVELOPE DEBEZIUM` to also accept `VALUE FORM - JSON Debezium envelope parsing (extract `before`/`after`/`op` from `payload`) - TiCDC dialect via `MODE` parameter (e.g., `ENVELOPE DEBEZIUM (MODE = 'TICDC')`) - Insert (`c`), update (`u`), delete (`d`) operation handling -- Output columns: `data` (jsonb, the "after" payload) and `envelope` (jsonb, the full Debezium envelope for CDC metadata access) +- Output columns: `key` (jsonb) and `data` (jsonb, the "after" payload) +- `INCLUDE DEBEZIUM METADATA [AS alias]` to expose the full Debezium envelope as an opt-in jsonb column for CDC metadata access (e.g., `commit_ts`, `ts_ms`, `source`) - Support both `payload`-wrapped and flat envelope formats - Error handling for malformed Debezium JSON messages **Out of Scope:** - Schema-aware column projection from JSON payloads - Keyless / upsert-from-payload sources -- `commit_ts`-based ordering from TiCDC source metadata (note: `commit_ts` is preserved in the `envelope` column for user queries, but not used for ordering) +- `commit_ts`-based ordering from TiCDC source metadata (note: `commit_ts` is preserved 
via `INCLUDE DEBEZIUM METADATA` for user queries, but not used for ordering)
 - Spatial type handling for TiCDC
 
 ## Context for Development
diff --git a/src/sql-parser/src/ast/defs/ddl.rs b/src/sql-parser/src/ast/defs/ddl.rs
index 1e8055a5096f1..b220000aeb802 100644
--- a/src/sql-parser/src/ast/defs/ddl.rs
+++ b/src/sql-parser/src/ast/defs/ddl.rs
@@ -560,6 +560,10 @@ pub enum SourceIncludeMetadata {
         alias: Ident,
         use_bytes: bool,
     },
+    /// `INCLUDE DEBEZIUM METADATA` — full Debezium envelope as jsonb
+    DebeziumMetadata {
+        alias: Option<Ident>,
+    },
 }
 
 impl AstDisplay for SourceIncludeMetadata {
@@ -605,6 +609,10 @@
                     f.write_str(" BYTES");
                 }
             }
+            SourceIncludeMetadata::DebeziumMetadata { alias } => {
+                f.write_str("DEBEZIUM METADATA");
+                print_alias(f, alias);
+            }
         }
     }
 }
diff --git a/src/sql-parser/src/parser.rs b/src/sql-parser/src/parser.rs
index 852d0ea91622d..0ff8ac0de6f31 100644
--- a/src/sql-parser/src/parser.rs
+++ b/src/sql-parser/src/parser.rs
@@ -4830,9 +4830,9 @@ impl<'a> Parser<'a> {
     fn parse_source_include_metadata(&mut self) -> Result<Vec<SourceIncludeMetadata>, ParserError> {
         if self.parse_keyword(INCLUDE) {
             self.parse_comma_separated(|parser| {
-                let metadata = match parser
-                    .expect_one_of_keywords(&[KEY, TIMESTAMP, PARTITION, OFFSET, HEADERS, HEADER])?
-                {
+                let metadata = match parser.expect_one_of_keywords(&[
+                    KEY, TIMESTAMP, PARTITION, OFFSET, HEADERS, HEADER, DEBEZIUM,
+                ])?
{ KEY => SourceIncludeMetadata::Key { alias: parser.parse_alias()?, }, @@ -4859,6 +4859,12 @@ impl<'a> Parser<'a> { use_bytes, } } + DEBEZIUM => { + parser.expect_keyword(METADATA)?; + SourceIncludeMetadata::DebeziumMetadata { + alias: parser.parse_alias()?, + } + } _ => unreachable!("only explicitly allowed items can be parsed"), }; Ok(metadata) diff --git a/src/sql/src/plan/statement/ddl.rs b/src/sql/src/plan/statement/ddl.rs index 516d607f45a42..0c6525cfc3a46 100644 --- a/src/sql/src/plan/statement/ddl.rs +++ b/src/sql/src/plan/statement/ddl.rs @@ -1275,6 +1275,10 @@ fn plan_kafka_source_connection( // handled below None } + SourceIncludeMetadata::DebeziumMetadata { .. } => { + // handled in apply_source_envelope_encoding + None + } }) .collect(); Ok(KafkaSourceConnection { @@ -1411,8 +1415,23 @@ fn apply_source_envelope_encoding( ast::DebeziumMode::TiCdc => DebeziumJsonMode::TiCdc, ast::DebeziumMode::None => DebeziumJsonMode::Generic, }; + // Extract INCLUDE DEBEZIUM METADATA column name if specified + let envelope_column = include_metadata + .iter() + .find_map(|m| match m { + SourceIncludeMetadata::DebeziumMetadata { alias } => { + Some(alias.as_ref().map_or_else( + || "debezium_metadata".to_string(), + |a| a.to_string(), + )) + } + _ => None, + }); UnplannedSourceEnvelope::Upsert { - style: UpsertStyle::DebeziumJson { mode: storage_mode }, + style: UpsertStyle::DebeziumJson { + mode: storage_mode, + envelope_column, + }, } } _ => { @@ -1483,6 +1502,21 @@ fn apply_source_envelope_encoding( } }; + // Validate INCLUDE DEBEZIUM METADATA is only used with ENVELOPE DEBEZIUM + JSON + let has_dbz_metadata = include_metadata + .iter() + .any(|m| matches!(m, SourceIncludeMetadata::DebeziumMetadata { .. })); + if has_dbz_metadata { + match &envelope { + UnplannedSourceEnvelope::Upsert { + style: UpsertStyle::DebeziumJson { .. 
},
+            } => {} // valid
+            _ => sql_bail!(
+                "INCLUDE DEBEZIUM METADATA requires ENVELOPE DEBEZIUM with VALUE FORMAT JSON"
+            ),
+        }
+    }
+
     let metadata_desc = included_column_desc(metadata_columns_desc);
 
     let (envelope, desc) = envelope.desc(key_desc, value_desc, metadata_desc)?;
@@ -1923,6 +1957,10 @@ pub fn plan_create_table_from_source(
                 // handled below
                 None
             }
+            SourceIncludeMetadata::DebeziumMetadata { .. } => {
+                // handled in apply_source_envelope_encoding
+                None
+            }
         })
         .collect();
 
diff --git a/src/storage-types/src/sources/envelope.rs b/src/storage-types/src/sources/envelope.rs
index ba07b605ab0e5..99c3b2e95030e 100644
--- a/src/storage-types/src/sources/envelope.rs
+++ b/src/storage-types/src/sources/envelope.rs
@@ -85,7 +85,12 @@ pub enum UpsertStyle {
     Debezium { after_idx: usize },
     /// `ENVELOPE DEBEZIUM` with JSON encoding. Envelope extraction happens at
     /// render time by parsing the Jsonb datum to extract `op` and `after` fields.
-    DebeziumJson { mode: DebeziumJsonMode },
+    /// If `envelope_column` is set (via `INCLUDE DEBEZIUM METADATA`), the full
+    /// Debezium envelope is exposed as an additional jsonb column.
+    DebeziumJson {
+        mode: DebeziumJsonMode,
+        envelope_column: Option<String>,
+    },
     /// `ENVELOPE UPSERT` where any decode errors will get serialized into a
     /// SqlScalarType::Record column named `error_column`, and all value columns are
     /// nullable. The key shape depends on the independent `KeyEnvelope`.
@@ -240,21 +245,24 @@ impl UnplannedSourceEnvelope {
                 )
             }
             UnplannedSourceEnvelope::Upsert {
-                style: UpsertStyle::DebeziumJson { .. },
+                style: UpsertStyle::DebeziumJson { envelope_column, .. },
                 ..
             } => {
-                // JSON Debezium: output is [key: Jsonb, data: Jsonb, envelope: Jsonb, ...metadata]
+                // JSON Debezium: output is [key: Jsonb, data: Jsonb, ...metadata]
                 // Key is included as a separate column (unlike Avro Debezium).
                 // `data` contains the `after` field (the row payload).
- // `envelope` contains the full Debezium envelope for CDC metadata - // access (e.g., source.commit_ts for TiCDC correlation). + // If INCLUDE DEBEZIUM METADATA is specified, an additional jsonb column + // contains the full Debezium envelope for CDC metadata access. let key_desc = RelationDesc::builder() .with_column("key", SqlScalarType::Jsonb.nullable(false)) .finish(); - let value_desc = RelationDesc::builder() - .with_column("data", SqlScalarType::Jsonb.nullable(false)) - .with_column("envelope", SqlScalarType::Jsonb.nullable(false)) - .finish(); + let mut value_builder = RelationDesc::builder() + .with_column("data", SqlScalarType::Jsonb.nullable(false)); + if let Some(col_name) = envelope_column { + value_builder = + value_builder.with_column(col_name.as_str(), SqlScalarType::Jsonb.nullable(false)); + } + let value_desc = value_builder.finish(); // key_indices = [0]: points to the key column for UpsertKey::from_value rehydration let key_indices = vec![0usize]; let desc = key_desc diff --git a/src/storage/src/render/sources.rs b/src/storage/src/render/sources.rs index b9bc14e556625..8685cdc94a900 100644 --- a/src/storage/src/render/sources.rs +++ b/src/storage/src/render/sources.rs @@ -552,14 +552,19 @@ fn upsert_commands( let value = match result.value { Some(Ok(ref row)) => match upsert_envelope.style { - UpsertStyle::DebeziumJson { ref mode } => { - match extract_debezium_json(row, mode) { + UpsertStyle::DebeziumJson { + ref mode, + ref envelope_column, + } => { + match extract_debezium_json(row, mode, envelope_column.is_some()) { Ok(Some((after_row, envelope_row))) => { let mut packer = row_buf.packer(); - // key, data (after), envelope (full), then metadata + // key, data (after), [envelope if requested], then metadata packer.extend_by_row(&key_row); packer.extend_by_row(&after_row); - packer.extend_by_row(&envelope_row); + if let Some(env_row) = envelope_row { + packer.extend_by_row(&env_row); + } packer.extend_by_row(&metadata); 
Some(Ok(row_buf.clone()))
                     }
@@ -638,16 +643,18 @@
 /// Handles both `payload`-wrapped (`{ "payload": { "op": ..., "after": ... } }`)
 /// and flat (`{ "op": ..., "after": ... }`) formats.
 ///
-/// Extracts the `after` payload and full envelope from a Debezium JSON message.
+/// Extracts the `after` payload (and optionally the full envelope) from a Debezium JSON message.
 ///
 /// Returns:
-/// - `Ok(Some((after_row, envelope_row)))` for insert/update/snapshot operations (op = c/u/r)
+/// - `Ok(Some((after_row, Some(envelope_row))))` when `include_envelope` is true
+/// - `Ok(Some((after_row, None)))` when `include_envelope` is false
 /// - `Ok(None)` for delete/truncate operations (op = d/t)
 /// - `Err(DecodeError)` for malformed messages
 fn extract_debezium_json(
     row: &Row,
     _mode: &DebeziumJsonMode,
-) -> Result<Option<(Row, Row)>, DecodeError> {
+    include_envelope: bool,
+) -> Result<Option<(Row, Option<Row>)>, DecodeError> {
     use mz_repr::adt::jsonb::JsonbRef;
 
     let datum = row.iter().next().unwrap();
@@ -713,18 +720,24 @@ fn extract_debezium_json(
         }
     })?;
 
-    // Preserve the full envelope as Jsonb for CDC metadata access
-    let envelope_json =
-        mz_repr::adt::jsonb::Jsonb::from_serde_json(envelope.clone()).map_err(|e| {
-            DecodeError {
-                kind: DecodeErrorKind::Text(
-                    format!("Failed to convert Debezium envelope to Jsonb: {}", e).into(),
-                ),
-                raw: vec![],
-            }
-        })?;
+    // Optionally preserve the full envelope as Jsonb for CDC metadata access
+    let envelope_row = if include_envelope {
+        let envelope_json =
+            mz_repr::adt::jsonb::Jsonb::from_serde_json(envelope.clone()).map_err(|e| {
+                DecodeError {
+                    kind: DecodeErrorKind::Text(
+                        format!("Failed to convert Debezium envelope to Jsonb: {}", e)
+                            .into(),
+                    ),
+                    raw: vec![],
+                }
+            })?;
+        Some(envelope_json.into_row())
+    } else {
+        None
+    };
 
-    Ok(Some((after_json.into_row(), envelope_json.into_row())))
+    Ok(Some((after_json.into_row(), envelope_row)))
 }
 other => Err(DecodeError {
     kind: DecodeErrorKind::Text(
diff --git
a/test/testdrive/kafka-debezium-json-sources.td b/test/testdrive/kafka-debezium-json-sources.td index 28dd0f84c9f5f..a8040d1f1e30e 100644 --- a/test/testdrive/kafka-debezium-json-sources.td +++ b/test/testdrive/kafka-debezium-json-sources.td @@ -39,16 +39,12 @@ $ kafka-ingest format=bytes key-format=bytes key-terminator=: topic=dbz-json ENVELOPE DEBEZIUM > COMMIT -# Output schema is [key: jsonb, data: jsonb, envelope: jsonb] +# Output schema is [key: jsonb, data: jsonb] # The last update for key=1 should win (lizard) # Use ->> to extract fields since JSON key order is non-deterministic > SELECT key, data->>'id', data->>'creature' FROM dbz_json_tbl 1 1 lizard -# The envelope column contains the full Debezium envelope -> SELECT envelope->>'op' FROM dbz_json_tbl -u - # --- Insert a second row --- $ kafka-ingest format=bytes key-format=bytes key-terminator=: topic=dbz-json @@ -158,3 +154,48 @@ contains:ENVELOPE [DEBEZIUM] UPSERT requires that KEY FORMAT be specified KEY FORMAT TEXT VALUE FORMAT TEXT ENVELOPE DEBEZIUM contains:ENVELOPE DEBEZIUM requires that VALUE FORMAT is set to AVRO or JSON + +# --- Test INCLUDE DEBEZIUM METADATA --- + +$ kafka-create-topic topic=dbz-json-dbzmeta partitions=1 + +$ kafka-ingest format=bytes key-format=bytes key-terminator=: topic=dbz-json-dbzmeta +1:{"before":null,"after":{"id":1,"val":"hello"},"op":"c","ts_ms":1000,"source":{"connector":"test"}} + +> BEGIN +> CREATE SOURCE dbz_json_dbzmeta_src + IN CLUSTER dbz_json_cluster + FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-dbz-json-dbzmeta-${testdrive.seed}'); + +> CREATE TABLE dbz_json_dbzmeta_tbl FROM SOURCE dbz_json_dbzmeta_src (REFERENCE "testdrive-dbz-json-dbzmeta-${testdrive.seed}") + KEY FORMAT JSON VALUE FORMAT JSON + INCLUDE DEBEZIUM METADATA AS dbz_envelope + ENVELOPE DEBEZIUM +> COMMIT + +> SELECT key, data->>'val', dbz_envelope->>'op', dbz_envelope->>'ts_ms' FROM dbz_json_dbzmeta_tbl +1 hello c 1000 + +# --- Test INCLUDE DEBEZIUM METADATA with default column name --- + 
+> BEGIN +> CREATE SOURCE dbz_json_dbzmeta_default_src + IN CLUSTER dbz_json_cluster + FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-dbz-json-dbzmeta-${testdrive.seed}'); + +> CREATE TABLE dbz_json_dbzmeta_default_tbl FROM SOURCE dbz_json_dbzmeta_default_src (REFERENCE "testdrive-dbz-json-dbzmeta-${testdrive.seed}") + KEY FORMAT JSON VALUE FORMAT JSON + INCLUDE DEBEZIUM METADATA + ENVELOPE DEBEZIUM +> COMMIT + +> SELECT key, debezium_metadata->>'op' FROM dbz_json_dbzmeta_default_tbl +1 c + +# --- Error case: INCLUDE DEBEZIUM METADATA without ENVELOPE DEBEZIUM --- + +! CREATE TABLE dbz_json_nokey_tbl FROM SOURCE dbz_json_nokey_src (REFERENCE "testdrive-dbz-json-nokey-${testdrive.seed}") + KEY FORMAT JSON VALUE FORMAT JSON + INCLUDE DEBEZIUM METADATA + ENVELOPE UPSERT +contains:INCLUDE DEBEZIUM METADATA requires ENVELOPE DEBEZIUM with VALUE FORMAT JSON diff --git a/test/testdrive/kafka-debezium-json-ticdc-sources.td b/test/testdrive/kafka-debezium-json-ticdc-sources.td index 1b5565b7799b6..b870bdab963f9 100644 --- a/test/testdrive/kafka-debezium-json-ticdc-sources.td +++ b/test/testdrive/kafka-debezium-json-ticdc-sources.td @@ -39,6 +39,7 @@ $ kafka-ingest format=bytes key-format=bytes key-terminator=: topic=dbz-ticdc > CREATE TABLE dbz_ticdc_tbl FROM SOURCE dbz_ticdc_src (REFERENCE "testdrive-dbz-ticdc-${testdrive.seed}") KEY FORMAT JSON VALUE FORMAT JSON + INCLUDE DEBEZIUM METADATA AS dbz_meta ENVELOPE DEBEZIUM (MODE = 'TICDC') > COMMIT @@ -46,15 +47,15 @@ $ kafka-ingest format=bytes key-format=bytes key-terminator=: topic=dbz-ticdc 1 alice_updated 2 bob -# --- Verify envelope column preserves CDC metadata --- -# The envelope column contains the full Debezium envelope including source metadata. +# --- Verify INCLUDE DEBEZIUM METADATA preserves CDC metadata --- +# The dbz_meta column contains the full Debezium envelope including source metadata. # This is critical for TiCDC users who need commit_ts to correlate with Dumpling snapshots. 
-> SELECT key, envelope->'source'->>'commit_ts' FROM dbz_ticdc_tbl ORDER BY key +> SELECT key, dbz_meta->'source'->>'commit_ts' FROM dbz_ticdc_tbl ORDER BY key 1 1002 2 1001 # ts_ms is also available -> SELECT key, envelope->>'ts_ms' FROM dbz_ticdc_tbl ORDER BY key +> SELECT key, dbz_meta->>'ts_ms' FROM dbz_ticdc_tbl ORDER BY key 1 1609459200002 2 1609459200001 From 0d3f24c67a576c02b0b6a94a93d94c38ff953e6c Mon Sep 17 00:00:00 2001 From: Jon Currey Date: Fri, 6 Mar 2026 09:12:56 -0500 Subject: [PATCH 5/5] doc: Update spec with INCLUDE DEBEZIUM METADATA design decision Add DD-18 documenting the opt-in envelope metadata approach via INCLUDE DEBEZIUM METADATA [AS alias]. Update Task 2b (parser), Task 3 (UpsertStyle variant), Task 4 (descriptor), Task 5 (planner), Task 6 (render), and acceptance criteria (AC-7, AC-18 through AC-20). Co-Authored-By: Claude Opus 4.6 --- ...20260305_debezium_json_envelope_sources.md | 84 ++++++++++++------- 1 file changed, 55 insertions(+), 29 deletions(-) diff --git a/doc/developer/spec/20260305_debezium_json_envelope_sources.md b/doc/developer/spec/20260305_debezium_json_envelope_sources.md index aef8ad4f745b2..20ae5dee70764 100644 --- a/doc/developer/spec/20260305_debezium_json_envelope_sources.md +++ b/doc/developer/spec/20260305_debezium_json_envelope_sources.md @@ -136,7 +136,7 @@ For JSON + Debezium, the JSON decoder still produces a single Jsonb datum contai - **Rationale:** Keeps all envelope extraction logic co-located. The JSON path parses at render time (slightly different from Avro's index-based approach) but the upsert semantics (key + value → insert/update/delete) remain identical. **DD-9: Include key as separate output column (resolves key_indices correctness) [F1, F3, F5, F6]** -- **Decision:** Unlike Avro Debezium (which embeds key fields inside the unpacked `after` Record), JSON Debezium includes the key as a separate `key: Jsonb` column prepended to the output row. 
The output schema is `[key: Jsonb, data: Jsonb, ...metadata]`. `key_indices` points to position `[0]` (the key column). This is necessary because: (a) `upsert_core` uses `UpsertKey::from_value(value_ref, &key_indices)` to reconstruct keys from persisted values during rehydration — empty/wrong `key_indices` would cause data corruption after restart; (b) the `data: Jsonb` column is opaque so `match_key_indices` cannot find named key fields within it; (c) users need access to key fields since they can't be extracted from the opaque `after` payload without knowing the schema. +- **Decision:** Unlike Avro Debezium (which embeds key fields inside the unpacked `after` Record), JSON Debezium includes the key as a separate `key: Jsonb` column prepended to the output row. The base output schema is `[key: Jsonb, data: Jsonb, ...metadata]`. `key_indices` points to position `[0]` (the key column). This is necessary because: (a) `upsert_core` uses `UpsertKey::from_value(value_ref, &key_indices)` to reconstruct keys from persisted values during rehydration — empty/wrong `key_indices` would cause data corruption after restart; (b) the `data: Jsonb` column is opaque so `match_key_indices` cannot find named key fields within it; (c) users need access to key fields since they can't be extracted from the opaque `after` payload without knowing the schema. - **Consequence:** `INCLUDE KEY` rejection logic (which fires for Avro Debezium because "Debezium values include all keys") must NOT fire for JSON Debezium — the key is always included as a separate column. Update the validation at `ddl.rs:1372-1384` accordingly. - **To revisit:** Whether the key column should be named `key` (single Jsonb blob) or flattened into multiple columns if the key JSON has multiple top-level fields. 
@@ -174,6 +174,13 @@ For JSON + Debezium, the JSON decoder still produces a single Jsonb datum contai - **Known constraint:** Users must ensure same-key events are routed to the same partition (standard Kafka key-based partitioning) for correct ordering. - **To revisit:** Whether `commit_ts`-based ordering (DD-3) could resolve cross-partition ordering for TiCDC. +**DD-18: Opt-in envelope metadata via INCLUDE DEBEZIUM METADATA** +- **Decision:** The full Debezium envelope (including `op`, `source`, `ts_ms`, `before`, `after`, `transaction`, etc.) is exposed as an opt-in jsonb column via `INCLUDE DEBEZIUM METADATA [AS alias]`. Default column name is `debezium_metadata` when no alias is given. +- **Rationale:** Users need CDC metadata like `commit_ts` (for TiCDC/Dumpling snapshot correlation) and `ts_ms`. Rather than always including this data (which duplicates the `after` payload and wastes storage), the `INCLUDE` mechanism follows the existing pattern of `INCLUDE OFFSET`, `INCLUDE PARTITION`, etc. — opt-in, user-named columns for metadata. +- **Alternatives considered:** (A) Granular `INCLUDE DEBEZIUM SOURCE`, `INCLUDE DEBEZIUM TIMESTAMP` — more parser work for questionable benefit since users can use `->` on a single jsonb column. (B) Magic column names like `__debezium_source` — implicit and surprising. (C) Always-on `envelope` column — wastes storage when not needed, inconsistent with INCLUDE pattern. +- **Validation:** `INCLUDE DEBEZIUM METADATA` is only valid with `ENVELOPE DEBEZIUM` + `VALUE FORMAT JSON`. Using it with other envelopes produces a planning error. +- **Implementation:** The column name is stored in `UpsertStyle::DebeziumJson { envelope_column: Option<String> }`. At render time, `extract_debezium_json()` conditionally serializes the full envelope only when `envelope_column.is_some()`.
+ **DD-17: Pseudocode is illustrative, not compilable [F7, F13]** - **Decision:** All pseudocode in this spec (particularly `extract_debezium_json` and the `upsert_commands` match arm) is illustrative of the intended logic, not compilable Rust. Implementation must handle Rust-specific concerns: (a) `Datum::Jsonb` wraps a borrowed `JsonbRef<'a>` — extracting and re-packing requires allocating into a `Row` via `RowPacker`; (b) the nested match structure in Task 5 should be simplified during implementation; (c) error types must use the concrete `DecodeErrorKind` variants available in the codebase. - **Rationale:** Spec pseudocode communicates intent. The implementing developer is expected to adapt to Rust's ownership/lifetime model. @@ -219,6 +226,11 @@ For JSON + Debezium, the JSON decoder still produces a single Jsonb datum contai ``` - Notes: The `MODE` keyword may need to be added to the keyword list if not already present. +- [ ] **Task 2b: Add INCLUDE DEBEZIUM METADATA to parser** + - File: `src/sql-parser/src/parser.rs`, `src/sql-parser/src/ast/defs/ddl.rs` + - Action: Add `DebeziumMetadata { alias: Option<Ident> }` variant to `SourceIncludeMetadata` enum. In `parse_source_include_metadata()`, add `DEBEZIUM` to the keyword list; when matched, expect `METADATA` keyword, then parse optional alias. Update `AstDisplay` for the new variant. + - Notes: The two-keyword sequence `DEBEZIUM METADATA` avoids ambiguity with the existing `DEBEZIUM` keyword used in envelope parsing.
+ - [ ] **Task 3: Add UpsertStyle::DebeziumJson variant** - File: `src/storage-types/src/sources/envelope.rs` - Action: Add a new variant to `UpsertStyle`: @@ -226,7 +238,7 @@ For JSON + Debezium, the JSON decoder still produces a single Jsonb datum contai pub enum UpsertStyle { Default(KeyEnvelope), Debezium { after_idx: usize }, - DebeziumJson { mode: DebeziumJsonMode }, + DebeziumJson { mode: DebeziumJsonMode, envelope_column: Option<String> }, ValueErrInline { key_envelope: KeyEnvelope, error_column: String }, } ``` @@ -234,24 +246,29 @@ For JSON + Debezium, the JSON decoder still produces a single Jsonb datum contai ```rust pub enum DebeziumJsonMode { Generic, TiCdc } ``` + - Notes: `envelope_column` is `Some(name)` when `INCLUDE DEBEZIUM METADATA [AS name]` is specified, `None` otherwise. See DD-18. - Notes: Must derive `Clone, Debug, Serialize, Deserialize, Eq, PartialEq` to match existing variants. - [ ] **Task 4: Add UnplannedSourceEnvelope descriptor transform for DebeziumJson** - File: `src/storage-types/src/sources/envelope.rs` - - Action: In the `UnplannedSourceEnvelope` match that builds the output `RelationDesc` (line ~230), add a new arm for `UpsertStyle::DebeziumJson`. Per DD-9, the output schema is `[key: Jsonb, data: Jsonb, envelope: Jsonb, ...metadata]`: + - Action: In the `UnplannedSourceEnvelope` match that builds the output `RelationDesc` (line ~230), add a new arm for `UpsertStyle::DebeziumJson`. Per DD-9, the base output schema is `[key: Jsonb, data: Jsonb, ...metadata]`. If `INCLUDE DEBEZIUM METADATA` is specified, an additional jsonb column is added between `data` and metadata: ```rust UnplannedSourceEnvelope::Upsert { - style: UpsertStyle::DebeziumJson { .. }, + style: UpsertStyle::DebeziumJson { envelope_column, ..
}, } => { - // Key is included as a separate column (unlike Avro Debezium where keys are in the after record) - let mut desc = RelationDesc::builder() + let key_desc = RelationDesc::builder() .with_column("key", ScalarType::Jsonb.nullable(false)) - .with_column("data", ScalarType::Jsonb.nullable(false)) .finish(); - // key_indices = [0] — points to the key column for UpsertKey::from_value rehydration + let mut value_builder = RelationDesc::builder() + .with_column("data", ScalarType::Jsonb.nullable(false)); + if let Some(col_name) = envelope_column { + value_builder = value_builder + .with_column(col_name, ScalarType::Jsonb.nullable(false)); + } + let value_desc = value_builder.finish(); let key_indices = vec![0usize]; - desc = desc.with_key(key_indices.clone()); - let desc = desc.concat(metadata_desc); + let desc = key_desc.with_key(key_indices.clone()) + .concat(value_desc).concat(metadata_desc); ( self.into_source_envelope(Some(key_indices), None, Some(desc.arity())), desc, @@ -296,21 +313,22 @@ For JSON + Debezium, the JSON decoder still produces a single Jsonb datum contai ``` - Action: Update all other match arms referencing `ast::SourceEnvelope::Debezium` to use `ast::SourceEnvelope::Debezium { .. }` pattern (lines 810, 1372-1384, and any others found by the compiler). - Action: Per DD-9, update `INCLUDE KEY` rejection logic at `ddl.rs:1372-1384` — the "Cannot use INCLUDE KEY with ENVELOPE DEBEZIUM" error must NOT fire when `DataEncoding::Json` is used. For JSON Debezium, key is always included as a separate column. + - Action: Extract `INCLUDE DEBEZIUM METADATA` from `include_metadata`, pass column name to `UpsertStyle::DebeziumJson { envelope_column }`. Filter `DebeziumMetadata` items out of Kafka metadata processing (they're not Kafka metadata). Add validation: `INCLUDE DEBEZIUM METADATA` requires `ENVELOPE DEBEZIUM` + `VALUE FORMAT JSON`. See DD-18. - Notes: The error message changes from "AVRO" to "AVRO or JSON". 
KEY FORMAT validation at line 2247 already covers Debezium and requires no changes. - [ ] **Task 6: Implement DebeziumJson extraction in upsert_commands** - File: `src/storage/src/render/sources.rs` - Action: Add a new match arm in `upsert_commands()` alongside the existing `UpsertStyle::Debezium { after_idx }` arm (line ~549): ```rust - UpsertStyle::DebeziumJson { mode } => { - // The row contains a single Jsonb datum with the full Debezium envelope - let jsonb_datum = row.iter().next().unwrap(); - match extract_debezium_json(jsonb_datum, mode) { - Ok(Some(after_jsonb_row)) => { - // Per DD-9: prepend key, then after payload, then metadata + UpsertStyle::DebeziumJson { ref mode, ref envelope_column } => { + match extract_debezium_json(row, mode, envelope_column.is_some()) { + Ok(Some((after_row, envelope_row))) => { let mut packer = row_buf.packer(); - packer.extend(key_row.iter()); // key at position 0 - packer.extend(after_jsonb_row.iter()); // data at position 1 + packer.extend_by_row(&key_row); // key at position 0 + packer.extend_by_row(&after_row); // data at position 1 + if let Some(env_row) = envelope_row { // envelope (opt-in via DD-18) + packer.extend_by_row(&env_row); + } packer.extend_by_row(&metadata); Some(Ok(row_buf.clone())) } @@ -319,23 +337,25 @@ For JSON + Debezium, the JSON decoder still produces a single Jsonb datum contai } } ``` - - Action: Implement `extract_debezium_json()` helper function (in same file or in `src/interchange/src/json.rs`). Per DD-12/DD-13/DD-17, pseudocode is illustrative: + - Action: Implement `extract_debezium_json()` helper function (in same file or in `src/interchange/src/json.rs`). Per DD-12/DD-13/DD-17/DD-18, pseudocode is illustrative: ```rust fn extract_debezium_json( - datum: Datum<'_>, + row: &Row, mode: &DebeziumJsonMode, - ) -> Result<Option<Row>, DecodeErrorKind> { - // 1. Convert Jsonb datum to serde_json::Value (requires Row allocation per DD-17) + include_envelope: bool, + ) -> Result<Option<(Row, Option<Row>)>, DecodeError> { + // 1.
Convert Jsonb datum to serde_json::Value via JsonbRef::from_datum() // 2. Check for "payload" wrapper; if present, unwrap // 3. Extract "op" field: // - "c" or "r" (per DD-12: snapshot reads treated as inserts) → extract after // - "u" → extract after // - "d" → return Ok(None) // - "t" → log warning, return Ok(None) per DD-13 - // - missing/other → return Err(DecodeErrorKind) + // - missing/other → return Err(DecodeError) // 4. Extract "after" field → must be non-null for c/r/u, else Err // 5. Serialize "after" back into a Row containing a single Jsonb datum - // 6. Return Ok(Some(row)) + // 6. If include_envelope, serialize full envelope into a separate Row (DD-18) + // 7. Return Ok(Some((after_row, envelope_row))) } ``` - Action: Also update the key_row match arm (line ~517) to include `UpsertStyle::DebeziumJson { .. }` alongside `UpsertStyle::Debezium { .. }` for key handling. @@ -373,7 +393,7 @@ For JSON + Debezium, the JSON decoder still produces a single Jsonb datum contai - [ ] AC-6: Given a Debezium JSON message with `"op": "r"` (snapshot/read), when ingested, then it is treated as an insert (same as `"c"`). [DD-12] **TiCDC Mode:** -- [ ] AC-7: Given `ENVELOPE DEBEZIUM (MODE = 'TICDC')` in the CREATE SOURCE statement, when a TiCDC-formatted JSON message is published, then the source processes it correctly. The full Debezium envelope (including `commit_ts`, `cluster_id`, and other TiCDC metadata) is preserved in the `envelope` column for user queries (e.g., `envelope->'source'->>'commit_ts'`). +- [ ] AC-7: Given `ENVELOPE DEBEZIUM (MODE = 'TICDC')` with `INCLUDE DEBEZIUM METADATA AS dbz_meta` in the CREATE TABLE statement, when a TiCDC-formatted JSON message is published, then the source processes it correctly. The full Debezium envelope (including `commit_ts`, `cluster_id`, and other TiCDC metadata) is preserved in the `dbz_meta` column for user queries (e.g., `dbz_meta->'source'->>'commit_ts'`). 
**Error Handling:** - [ ] AC-8: Given a JSON message with a missing `"op"` field, when ingested, then a decode error is surfaced through the source's error collection (not silently dropped). @@ -382,6 +402,11 @@ For JSON + Debezium, the JSON decoder still produces a single Jsonb datum contai - [ ] AC-11: Given a Debezium JSON message with `"op": "t"` (truncate), when ingested, then it is silently skipped (not errored). [DD-13] - [ ] AC-12: Given a null Kafka message value (tombstone), when ingested, then it is treated as a delete for the given key. [DD-14] +**INCLUDE DEBEZIUM METADATA:** +- [ ] AC-18: Given `INCLUDE DEBEZIUM METADATA AS dbz_envelope` with `KEY FORMAT JSON VALUE FORMAT JSON ENVELOPE DEBEZIUM`, when a message with `source`, `ts_ms`, and `op` fields is published, then the `dbz_envelope` column contains the full Debezium envelope as jsonb. +- [ ] AC-19: Given `INCLUDE DEBEZIUM METADATA` (no alias) with `ENVELOPE DEBEZIUM`, when the table is created, then the column is named `debezium_metadata`. +- [ ] AC-20: Given `INCLUDE DEBEZIUM METADATA` with `ENVELOPE UPSERT` (not DEBEZIUM), when CREATE TABLE is executed, then it fails with an error requiring ENVELOPE DEBEZIUM with VALUE FORMAT JSON. + **SQL Validation:** - [ ] AC-13: Given `VALUE FORMAT JSON, ENVELOPE DEBEZIUM` without `KEY FORMAT`, when CREATE SOURCE is executed, then it fails with an error requiring KEY FORMAT. - [ ] AC-14: Given `VALUE FORMAT TEXT, ENVELOPE DEBEZIUM`, when CREATE SOURCE is executed, then it fails with an error requiring AVRO or JSON format. 
@@ -404,8 +429,8 @@ For JSON + Debezium, the JSON decoder still produces a single Jsonb datum contai ### Testing Strategy **Testdrive (integration):** -- `test/testdrive/kafka-debezium-json-sources.td` — primary test file covering generic Debezium JSON: insert, update, delete, payload-wrapped, flat, compound key, error cases -- `test/testdrive/kafka-debezium-json-ticdc-sources.td` — TiCDC dialect mode tests +- `test/testdrive/kafka-debezium-json-sources.td` — primary test file covering generic Debezium JSON: insert, update, delete, payload-wrapped, flat, INCLUDE OFFSET, INCLUDE DEBEZIUM METADATA (with alias and default name), error cases +- `test/testdrive/kafka-debezium-json-ticdc-sources.td` — TiCDC dialect mode tests with INCLUDE DEBEZIUM METADATA to verify `commit_ts` access **Regression:** - Run existing `test/testdrive/kafka-upsert-debezium-sources.td` and related Avro Debezium tests to ensure no breakage from the `SourceEnvelope::Debezium` AST change @@ -419,14 +444,15 @@ For JSON + Debezium, the JSON decoder still produces a single Jsonb datum contai ### Notes -- All identified risks are captured as design decisions DD-9 through DD-17. +- All identified risks are captured as design decisions DD-9 through DD-18. - TiCDC MODE is parsed and stored but V1 behavior is identical to generic mode (DD-15). - Multi-partition ordering is a known constraint documented in DD-16. +- CDC metadata (e.g., `commit_ts`, `ts_ms`) is accessible via opt-in `INCLUDE DEBEZIUM METADATA` (DD-18). **Future considerations (out of scope):** - Schema-aware column projection (`CREATE SOURCE ... 
(id int, name text) FORMAT JSON ENVELOPE DEBEZIUM`) - Keyless upsert where key is derived from the Debezium payload -- `commit_ts`-based ordering for TiCDC (the field is already preserved in the `envelope` column; future work is using it for message ordering) +- `commit_ts`-based ordering for TiCDC (the field is already preserved via `INCLUDE DEBEZIUM METADATA`; future work is using it for message ordering) - Additional MODE values: Maxwell, Canal, etc. - Truncate operation support (DD-13)