From d2b03f3b002b1dc6ddcf278cdcd0ee831595be1d Mon Sep 17 00:00:00 2001 From: PlugaruT Date: Fri, 6 Mar 2026 23:39:24 +0200 Subject: [PATCH 1/2] fix(kafka): headers don't need encoding as per kafka binding spec Signed-off-by: PlugaruT --- src/cloudevents/core/bindings/kafka.py | 19 +++++++++++---- tests/test_core/test_bindings/test_kafka.py | 26 +++++++++------------ 2 files changed, 26 insertions(+), 19 deletions(-) diff --git a/src/cloudevents/core/bindings/kafka.py b/src/cloudevents/core/bindings/kafka.py index 1cb5b48..bce5d3e 100644 --- a/src/cloudevents/core/bindings/kafka.py +++ b/src/cloudevents/core/bindings/kafka.py @@ -13,14 +13,16 @@ # under the License. from dataclasses import dataclass +from datetime import datetime from typing import Any, Callable, Final +from dateutil.parser import isoparse + from cloudevents.core.base import BaseCloudEvent, EventFactory from cloudevents.core.bindings.common import ( CONTENT_TYPE_HEADER, DATACONTENTTYPE_ATTR, - decode_header_value, - encode_header_value, + TIME_ATTR, get_event_factory_for_version, ) from cloudevents.core.formats.base import Format @@ -116,7 +118,13 @@ def to_binary( headers[CONTENT_TYPE_HEADER] = str(attr_value).encode("utf-8") else: header_name = f"{CE_PREFIX}{attr_name}" - headers[header_name] = encode_header_value(attr_value).encode("utf-8") + if isinstance(attr_value, datetime): + s = attr_value.isoformat() + if s.endswith("+00:00"): + s = s[:-6] + "Z" + headers[header_name] = s.encode("utf-8") + else: + headers[header_name] = str(attr_value).encode("utf-8") data = event.get_data() datacontenttype = attributes.get(DATACONTENTTYPE_ATTR) @@ -167,7 +175,10 @@ def from_binary( if normalized_name.startswith(CE_PREFIX): attr_name = normalized_name[len(CE_PREFIX) :] - attributes[attr_name] = decode_header_value(attr_name, header_value) + if attr_name == TIME_ATTR: + attributes[attr_name] = isoparse(header_value) + else: + attributes[attr_name] = header_value elif normalized_name == CONTENT_TYPE_HEADER: attributes[DATACONTENTTYPE_ATTR] = header_value diff --git a/tests/test_core/test_bindings/test_kafka.py b/tests/test_core/test_bindings/test_kafka.py index 3d33a29..9b1d6d6 100644 --- a/tests/test_core/test_bindings/test_kafka.py +++ b/tests/test_core/test_bindings/test_kafka.py @@ -93,9 +93,7 @@ def test_to_binary_required_attributes() -> None: assert "ce_type" in message.headers assert message.headers["ce_type"] == b"com.example.test" assert "ce_source" in message.headers - assert ( - message.headers["ce_source"] == b"%2Ftest" - ) # Forward slash is percent-encoded + assert message.headers["ce_source"] == b"/test" assert "ce_id" in message.headers assert message.headers["ce_id"] == b"test-id-123" assert "ce_specversion" in message.headers @@ -111,8 +109,7 @@ def test_to_binary_with_optional_attributes() -> None: message = to_binary(event, JSONFormat()) assert message.headers["ce_subject"] == b"test-subject" - # All special characters including : and / are percent-encoded - assert message.headers["ce_dataschema"] == b"https%3A%2F%2Fexample.com%2Fschema" + assert message.headers["ce_dataschema"] == b"https://example.com/schema" def test_to_binary_with_extensions() -> None: @@ -171,8 +168,7 @@ def test_to_binary_datetime_encoding() -> None: message = to_binary(event, JSONFormat()) assert "ce_time" in message.headers - # Should be ISO 8601 with Z suffix, percent-encoded - assert b"2023-01-15T10%3A30%3A45Z" in message.headers["ce_time"] + assert message.headers["ce_time"] == b"2023-01-15T10:30:45Z" def test_to_binary_special_characters() -> None: @@ -181,7 +177,7 @@ def test_to_binary_special_characters() -> None: message = to_binary(event, JSONFormat()) assert "ce_subject" in message.headers - assert b"%" in message.headers["ce_subject"] # Percent encoding present + assert message.headers["ce_subject"] == b'Hello World! "quotes" & special' def test_to_binary_datacontenttype_mapping() -> None: @@ -228,7 +224,7 @@ def test_from_binary_required_attributes() -> None: message = KafkaMessage( headers={ "ce_type": b"com.example.test", - "ce_source": b"%2Ftest", + "ce_source": b"/test", "ce_id": b"test-123", "ce_specversion": b"1.0", }, @@ -238,7 +234,7 @@ def test_from_binary_required_attributes() -> None: event = from_binary(message, JSONFormat(), CloudEvent) assert event.get_type() == "com.example.test" - assert event.get_source() == "/test" # Percent-decoded + assert event.get_source() == "/test" assert event.get_id() == "test-123" assert event.get_specversion() == "1.0" @@ -252,7 +248,7 @@ def test_from_binary_with_optional_attributes() -> None: "ce_id": b"123", "ce_specversion": b"1.0", "ce_subject": b"test-subject", - "ce_dataschema": b"https%3A%2F%2Fexample.com%2Fschema", + "ce_dataschema": b"https://example.com/schema", }, key=None, value=b"", @@ -260,7 +256,7 @@ def test_from_binary_with_optional_attributes() -> None: event = from_binary(message, JSONFormat(), CloudEvent) assert event.get_subject() == "test-subject" - assert event.get_dataschema() == "https://example.com/schema" # Percent-decoded + assert event.get_dataschema() == "https://example.com/schema" def test_from_binary_with_extensions() -> None: @@ -310,7 +306,7 @@ def test_from_binary_datetime_parsing() -> None: "ce_source": b"/test", "ce_id": b"123", "ce_specversion": b"1.0", - "ce_time": b"2023-01-15T10%3A30%3A45Z", + "ce_time": b"2023-01-15T10:30:45Z", }, key=None, value=b"", @@ -658,7 +654,7 @@ def test_from_binary_with_defaults() -> None: message = KafkaMessage( headers={ "ce_type": b"com.example.test", - "ce_source": b"%2Ftest", + "ce_source": b"/test", "ce_id": b"123", "ce_specversion": b"1.0", "content-type": b"application/json", @@ -700,7 +696,7 @@ def test_from_kafka_with_defaults_binary() -> None: message = KafkaMessage( headers={ "ce_type": b"com.example.test", - "ce_source": b"%2Ftest", + "ce_source": b"/test", "ce_id": b"123", "ce_specversion": b"1.0", }, From b0bbdb0eaf41bb55d8d750b0e3a6e4b33e474d80 Mon Sep 17 00:00:00 2001 From: PlugaruT Date: Fri, 6 Mar 2026 23:46:05 +0200 Subject: [PATCH 2/2] fix(kafka): partitionkey shall still be passed in kafka headers in binary mode. Signed-off-by: PlugaruT --- src/cloudevents/core/bindings/kafka.py | 4 ---- tests/test_core/test_bindings/test_kafka.py | 4 ++-- 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/src/cloudevents/core/bindings/kafka.py b/src/cloudevents/core/bindings/kafka.py index bce5d3e..dae9e49 100644 --- a/src/cloudevents/core/bindings/kafka.py +++ b/src/cloudevents/core/bindings/kafka.py @@ -110,10 +110,6 @@ def to_binary( if attr_value is None: continue - # Skip partitionkey - it goes in the message key, not headers - if attr_name == PARTITIONKEY_ATTR: - continue - if attr_name == DATACONTENTTYPE_ATTR: headers[CONTENT_TYPE_HEADER] = str(attr_value).encode("utf-8") else: diff --git a/tests/test_core/test_bindings/test_kafka.py b/tests/test_core/test_bindings/test_kafka.py index 9b1d6d6..54df822 100644 --- a/tests/test_core/test_bindings/test_kafka.py +++ b/tests/test_core/test_bindings/test_kafka.py @@ -191,12 +191,12 @@ def test_to_binary_datacontenttype_mapping() -> None: def test_to_binary_partitionkey_in_key() -> None: - """Test that partitionkey extension attribute becomes message key""" + """Test that partitionkey becomes message key and is still included in headers""" event = create_event({"partitionkey": "user-123"}) message = to_binary(event, JSONFormat()) assert message.key == "user-123" - assert "ce_partitionkey" not in message.headers + assert message.headers["ce_partitionkey"] == b"user-123" def test_to_binary_custom_key_mapper() -> None: