Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
492 changes: 492 additions & 0 deletions doc/developer/spec/20260305_debezium_json_envelope_sources.md

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion src/catalog/src/memory/objects.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1033,7 +1033,8 @@ impl DataSourceDesc {
SourceEnvelope::None(_) => "none",
SourceEnvelope::Upsert(upsert_envelope) => match upsert_envelope.style {
mz_storage_types::sources::envelope::UpsertStyle::Default(_) => "upsert",
mz_storage_types::sources::envelope::UpsertStyle::Debezium { .. } => {
mz_storage_types::sources::envelope::UpsertStyle::Debezium { .. }
| mz_storage_types::sources::envelope::UpsertStyle::DebeziumJson { .. } => {
// NOTE(aljoscha): Should we somehow mark that this is
// using upsert internally? See note above about
// DEBEZIUM.
Expand Down
28 changes: 25 additions & 3 deletions src/sql-parser/src/ast/defs/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -560,6 +560,10 @@ pub enum SourceIncludeMetadata {
alias: Ident,
use_bytes: bool,
},
/// `INCLUDE DEBEZIUM METADATA` — full Debezium envelope as jsonb
DebeziumMetadata {
alias: Option<Ident>,
},
}

impl AstDisplay for SourceIncludeMetadata {
Expand Down Expand Up @@ -605,6 +609,10 @@ impl AstDisplay for SourceIncludeMetadata {
f.write_str(" BYTES");
}
}
SourceIncludeMetadata::DebeziumMetadata { alias } => {
f.write_str("DEBEZIUM METADATA");
print_alias(f, alias);
}
}
}
}
Expand Down Expand Up @@ -633,10 +641,20 @@ impl AstDisplay for SourceErrorPolicy {
}
impl_display!(SourceErrorPolicy);

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum DebeziumMode {
/// Generic Debezium JSON/Avro format.
None,
/// TiCDC dialect of Debezium format.
TiCdc,
}

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum SourceEnvelope {
None,
Debezium,
Debezium {
mode: DebeziumMode,
},
Upsert {
value_decode_err_policy: Vec<SourceErrorPolicy>,
},
Expand All @@ -649,7 +667,7 @@ impl SourceEnvelope {
pub fn requires_all_input(&self) -> bool {
match self {
SourceEnvelope::None => false,
SourceEnvelope::Debezium => false,
SourceEnvelope::Debezium { .. } => false,
SourceEnvelope::Upsert { .. } => false,
SourceEnvelope::CdcV2 => true,
}
Expand All @@ -663,8 +681,12 @@ impl AstDisplay for SourceEnvelope {
// this is unreachable as long as the default is None, but include it in case we ever change that
f.write_str("NONE");
}
Self::Debezium => {
Self::Debezium { mode } => {
f.write_str("DEBEZIUM");
match mode {
DebeziumMode::None => {}
DebeziumMode::TiCdc => f.write_str(" (MODE = 'TICDC')"),
}
}
Self::Upsert {
value_decode_err_policy,
Expand Down
28 changes: 24 additions & 4 deletions src/sql-parser/src/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2363,7 +2363,21 @@ impl<'a> Parser<'a> {
let envelope = if self.parse_keyword(NONE) {
SourceEnvelope::None
} else if self.parse_keyword(DEBEZIUM) {
SourceEnvelope::Debezium
let mode = if self.consume_token(&Token::LParen) {
self.expect_keyword(MODE)?;
self.expect_token(&Token::Eq)?;
let mode_str = self.parse_literal_string()?;
self.expect_token(&Token::RParen)?;
match mode_str.to_uppercase().as_str() {
"TICDC" => DebeziumMode::TiCdc,
_ => {
return self.expected(self.peek_pos(), "TICDC", self.peek_token());
}
}
} else {
DebeziumMode::None
};
SourceEnvelope::Debezium { mode }
} else if self.parse_keyword(UPSERT) {
let value_decode_err_policy = if self.consume_token(&Token::LParen) {
// We only support the `VALUE DECODING ERRORS` option for now, but if we add another
Expand Down Expand Up @@ -4816,9 +4830,9 @@ impl<'a> Parser<'a> {
fn parse_source_include_metadata(&mut self) -> Result<Vec<SourceIncludeMetadata>, ParserError> {
if self.parse_keyword(INCLUDE) {
self.parse_comma_separated(|parser| {
let metadata = match parser
.expect_one_of_keywords(&[KEY, TIMESTAMP, PARTITION, OFFSET, HEADERS, HEADER])?
{
let metadata = match parser.expect_one_of_keywords(&[
KEY, TIMESTAMP, PARTITION, OFFSET, HEADERS, HEADER, DEBEZIUM,
])? {
KEY => SourceIncludeMetadata::Key {
alias: parser.parse_alias()?,
},
Expand All @@ -4845,6 +4859,12 @@ impl<'a> Parser<'a> {
use_bytes,
}
}
DEBEZIUM => {
parser.expect_keyword(METADATA)?;
SourceIncludeMetadata::DebeziumMetadata {
alias: parser.parse_alias()?,
}
}
_ => unreachable!("only explicitly allowed items can be parsed"),
};
Ok(metadata)
Expand Down
18 changes: 9 additions & 9 deletions src/sql-parser/tests/testdata/ddl
Original file line number Diff line number Diff line change
Expand Up @@ -2511,38 +2511,38 @@ CREATE SOURCE src1 FROM KAFKA CONNECTION conn1 (TOPIC 'baz') FORMAT AVRO USING C
----
CREATE SOURCE src1 FROM KAFKA CONNECTION conn1 (TOPIC = 'baz') FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION conn2 ENVELOPE DEBEZIUM
=>
CreateSource(CreateSourceStatement { name: UnresolvedItemName([Ident("src1")]), in_cluster: None, col_names: [], connection: Kafka { connection: Name(UnresolvedItemName([Ident("conn1")])), options: [KafkaSourceConfigOption { name: Topic, value: Some(Value(String("baz"))) }] }, include_metadata: [], format: Some(Bare(Avro(Csr { csr_connection: CsrConnectionAvro { connection: CsrConnection { connection: Name(UnresolvedItemName([Ident("conn2")])), options: [] }, key_strategy: None, value_strategy: None, seed: None } }))), envelope: Some(Debezium), if_not_exists: false, key_constraint: None, with_options: [], external_references: None, progress_subsource: None })
CreateSource(CreateSourceStatement { name: UnresolvedItemName([Ident("src1")]), in_cluster: None, col_names: [], connection: Kafka { connection: Name(UnresolvedItemName([Ident("conn1")])), options: [KafkaSourceConfigOption { name: Topic, value: Some(Value(String("baz"))) }] }, include_metadata: [], format: Some(Bare(Avro(Csr { csr_connection: CsrConnectionAvro { connection: CsrConnection { connection: Name(UnresolvedItemName([Ident("conn2")])), options: [] }, key_strategy: None, value_strategy: None, seed: None } }))), envelope: Some(Debezium { mode: None }), if_not_exists: false, key_constraint: None, with_options: [], external_references: None, progress_subsource: None })


parse-statement
CREATE SOURCE src1 FROM KAFKA CONNECTION conn1 (TOPIC 'baz') FORMAT PROTOBUF USING CONFLUENT SCHEMA REGISTRY CONNECTION conn2 ENVELOPE DEBEZIUM
----
CREATE SOURCE src1 FROM KAFKA CONNECTION conn1 (TOPIC = 'baz') FORMAT PROTOBUF USING CONFLUENT SCHEMA REGISTRY CONNECTION conn2 ENVELOPE DEBEZIUM
=>
CreateSource(CreateSourceStatement { name: UnresolvedItemName([Ident("src1")]), in_cluster: None, col_names: [], connection: Kafka { connection: Name(UnresolvedItemName([Ident("conn1")])), options: [KafkaSourceConfigOption { name: Topic, value: Some(Value(String("baz"))) }] }, include_metadata: [], format: Some(Bare(Protobuf(Csr { csr_connection: CsrConnectionProtobuf { connection: CsrConnection { connection: Name(UnresolvedItemName([Ident("conn2")])), options: [] }, seed: None } }))), envelope: Some(Debezium), if_not_exists: false, key_constraint: None, with_options: [], external_references: None, progress_subsource: None })
CreateSource(CreateSourceStatement { name: UnresolvedItemName([Ident("src1")]), in_cluster: None, col_names: [], connection: Kafka { connection: Name(UnresolvedItemName([Ident("conn1")])), options: [KafkaSourceConfigOption { name: Topic, value: Some(Value(String("baz"))) }] }, include_metadata: [], format: Some(Bare(Protobuf(Csr { csr_connection: CsrConnectionProtobuf { connection: CsrConnection { connection: Name(UnresolvedItemName([Ident("conn2")])), options: [] }, seed: None } }))), envelope: Some(Debezium { mode: None }), if_not_exists: false, key_constraint: None, with_options: [], external_references: None, progress_subsource: None })


parse-statement
CREATE SOURCE src1 FROM KAFKA CONNECTION conn1 (TOPIC 'baz') ENVELOPE DEBEZIUM (TRANSACTION METADATA (SOURCE a.b.c, COLLECTION 'foo'))
----
error: Expected end of statement, found left parenthesis
error: Expected MODE, found TRANSACTION
CREATE SOURCE src1 FROM KAFKA CONNECTION conn1 (TOPIC 'baz') ENVELOPE DEBEZIUM (TRANSACTION METADATA (SOURCE a.b.c, COLLECTION 'foo'))
^
^

parse-statement
CREATE SOURCE src1 FROM KAFKA CONNECTION conn1 (TOPIC 'baz') ENVELOPE DEBEZIUM (TRANSACTION METADATA (COLLECTION 'foo', SOURCE a.b.c))
----
error: Expected end of statement, found left parenthesis
error: Expected MODE, found TRANSACTION
CREATE SOURCE src1 FROM KAFKA CONNECTION conn1 (TOPIC 'baz') ENVELOPE DEBEZIUM (TRANSACTION METADATA (COLLECTION 'foo', SOURCE a.b.c))
^
^

# Note that this will error in planning, as you cannot specify START OFFSET and START TIMESTAMP at the same time
parse-statement
CREATE SOURCE src1 FROM KAFKA CONNECTION conn1 (START OFFSET=1, START TIMESTAMP=2, TOPIC 'baz') ENVELOPE DEBEZIUM
----
CREATE SOURCE src1 FROM KAFKA CONNECTION conn1 (START OFFSET = 1, START TIMESTAMP = 2, TOPIC = 'baz') ENVELOPE DEBEZIUM
=>
CreateSource(CreateSourceStatement { name: UnresolvedItemName([Ident("src1")]), in_cluster: None, col_names: [], connection: Kafka { connection: Name(UnresolvedItemName([Ident("conn1")])), options: [KafkaSourceConfigOption { name: StartOffset, value: Some(Value(Number("1"))) }, KafkaSourceConfigOption { name: StartTimestamp, value: Some(Value(Number("2"))) }, KafkaSourceConfigOption { name: Topic, value: Some(Value(String("baz"))) }] }, include_metadata: [], format: None, envelope: Some(Debezium), if_not_exists: false, key_constraint: None, with_options: [], external_references: None, progress_subsource: None })
CreateSource(CreateSourceStatement { name: UnresolvedItemName([Ident("src1")]), in_cluster: None, col_names: [], connection: Kafka { connection: Name(UnresolvedItemName([Ident("conn1")])), options: [KafkaSourceConfigOption { name: StartOffset, value: Some(Value(Number("1"))) }, KafkaSourceConfigOption { name: StartTimestamp, value: Some(Value(Number("2"))) }, KafkaSourceConfigOption { name: Topic, value: Some(Value(String("baz"))) }] }, include_metadata: [], format: None, envelope: Some(Debezium { mode: None }), if_not_exists: false, key_constraint: None, with_options: [], external_references: None, progress_subsource: None })

# Note that this will error in planning, as START OFFSET must be an array of nums
parse-statement
Expand Down Expand Up @@ -2585,15 +2585,15 @@ CREATE SOURCE src1 FROM KAFKA CONNECTION conn1 (TOPIC 'baz') FORMAT PROTOBUF USI
----
CREATE SOURCE src1 FROM KAFKA CONNECTION conn1 (TOPIC = 'baz') FORMAT PROTOBUF USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn SEED VALUE SCHEMA '{"some": "seed"}' MESSAGE 'Batch' ENVELOPE DEBEZIUM
=>
CreateSource(CreateSourceStatement { name: UnresolvedItemName([Ident("src1")]), in_cluster: None, col_names: [], connection: Kafka { connection: Name(UnresolvedItemName([Ident("conn1")])), options: [KafkaSourceConfigOption { name: Topic, value: Some(Value(String("baz"))) }] }, include_metadata: [], format: Some(Bare(Protobuf(Csr { csr_connection: CsrConnectionProtobuf { connection: CsrConnection { connection: Name(UnresolvedItemName([Ident("csr_conn")])), options: [] }, seed: Some(CsrSeedProtobuf { key: None, value: CsrSeedProtobufSchema { schema: "{\"some\": \"seed\"}", message_name: "Batch" } }) } }))), envelope: Some(Debezium), if_not_exists: false, key_constraint: None, with_options: [], external_references: None, progress_subsource: None })
CreateSource(CreateSourceStatement { name: UnresolvedItemName([Ident("src1")]), in_cluster: None, col_names: [], connection: Kafka { connection: Name(UnresolvedItemName([Ident("conn1")])), options: [KafkaSourceConfigOption { name: Topic, value: Some(Value(String("baz"))) }] }, include_metadata: [], format: Some(Bare(Protobuf(Csr { csr_connection: CsrConnectionProtobuf { connection: CsrConnection { connection: Name(UnresolvedItemName([Ident("csr_conn")])), options: [] }, seed: Some(CsrSeedProtobuf { key: None, value: CsrSeedProtobufSchema { schema: "{\"some\": \"seed\"}", message_name: "Batch" } }) } }))), envelope: Some(Debezium { mode: None }), if_not_exists: false, key_constraint: None, with_options: [], external_references: None, progress_subsource: None })


parse-statement
CREATE SOURCE src1 FROM KAFKA CONNECTION conn1 (TOPIC 'baz') FORMAT PROTOBUF USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn SEED KEY SCHEMA '{"some": "seed"}' MESSAGE 'Batch' VALUE SCHEMA '123' MESSAGE 'M' ENVELOPE DEBEZIUM
----
CREATE SOURCE src1 FROM KAFKA CONNECTION conn1 (TOPIC = 'baz') FORMAT PROTOBUF USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn SEED KEY SCHEMA '{"some": "seed"}' MESSAGE 'Batch' VALUE SCHEMA '123' MESSAGE 'M' ENVELOPE DEBEZIUM
=>
CreateSource(CreateSourceStatement { name: UnresolvedItemName([Ident("src1")]), in_cluster: None, col_names: [], connection: Kafka { connection: Name(UnresolvedItemName([Ident("conn1")])), options: [KafkaSourceConfigOption { name: Topic, value: Some(Value(String("baz"))) }] }, include_metadata: [], format: Some(Bare(Protobuf(Csr { csr_connection: CsrConnectionProtobuf { connection: CsrConnection { connection: Name(UnresolvedItemName([Ident("csr_conn")])), options: [] }, seed: Some(CsrSeedProtobuf { key: Some(CsrSeedProtobufSchema { schema: "{\"some\": \"seed\"}", message_name: "Batch" }), value: CsrSeedProtobufSchema { schema: "123", message_name: "M" } }) } }))), envelope: Some(Debezium), if_not_exists: false, key_constraint: None, with_options: [], external_references: None, progress_subsource: None })
CreateSource(CreateSourceStatement { name: UnresolvedItemName([Ident("src1")]), in_cluster: None, col_names: [], connection: Kafka { connection: Name(UnresolvedItemName([Ident("conn1")])), options: [KafkaSourceConfigOption { name: Topic, value: Some(Value(String("baz"))) }] }, include_metadata: [], format: Some(Bare(Protobuf(Csr { csr_connection: CsrConnectionProtobuf { connection: CsrConnection { connection: Name(UnresolvedItemName([Ident("csr_conn")])), options: [] }, seed: Some(CsrSeedProtobuf { key: Some(CsrSeedProtobufSchema { schema: "{\"some\": \"seed\"}", message_name: "Batch" }), value: CsrSeedProtobufSchema { schema: "123", message_name: "M" } }) } }))), envelope: Some(Debezium { mode: None }), if_not_exists: false, key_constraint: None, with_options: [], external_references: None, progress_subsource: None })

parse-statement
CREATE SOURCE src1 FROM KAFKA CONNECTION conn1 (TOPIC 'baz') FORMAT PROTOBUF USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn SEED KEY SCHEMA '{"some": "seed"}' MESSAGE 'Batch' VALUE SCHEMA '123' MESSAGE 'M' ENVELOPE UPSERT
Expand Down
Loading
Loading