From 1dc0689aadaea60f5ac512ac35e97ddb20cb44b4 Mon Sep 17 00:00:00 2001 From: Philipp Schirmer Date: Wed, 15 Apr 2026 10:29:45 +0200 Subject: [PATCH 01/14] Support Kafka Streams built-in dead letter queue in clean up --- gradle/libs.versions.toml | 2 +- .../kafka/streams/StreamsCleanUpRunner.java | 4 +-- .../streams/StreamsCleanUpRunnerTest.java | 26 ++++++++++++++++++- 3 files changed, 28 insertions(+), 4 deletions(-) diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 8443d00c1..313d528cd 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -2,7 +2,7 @@ junit = "6.0.3" mockito = "5.23.0" testcontainers = "2.0.4" -kafkaUtils = "1.4.0" +kafkaUtils = "1.4.1-SNAPSHOT" [libraries] kafka-bom = { group = "com.bakdata.kafka", name = "kafka-bom", version.ref = "kafkaUtils" } diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/StreamsCleanUpRunner.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/StreamsCleanUpRunner.java index cd9a914a9..ce832248f 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/StreamsCleanUpRunner.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/StreamsCleanUpRunner.java @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2025 bakdata + * Copyright (c) 2026 bakdata * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -84,7 +84,7 @@ public static StreamsCleanUpRunner create(final @NonNull Topology topology, public static StreamsCleanUpRunner create(final @NonNull Topology topology, final @NonNull StreamsConfig streamsConfig, final @NonNull StreamsCleanUpConfiguration configuration) { final StreamsConfigX config = new StreamsConfigX(streamsConfig); - final TopologyInformation topologyInformation = new TopologyInformation(topology, config.getAppId()); + final TopologyInformation topologyInformation = new TopologyInformation(topology, streamsConfig); SchemaRegistryAppUtils.createTopicHook(config.getKafkaProperties()) .ifPresent(configuration::registerTopicHook); return new StreamsCleanUpRunner(topologyInformation, topology, config, configuration); diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/streams/StreamsCleanUpRunnerTest.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/streams/StreamsCleanUpRunnerTest.java index cb6ba506c..268c5c618 100644 --- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/streams/StreamsCleanUpRunnerTest.java +++ b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/streams/StreamsCleanUpRunnerTest.java @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2025 bakdata + * Copyright (c) 2026 bakdata * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -56,6 +56,7 @@ import java.io.IOException; import java.nio.file.Path; import java.util.List; +import java.util.Map; import java.util.regex.Pattern; import java.util.stream.Stream; import org.apache.kafka.clients.consumer.ConsumerRecord; @@ -63,6 +64,7 @@ import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsConfig; import org.assertj.core.api.SoftAssertions; import org.assertj.core.api.junit.jupiter.InjectSoftAssertions; import org.assertj.core.api.junit.jupiter.SoftAssertionsExtension; @@ -163,6 +165,28 @@ void shouldDeleteTopic() { } } + @Test + void shouldDeleteDlqTopic() { + final String dlqTopic = "dlq"; + try (final ConfiguredStreamsApp app = createWordCountApplication(); + final ExecutableStreamsApp executableApp = this.createExecutableApp(app, + this.createConfig() + .with(Map.of( + StreamsConfig.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG, dlqTopic + )))) { + final KafkaTestClient testClient = this.newTestClient(); + testClient.createTopic(dlqTopic); + clean(executableApp); + + try (final AdminClientX admin = testClient.admin()) { + final TopicsClient topics = admin.topics(); + this.softly.assertThat(topics.topic(dlqTopic).exists()) + .as("Dead letter queue topic is deleted") + .isFalse(); + } + } + } + @Test void shouldDeleteConsumerGroup() { try (final ConfiguredStreamsApp app = createWordCountApplication(); From 97502428a045beaff5794c1d702f53fcb39e1dbc Mon Sep 17 00:00:00 2001 From: Philipp Schirmer Date: Wed, 15 Apr 2026 10:57:19 +0200 Subject: [PATCH 02/14] Support Kafka Streams built-in dead letter queue in clean up --- gradle/libs.versions.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 313d528cd..43005f2ba 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -33,7 +33,7 @@ mockito-core = { group = "org.mockito", name = "mockito-core", version.ref = "mo mockito-junit = { group = "org.mockito", name = "mockito-junit-jupiter", version.ref = "mockito" } testcontainers-junit = { group = "org.testcontainers", name = "testcontainers-junit-jupiter", version.ref = "testcontainers" } testcontainers-kafka = { group = "org.testcontainers", name = "testcontainers-kafka", version.ref = "testcontainers" } -fluentKafkaStreamsTests = { group = "com.bakdata.fluent-kafka-streams-tests", name = "fluent-kafka-streams-tests-junit5", version = "3.5.1" } +fluentKafkaStreamsTests = { group = "com.bakdata.fluent-kafka-streams-tests", name = "fluent-kafka-streams-tests-junit5", version = "3.5.2-SNAPSHOT" } log4j-slf4j2 = { group = "org.apache.logging.log4j", name = "log4j-slf4j2-impl", version = "2.25.4" } awaitility = { group = "org.awaitility", name = "awaitility", version = "4.3.0" } From 62f736300d8a2eab3b4aa10fccc65b46ee93ae33 Mon Sep 17 00:00:00 2001 From: Philipp Schirmer Date: Wed, 15 Apr 2026 14:49:21 +0200 Subject: [PATCH 03/14] Support Kafka Streams built-in dead letter queue in clean up --- gradle/libs.versions.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 43005f2ba..be7ba99de 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -15,7 +15,7 @@ kafka-schema-serializer = { group = "io.confluent", name = "kafka-schema-seriali kafka-schema-registry-client = { group = "io.confluent", name = "kafka-schema-registry-client" } kafka-streams-avro-serde = { group = "io.confluent", name = "kafka-streams-avro-serde" } kafka-protobuf-provider = { group = "io.confluent", name = "kafka-protobuf-provider" } -largeMessage-bom = { group = "com.bakdata.kafka", name = "large-message-bom", version = "3.1.1" } +largeMessage-bom = { group = "com.bakdata.kafka", name = "large-message-bom", version = "3.1.2-SNAPSHOT" } largeMessage-core = { group = "com.bakdata.kafka", name = "large-message-core" } errorHandling-bom = { group = "com.bakdata.kafka", name = "error-handling-bom", version = "2.1.2" } errorHandling-core = { group = "com.bakdata.kafka", name = "error-handling-core" } From 64a9a4eaf1a50374d400bca6f81ccfbc9aee8c28 Mon Sep 17 00:00:00 2001 From: Philipp Schirmer Date: Wed, 15 Apr 2026 16:41:12 +0200 Subject: [PATCH 04/14] Support Kafka Streams built-in dead letter queue in clean up --- gradle/libs.versions.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index be7ba99de..49d0bca1a 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -17,7 +17,7 @@ kafka-streams-avro-serde = { group = "io.confluent", name = "kafka-streams-avro- kafka-protobuf-provider = { group = "io.confluent", name = "kafka-protobuf-provider" } largeMessage-bom = { group = "com.bakdata.kafka", name = "large-message-bom", version = "3.1.2-SNAPSHOT" } largeMessage-core = { group = "com.bakdata.kafka", name = "large-message-core" } -errorHandling-bom = { group = "com.bakdata.kafka", name = "error-handling-bom", version = "2.1.2" } +errorHandling-bom = { group = "com.bakdata.kafka", name = "error-handling-bom", version = "2.1.3-SNAPSHOT" } errorHandling-core = { group = "com.bakdata.kafka", name = "error-handling-core" } picocli = { group = "info.picocli", name = "picocli", version = "4.7.7" } slf4j = { group = "org.slf4j", name = "slf4j-api", version = "2.0.17" } From 86ad7928c729e9e30b968f90cc3ff5ef5def09ad Mon Sep 17 00:00:00 2001 From: Philipp Schirmer Date: Thu, 16 Apr 2026 10:12:18 +0200 Subject: [PATCH 05/14] Update --- gradle/libs.versions.toml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 49d0bca1a..0984497f1 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -2,7 +2,7 @@ junit = "6.0.3" mockito = "5.23.0" testcontainers = "2.0.4" -kafkaUtils = "1.4.1-SNAPSHOT" +kafkaUtils = "1.5.0" [libraries] kafka-bom = { group = "com.bakdata.kafka", name = "kafka-bom", version.ref = "kafkaUtils" } @@ -15,9 +15,9 @@ kafka-schema-serializer = { group = "io.confluent", name = "kafka-schema-seriali kafka-schema-registry-client = { group = "io.confluent", name = "kafka-schema-registry-client" } kafka-streams-avro-serde = { group = "io.confluent", name = "kafka-streams-avro-serde" } kafka-protobuf-provider = { group = "io.confluent", name = "kafka-protobuf-provider" } -largeMessage-bom = { group = "com.bakdata.kafka", name = "large-message-bom", version = "3.1.2-SNAPSHOT" } +largeMessage-bom = { group = "com.bakdata.kafka", name = "large-message-bom", version = "3.1.1" } largeMessage-core = { group = "com.bakdata.kafka", name = "large-message-core" } -errorHandling-bom = { group = "com.bakdata.kafka", name = "error-handling-bom", version = "2.1.3-SNAPSHOT" } +errorHandling-bom = { group = "com.bakdata.kafka", name = "error-handling-bom", version = "2.1.2" } errorHandling-core = { group = "com.bakdata.kafka", name = "error-handling-core" } picocli = { group = "info.picocli", name = "picocli", version = "4.7.7" } slf4j = { group = "org.slf4j", name = "slf4j-api", version = "2.0.17" } From b3d8bd4f8e500b9e4718a187c0b1799b15a2daa8 Mon Sep 17 00:00:00 2001 From: Philipp Schirmer Date: Thu, 16 Apr 2026 11:06:27 +0200 Subject: [PATCH 06/14] Configure built-in dead letter queue --- gradle/libs.versions.toml | 4 +- .../kafka/streams/ConfiguredStreamsApp.java | 77 ++++++++++++------- 2 files changed, 50 insertions(+), 31 deletions(-) diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 0984497f1..082bdae62 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -15,9 +15,9 @@ kafka-schema-serializer = { group = "io.confluent", name = "kafka-schema-seriali kafka-schema-registry-client = { group = "io.confluent", name = "kafka-schema-registry-client" } kafka-streams-avro-serde = { group = "io.confluent", name = "kafka-streams-avro-serde" } kafka-protobuf-provider = { group = "io.confluent", name = "kafka-protobuf-provider" } -largeMessage-bom = { group = "com.bakdata.kafka", name = "large-message-bom", version = "3.1.1" } +largeMessage-bom = { group = "com.bakdata.kafka", name = "large-message-bom", version = "3.1.2-SNAPSHOT" } largeMessage-core = { group = "com.bakdata.kafka", name = "large-message-core" } -errorHandling-bom = { group = "com.bakdata.kafka", name = "error-handling-bom", version = "2.1.2" } +errorHandling-bom = { group = "com.bakdata.kafka", name = "error-handling-bom", version = "2.1.3-SNAPSHOT" } errorHandling-core = { group = "com.bakdata.kafka", name = "error-handling-core" } picocli = { group = "info.picocli", name = "picocli", version = "4.7.7" } slf4j = { group = "org.slf4j", name = "slf4j-api", version = "2.0.17" } diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/ConfiguredStreamsApp.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/ConfiguredStreamsApp.java index 5fb01ddb2..f8f303005 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/ConfiguredStreamsApp.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/ConfiguredStreamsApp.java @@ -25,6 +25,7 @@ package com.bakdata.kafka.streams; import com.bakdata.kafka.ConfiguredApp; +import com.bakdata.kafka.DescribingProcessingExceptionHandler; import com.bakdata.kafka.EnvironmentKafkaConfigParser; import com.bakdata.kafka.KafkaPropertiesFactory; import com.bakdata.kafka.RuntimeConfiguration; @@ -42,6 +43,7 @@ /** * A {@link StreamsApp} with a corresponding {@link StreamsAppConfiguration} + * * @param type of {@link StreamsApp} */ @RequiredArgsConstructor @@ -50,24 +52,6 @@ public class ConfiguredStreamsApp implements ConfiguredApp private final @NonNull T app; private final @NonNull StreamsAppConfiguration configuration; - private static Map createBaseConfig() { - final Map kafkaConfig = new HashMap<>(); - - // exactly once and order - kafkaConfig.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2); - kafkaConfig.put(StreamsConfig.producerPrefix(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION), 1); - - kafkaConfig.put(StreamsConfig.producerPrefix(ProducerConfig.ACKS_CONFIG), "all"); - - // compression - kafkaConfig.put(StreamsConfig.producerPrefix(ProducerConfig.COMPRESSION_TYPE_CONFIG), - CompressionType.GZIP.toString()); - - kafkaConfig.put(StreamsConfig.ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG, true); - - return kafkaConfig; - } - /** *

This method creates the configuration to run a {@link StreamsApp}.

* Configuration is created in the following order @@ -82,6 +66,13 @@ private static Map createBaseConfig() { * * *
  • + * Dead letter queue is configured if an error topic is set: + *
    +     * errors.dead.letter.queue.topic.name={@link StreamsTopicConfig#getErrorTopic()}
    +     * processing.exception.handler={@link DescribingProcessingExceptionHandler}
    +     * 
    + *
  • + *
  • * Configs provided by {@link StreamsApp#createKafkaProperties()} *
  • *
  • @@ -114,10 +105,11 @@ public Map getKafkaProperties(final RuntimeConfiguration runtime /** * Get unique application identifier of {@link StreamsApp} + * * @return unique application identifier - * @see StreamsApp#getUniqueAppId(StreamsAppConfiguration) * @throws IllegalArgumentException if unique application identifier of {@link StreamsApp} is different from * provided application identifier in {@link StreamsAppConfiguration} + * @see StreamsApp#getUniqueAppId(StreamsAppConfiguration) */ public String getUniqueAppId() { final String uniqueAppId = @@ -128,17 +120,9 @@ public String getUniqueAppId() { return uniqueAppId; } - /** - * Get topic configuration - * - * @return topic configuration - */ - public StreamsTopicConfig getTopics() { - return this.configuration.getTopics(); - } - /** * Create an {@link ExecutableStreamsApp} using the provided {@link RuntimeConfiguration} + * * @return {@link ExecutableStreamsApp} */ @Override @@ -153,6 +137,41 @@ public ExecutableStreamsApp withRuntimeConfiguration(final RuntimeConfigurati .build(); } + /** + * Get topic configuration + * + * @return topic configuration + */ + public StreamsTopicConfig getTopics() { + return this.configuration.getTopics(); + } + + private Map createBaseConfig() { + final Map kafkaConfig = new HashMap<>(); + + // exactly once and order + kafkaConfig.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2); + kafkaConfig.put(StreamsConfig.producerPrefix(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION), 1); + + kafkaConfig.put(StreamsConfig.producerPrefix(ProducerConfig.ACKS_CONFIG), "all"); + + // compression + kafkaConfig.put(StreamsConfig.producerPrefix(ProducerConfig.COMPRESSION_TYPE_CONFIG), + CompressionType.GZIP.toString()); + + kafkaConfig.put(StreamsConfig.ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG, true); + + // built-in dead-letter queue + final String errorTopic = this.getTopics().getErrorTopic(); + if (errorTopic != null) { + kafkaConfig.put(StreamsConfig.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG, errorTopic); + kafkaConfig.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG, + DescribingProcessingExceptionHandler.class); + } + + return kafkaConfig; + } + /** * Create the topology of the Kafka Streams app * @@ -171,7 +190,7 @@ public void close() { } private KafkaPropertiesFactory createPropertiesFactory(final RuntimeConfiguration runtimeConfig) { - final Map baseConfig = createBaseConfig(); + final Map baseConfig = this.createBaseConfig(); return KafkaPropertiesFactory.builder() .baseConfig(baseConfig) .app(this.app) From 3427189841d78fe8f23c10c78a39fbb7a267a8aa Mon Sep 17 00:00:00 2001 From: Philipp Schirmer Date: Thu, 16 Apr 2026 11:06:52 +0200 Subject: [PATCH 07/14] Use stable --- gradle/libs.versions.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 0984497f1..71afcc008 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -33,7 +33,7 @@ mockito-core = { group = "org.mockito", name = "mockito-core", version.ref = "mo mockito-junit = { group = "org.mockito", name = "mockito-junit-jupiter", version.ref = "mockito" } testcontainers-junit = { group = "org.testcontainers", name = "testcontainers-junit-jupiter", version.ref = "testcontainers" } testcontainers-kafka = { group = "org.testcontainers", name = "testcontainers-kafka", version.ref = "testcontainers" } -fluentKafkaStreamsTests = { group = "com.bakdata.fluent-kafka-streams-tests", name = "fluent-kafka-streams-tests-junit5", version = "3.5.2-SNAPSHOT" } +fluentKafkaStreamsTests = { group = "com.bakdata.fluent-kafka-streams-tests", name = "fluent-kafka-streams-tests-junit5", version = "3.6.0" } log4j-slf4j2 = { group = "org.apache.logging.log4j", name = "log4j-slf4j2-impl", version = "2.25.4" } awaitility = { group = "org.awaitility", name = "awaitility", version = "4.3.0" } From 6e2b7edd7e76e1696148603996a97eec65877432 Mon Sep 17 00:00:00 2001 From: Philipp Schirmer Date: Thu, 16 Apr 2026 11:16:10 +0200 Subject: [PATCH 08/14] Reorder --- .../kafka/streams/ConfiguredStreamsApp.java | 34 +++++++++---------- 1 file changed, 17 insertions(+), 17 deletions(-) diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/ConfiguredStreamsApp.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/ConfiguredStreamsApp.java index f8f303005..317b1095b 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/ConfiguredStreamsApp.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/ConfiguredStreamsApp.java @@ -146,6 +146,23 @@ public StreamsTopicConfig getTopics() { return this.configuration.getTopics(); } + /** + * Create the topology of the Kafka Streams app + * + * @param kafkaProperties configuration that should be used by clients to configure Kafka utilities + * @return topology of the Kafka Streams app + */ + public Topology createTopology(final Map kafkaProperties) { + final StreamsBuilderX streamsBuilder = new StreamsBuilderX(this.getTopics(), kafkaProperties); + this.app.buildTopology(streamsBuilder); + return streamsBuilder.build(); + } + + @Override + public void close() { + this.app.close(); + } + private Map createBaseConfig() { final Map kafkaConfig = new HashMap<>(); @@ -172,23 +189,6 @@ private Map createBaseConfig() { return kafkaConfig; } - /** - * Create the topology of the Kafka Streams app - * - * @param kafkaProperties configuration that should be used by clients to configure Kafka utilities - * @return topology of the Kafka Streams app - */ - public Topology createTopology(final Map kafkaProperties) { - final StreamsBuilderX streamsBuilder = new StreamsBuilderX(this.getTopics(), kafkaProperties); - this.app.buildTopology(streamsBuilder); - return streamsBuilder.build(); - } - - @Override - public void close() { - this.app.close(); - } - private KafkaPropertiesFactory createPropertiesFactory(final RuntimeConfiguration runtimeConfig) { final Map baseConfig = this.createBaseConfig(); return KafkaPropertiesFactory.builder() From 6c53ef8bc30768edb55dff659ef6aa02a29c2cc1 Mon Sep 17 00:00:00 2001 From: Philipp Schirmer Date: Thu, 16 Apr 2026 21:34:42 +0200 Subject: [PATCH 09/14] Add verification hook --- .../com/bakdata/kafka/KafkaApplication.java | 38 ++++++++++++------- .../KafkaStreamsApplicationRunTest.java | 37 +++++++++++++++++- 2 files changed, 61 insertions(+), 14 deletions(-) diff --git a/streams-bootstrap-cli/src/main/java/com/bakdata/kafka/KafkaApplication.java b/streams-bootstrap-cli/src/main/java/com/bakdata/kafka/KafkaApplication.java index d6d15290c..1dc530243 100644 --- a/streams-bootstrap-cli/src/main/java/com/bakdata/kafka/KafkaApplication.java +++ b/streams-bootstrap-cli/src/main/java/com/bakdata/kafka/KafkaApplication.java @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2025 bakdata + * Copyright (c) 2026 bakdata * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -88,6 +88,18 @@ public abstract class KafkaApplication kafkaConfig = emptyMap(); + private static String[] addEnvironmentVariablesArguments(final String[] args) { + if (ENV_PREFIX.equals(EnvironmentKafkaConfigParser.PREFIX)) { + throw new IllegalArgumentException( + String.format("Prefix '%s' is reserved for Kafka config", EnvironmentKafkaConfigParser.PREFIX)); + } + final List environmentArguments = new EnvironmentArgumentsParser(ENV_PREFIX) + .parseVariables(System.getenv()); + final Collection allArgs = new ArrayList<>(environmentArguments); + allArgs.addAll(Arrays.asList(args)); + return allArgs.toArray(String[]::new); + } + /** *

    This method should be called in the main method of your application

    *

    This method calls System exit

    @@ -113,18 +125,6 @@ public int startApplicationWithoutExit(final String[] args) { return commandLine.execute(populatedArgs); } - private static String[] addEnvironmentVariablesArguments(final String[] args) { - if (ENV_PREFIX.equals(EnvironmentKafkaConfigParser.PREFIX)) { - throw new IllegalArgumentException( - String.format("Prefix '%s' is reserved for Kafka config", EnvironmentKafkaConfigParser.PREFIX)); - } - final List environmentArguments = new EnvironmentArgumentsParser(ENV_PREFIX) - .parseVariables(System.getenv()); - final Collection allArgs = new ArrayList<>(environmentArguments); - allArgs.addAll(Arrays.asList(args)); - return allArgs.toArray(String[]::new); - } - /** * Create options for running the app * @@ -212,6 +212,7 @@ public final CA createConfiguredApp() { final T topics = this.createTopicConfig(); final A app = this.createApp(); final AC appConfiguration = this.createConfiguration(topics); + this.verify(app, appConfiguration); return this.createConfiguredApp(app, appConfiguration); } @@ -271,6 +272,17 @@ public void prepareClean() { // do nothing by default } + /** + * Hook to verify an app and its configuration. An exception should be thrown if the app or configuration is + * invalid. Does nothing by default. + * + * @param app app to verify + * @param configuration configuration to verify + */ + protected void verify(final A app, final AC configuration) { + // do nothing by default + } + /** * Create a new {@link ConfiguredApp} that will be executed according to the given config. * diff --git a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/streams/KafkaStreamsApplicationRunTest.java b/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/streams/KafkaStreamsApplicationRunTest.java index de520dbd4..96ebc72a9 100644 --- a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/streams/KafkaStreamsApplicationRunTest.java +++ b/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/streams/KafkaStreamsApplicationRunTest.java @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2025 bakdata + * Copyright (c) 2026 bakdata * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -25,6 +25,7 @@ package com.bakdata.kafka.streams; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import com.bakdata.kafka.KafkaTest; import com.bakdata.kafka.KafkaTestClient; @@ -33,6 +34,9 @@ import com.bakdata.kafka.streams.apps.Mirror; import java.nio.file.Path; import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.junit.jupiter.api.Test; @@ -67,4 +71,35 @@ void shouldRunApp() { .hasSize(1); } } + + @Test + void shouldVerifyApp() { + final String input = "input"; + final String output = "output"; + final RuntimeException exception = new RuntimeException("Invalid app"); + try (final KafkaStreamsApplication app = new KafkaStreamsApplication<>() { + @Override + public StreamsApp createApp() { + return new Mirror(); + } + + @Override + protected void verify(final StreamsApp app, final StreamsAppConfiguration configuration) { + throw exception; + } + }) { + app.setInputTopics(List.of(input)); + app.setOutputTopic(output); + final TestApplicationRunner runner = TestApplicationRunner.create(this.getBootstrapServers()) + .withStateDir(this.stateDir) + .withNoStateStoreCaching() + .withSessionTimeout(SESSION_TIMEOUT); + final KafkaTestClient testClient = runner.newTestClient(); + testClient.createTopic(output); + final CompletableFuture future = runner.run(app); + assertThatThrownBy(() -> future.get(10, TimeUnit.SECONDS)) + .isInstanceOf(ExecutionException.class) + .hasCause(exception); + } + } } From c7b0f4610d7a8476a4b42662d427def6f014c935 Mon Sep 17 00:00:00 2001 From: Philipp Schirmer Date: Thu, 16 Apr 2026 21:44:32 +0200 Subject: [PATCH 10/14] Add verification hook --- .../java/com/bakdata/kafka/KafkaApplication.java | 15 +++++++-------- .../streams/KafkaStreamsApplicationRunTest.java | 2 +- 2 files changed, 8 insertions(+), 9 deletions(-) diff --git a/streams-bootstrap-cli/src/main/java/com/bakdata/kafka/KafkaApplication.java b/streams-bootstrap-cli/src/main/java/com/bakdata/kafka/KafkaApplication.java index 1dc530243..024b8a8e9 100644 --- a/streams-bootstrap-cli/src/main/java/com/bakdata/kafka/KafkaApplication.java +++ b/streams-bootstrap-cli/src/main/java/com/bakdata/kafka/KafkaApplication.java @@ -198,9 +198,10 @@ public RuntimeConfiguration getRuntimeConfiguration() { * @return {@link ExecutableApp} */ public final E createExecutableApp() { - final ConfiguredApp configuredStreamsApp = this.createConfiguredApp(); + final ConfiguredApp configuredApp = this.createConfiguredApp(); final RuntimeConfiguration runtimeConfiguration = this.getRuntimeConfiguration(); - return configuredStreamsApp.withRuntimeConfiguration(runtimeConfiguration); + final E executableApp = configuredApp.withRuntimeConfiguration(runtimeConfiguration); + return executableApp; } /** @@ -211,8 +212,8 @@ public final E createExecutableApp() { public final CA createConfiguredApp() { final T topics = this.createTopicConfig(); final A app = this.createApp(); + this.verify(app); final AC appConfiguration = this.createConfiguration(topics); - this.verify(app, appConfiguration); return this.createConfiguredApp(app, appConfiguration); } @@ -230,7 +231,7 @@ public final CA createConfiguredApp() { * @return {@link RunnableApp} */ public final RunnableApp createRunnableApp() { - final ExecutableApp app = this.createExecutableApp(); + final E app = this.createExecutableApp(); final Optional executionOptions = this.createExecutionOptions(); final R runner = executionOptions.map(app::createRunner).orElseGet(app::createRunner); final RunnableApp runnableApp = new RunnableApp<>(app, runner, this.activeApps::remove); @@ -273,13 +274,11 @@ public void prepareClean() { } /** - * Hook to verify an app and its configuration. An exception should be thrown if the app or configuration is - * invalid. Does nothing by default. + * Hook to verify an app. An exception should be thrown if the app is invalid. Does nothing by default. * * @param app app to verify - * @param configuration configuration to verify */ - protected void verify(final A app, final AC configuration) { + protected void verify(final A app) { // do nothing by default } diff --git a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/streams/KafkaStreamsApplicationRunTest.java b/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/streams/KafkaStreamsApplicationRunTest.java index 96ebc72a9..f9203c0b7 100644 --- a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/streams/KafkaStreamsApplicationRunTest.java +++ b/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/streams/KafkaStreamsApplicationRunTest.java @@ -84,7 +84,7 @@ public StreamsApp createApp() { } @Override - protected void verify(final StreamsApp app, final StreamsAppConfiguration configuration) { + protected void verify(final StreamsApp app) { throw exception; } }) { From a3c83a938298f4c74f7ce0d8a1b3d153ef8a3bad Mon Sep 17 00:00:00 2001 From: Philipp Schirmer Date: Thu, 16 Apr 2026 21:45:27 +0200 Subject: [PATCH 11/14] Add verification hook --- .../src/main/java/com/bakdata/kafka/KafkaApplication.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/streams-bootstrap-cli/src/main/java/com/bakdata/kafka/KafkaApplication.java b/streams-bootstrap-cli/src/main/java/com/bakdata/kafka/KafkaApplication.java index 024b8a8e9..1fa88e7d3 100644 --- a/streams-bootstrap-cli/src/main/java/com/bakdata/kafka/KafkaApplication.java +++ b/streams-bootstrap-cli/src/main/java/com/bakdata/kafka/KafkaApplication.java @@ -200,8 +200,7 @@ public RuntimeConfiguration getRuntimeConfiguration() { public final E createExecutableApp() { final ConfiguredApp configuredApp = this.createConfiguredApp(); final RuntimeConfiguration runtimeConfiguration = this.getRuntimeConfiguration(); - final E executableApp = configuredApp.withRuntimeConfiguration(runtimeConfiguration); - return executableApp; + return configuredApp.withRuntimeConfiguration(runtimeConfiguration); } /** From 956f966d8ff1c5880dd0d92f4a3f2278f61562a0 Mon Sep 17 00:00:00 2001 From: Philipp Schirmer Date: Fri, 17 Apr 2026 10:18:42 +0200 Subject: [PATCH 12/14] Update --- .github/workflows/build-and-publish.yaml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.github/workflows/build-and-publish.yaml b/.github/workflows/build-and-publish.yaml index d8e2635a7..afd0febba 100644 --- a/.github/workflows/build-and-publish.yaml +++ b/.github/workflows/build-and-publish.yaml @@ -12,6 +12,8 @@ jobs: build-and-publish: name: Java Gradle uses: bakdata/ci-templates/.github/workflows/java-gradle-library.yaml@1.81.2 + with: + gradle-refresh-dependencies: true secrets: sonar-token: ${{ secrets.SONARCLOUD_TOKEN }} sonar-organization: ${{ secrets.SONARCLOUD_ORGANIZATION }} From c7e0dba8677bbde7f7ae4db663205f9c636f74f1 Mon Sep 17 00:00:00 2001 From: Philipp Schirmer Date: Tue, 28 Apr 2026 08:56:13 +0200 Subject: [PATCH 13/14] Stable --- gradle/libs.versions.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 33e943648..64ae254a8 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -17,7 +17,7 @@ kafka-streams-avro-serde = { group = "io.confluent", name = "kafka-streams-avro- kafka-protobuf-provider = { group = "io.confluent", name = "kafka-protobuf-provider" } largeMessage-bom = { group = "com.bakdata.kafka", name = "large-message-bom", version = "3.1.2-SNAPSHOT" } largeMessage-core = { group = "com.bakdata.kafka", name = "large-message-core" } -errorHandling-bom = { group = "com.bakdata.kafka", name = "error-handling-bom", version = "2.1.3-SNAPSHOT" } +errorHandling-bom = { group = "com.bakdata.kafka", name = "error-handling-bom", version = "2.2.0" } errorHandling-core = { group = "com.bakdata.kafka", name = "error-handling-core" } picocli = { group = "info.picocli", name = "picocli", version = "4.7.7" } slf4j = { group = "org.slf4j", name = "slf4j-api", version = "2.0.17" } From 99d7426c4d4775539e96fbe1457606e20d14554d Mon Sep 17 00:00:00 2001 From: Philipp Schirmer Date: Tue, 28 Apr 2026 09:18:25 +0200 Subject: [PATCH 14/14] Stable --- .../kafka/streams/ConfiguredStreamsApp.java | 6 ++--- .../kstream/ConfiguredStreamsAppTest.java | 23 +++++++++++++++++++ 2 files changed, 26 insertions(+), 3 deletions(-) diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/ConfiguredStreamsApp.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/ConfiguredStreamsApp.java index 317b1095b..39e1a8bc3 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/ConfiguredStreamsApp.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/ConfiguredStreamsApp.java @@ -25,8 +25,8 @@ package com.bakdata.kafka.streams; import com.bakdata.kafka.ConfiguredApp; -import com.bakdata.kafka.DescribingProcessingExceptionHandler; import com.bakdata.kafka.EnvironmentKafkaConfigParser; +import com.bakdata.kafka.FilteringProcessingExceptionHandler; import com.bakdata.kafka.KafkaPropertiesFactory; import com.bakdata.kafka.RuntimeConfiguration; import com.bakdata.kafka.streams.kstream.StreamsBuilderX; @@ -69,7 +69,7 @@ public class ConfiguredStreamsApp implements ConfiguredApp * Dead letter queue is configured if an error topic is set: *
          * errors.dead.letter.queue.topic.name={@link StreamsTopicConfig#getErrorTopic()}
    -     * processing.exception.handler={@link DescribingProcessingExceptionHandler}
    +     * processing.exception.handler={@link FilteringProcessingExceptionHandler}
          * 
    *
  • *
  • @@ -183,7 +183,7 @@ private Map createBaseConfig() { if (errorTopic != null) { kafkaConfig.put(StreamsConfig.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG, errorTopic); kafkaConfig.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG, - DescribingProcessingExceptionHandler.class); + FilteringProcessingExceptionHandler.class); } return kafkaConfig; diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/streams/kstream/ConfiguredStreamsAppTest.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/streams/kstream/ConfiguredStreamsAppTest.java index 18ae0159e..fa9bd5668 100644 --- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/streams/kstream/ConfiguredStreamsAppTest.java +++ b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/streams/kstream/ConfiguredStreamsAppTest.java @@ -27,9 +27,11 @@ import static java.util.Collections.emptyMap; import static org.apache.kafka.streams.StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG; import static org.apache.kafka.streams.StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG; +import static org.apache.kafka.streams.StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import com.bakdata.kafka.FilteringProcessingExceptionHandler; import com.bakdata.kafka.RuntimeConfiguration; import com.bakdata.kafka.streams.ConfiguredStreamsApp; import com.bakdata.kafka.streams.SerdeConfig; @@ -243,6 +245,27 @@ public void buildTopology(final StreamsBuilderX builder) { .hasMessageStartingWith("Invalid topology:"); } + @Test + void shouldSetProcessingExceptionHandler() { + final ConfiguredStreamsApp configuredApp = + new ConfiguredStreamsApp<>(new TestApplication(), + new StreamsAppConfiguration(StreamsTopicConfig.builder() + .errorTopic("error") + .build())); + assertThat(configuredApp.getKafkaProperties(RuntimeConfiguration.create("fake"))) + .containsEntry(PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG, FilteringProcessingExceptionHandler.class); + } + + @Test + void shouldNotSetProcessingExceptionHandler() { + final ConfiguredStreamsApp configuredApp = + new ConfiguredStreamsApp<>(new TestApplication(), + new StreamsAppConfiguration(StreamsTopicConfig.builder() + .build())); + assertThat(configuredApp.getKafkaProperties(RuntimeConfiguration.create("fake"))) + .doesNotContainKey(PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG); + } + @RequiredArgsConstructor private static class TestApplication implements StreamsApp {