diff --git a/.github/workflows/build-and-publish.yaml b/.github/workflows/build-and-publish.yaml index d8e2635a7..afd0febba 100644 --- a/.github/workflows/build-and-publish.yaml +++ b/.github/workflows/build-and-publish.yaml @@ -12,6 +12,8 @@ jobs: build-and-publish: name: Java Gradle uses: bakdata/ci-templates/.github/workflows/java-gradle-library.yaml@1.81.2 + with: + gradle-refresh-dependencies: true secrets: sonar-token: ${{ secrets.SONARCLOUD_TOKEN }} sonar-organization: ${{ secrets.SONARCLOUD_ORGANIZATION }} diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 71afcc008..64ae254a8 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -15,9 +15,9 @@ kafka-schema-serializer = { group = "io.confluent", name = "kafka-schema-seriali kafka-schema-registry-client = { group = "io.confluent", name = "kafka-schema-registry-client" } kafka-streams-avro-serde = { group = "io.confluent", name = "kafka-streams-avro-serde" } kafka-protobuf-provider = { group = "io.confluent", name = "kafka-protobuf-provider" } -largeMessage-bom = { group = "com.bakdata.kafka", name = "large-message-bom", version = "3.1.1" } +largeMessage-bom = { group = "com.bakdata.kafka", name = "large-message-bom", version = "3.1.2-SNAPSHOT" } largeMessage-core = { group = "com.bakdata.kafka", name = "large-message-core" } -errorHandling-bom = { group = "com.bakdata.kafka", name = "error-handling-bom", version = "2.1.2" } +errorHandling-bom = { group = "com.bakdata.kafka", name = "error-handling-bom", version = "2.2.0" } errorHandling-core = { group = "com.bakdata.kafka", name = "error-handling-core" } picocli = { group = "info.picocli", name = "picocli", version = "4.7.7" } slf4j = { group = "org.slf4j", name = "slf4j-api", version = "2.0.17" } diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/ConfiguredStreamsApp.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/ConfiguredStreamsApp.java index 5fb01ddb2..39e1a8bc3 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/ConfiguredStreamsApp.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/ConfiguredStreamsApp.java @@ -26,6 +26,7 @@ import com.bakdata.kafka.ConfiguredApp; import com.bakdata.kafka.EnvironmentKafkaConfigParser; +import com.bakdata.kafka.FilteringProcessingExceptionHandler; import com.bakdata.kafka.KafkaPropertiesFactory; import com.bakdata.kafka.RuntimeConfiguration; import com.bakdata.kafka.streams.kstream.StreamsBuilderX; @@ -42,6 +43,7 @@ /** * A {@link StreamsApp} with a corresponding {@link StreamsAppConfiguration} + * * @param type of {@link StreamsApp} */ @RequiredArgsConstructor @@ -50,24 +52,6 @@ public class ConfiguredStreamsApp implements ConfiguredApp private final @NonNull T app; private final @NonNull StreamsAppConfiguration configuration; - private static Map createBaseConfig() { - final Map kafkaConfig = new HashMap<>(); - - // exactly once and order - kafkaConfig.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2); - kafkaConfig.put(StreamsConfig.producerPrefix(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION), 1); - - kafkaConfig.put(StreamsConfig.producerPrefix(ProducerConfig.ACKS_CONFIG), "all"); - - // compression - kafkaConfig.put(StreamsConfig.producerPrefix(ProducerConfig.COMPRESSION_TYPE_CONFIG), - CompressionType.GZIP.toString()); - - kafkaConfig.put(StreamsConfig.ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG, true); - - return kafkaConfig; - } - /** *

This method creates the configuration to run a {@link StreamsApp}.

* Configuration is created in the following order @@ -82,6 +66,13 @@ private static Map createBaseConfig() { * * *
  • + * Dead letter queue is configured if an error topic is set: + *
    +     * errors.dead.letter.queue.topic.name={@link StreamsTopicConfig#getErrorTopic()}
    +     * processing.exception.handler={@link FilteringProcessingExceptionHandler}
    +     * 
    + *
  • + *
  • * Configs provided by {@link StreamsApp#createKafkaProperties()} *
  • *
  • @@ -114,10 +105,11 @@ public Map getKafkaProperties(final RuntimeConfiguration runtime /** * Get unique application identifier of {@link StreamsApp} + * * @return unique application identifier - * @see StreamsApp#getUniqueAppId(StreamsAppConfiguration) * @throws IllegalArgumentException if unique application identifier of {@link StreamsApp} is different from * provided application identifier in {@link StreamsAppConfiguration} + * @see StreamsApp#getUniqueAppId(StreamsAppConfiguration) */ public String getUniqueAppId() { final String uniqueAppId = @@ -128,17 +120,9 @@ public String getUniqueAppId() { return uniqueAppId; } - /** - * Get topic configuration - * - * @return topic configuration - */ - public StreamsTopicConfig getTopics() { - return this.configuration.getTopics(); - } - /** * Create an {@link ExecutableStreamsApp} using the provided {@link RuntimeConfiguration} + * * @return {@link ExecutableStreamsApp} */ @Override @@ -153,6 +137,15 @@ public ExecutableStreamsApp withRuntimeConfiguration(final RuntimeConfigurati .build(); } + /** + * Get topic configuration + * + * @return topic configuration + */ + public StreamsTopicConfig getTopics() { + return this.configuration.getTopics(); + } + /** * Create the topology of the Kafka Streams app * @@ -170,8 +163,34 @@ public void close() { this.app.close(); } + private Map createBaseConfig() { + final Map kafkaConfig = new HashMap<>(); + + // exactly once and order + kafkaConfig.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2); + kafkaConfig.put(StreamsConfig.producerPrefix(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION), 1); + + kafkaConfig.put(StreamsConfig.producerPrefix(ProducerConfig.ACKS_CONFIG), "all"); + + // compression + kafkaConfig.put(StreamsConfig.producerPrefix(ProducerConfig.COMPRESSION_TYPE_CONFIG), + CompressionType.GZIP.toString()); + + kafkaConfig.put(StreamsConfig.ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG, true); + + // built-in dead-letter queue + final String errorTopic = this.getTopics().getErrorTopic(); + if (errorTopic != null) { + kafkaConfig.put(StreamsConfig.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG, errorTopic); + kafkaConfig.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG, + FilteringProcessingExceptionHandler.class); + } + + return kafkaConfig; + } + private KafkaPropertiesFactory createPropertiesFactory(final RuntimeConfiguration runtimeConfig) { - final Map baseConfig = createBaseConfig(); + final Map baseConfig = this.createBaseConfig(); return KafkaPropertiesFactory.builder() .baseConfig(baseConfig) .app(this.app) diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/streams/kstream/ConfiguredStreamsAppTest.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/streams/kstream/ConfiguredStreamsAppTest.java index 18ae0159e..fa9bd5668 100644 --- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/streams/kstream/ConfiguredStreamsAppTest.java +++ b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/streams/kstream/ConfiguredStreamsAppTest.java @@ -27,9 +27,11 @@ import static java.util.Collections.emptyMap; import static org.apache.kafka.streams.StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG; import static org.apache.kafka.streams.StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG; +import static org.apache.kafka.streams.StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import com.bakdata.kafka.FilteringProcessingExceptionHandler; import com.bakdata.kafka.RuntimeConfiguration; import com.bakdata.kafka.streams.ConfiguredStreamsApp; import com.bakdata.kafka.streams.SerdeConfig; @@ -243,6 +245,27 @@ public void buildTopology(final StreamsBuilderX builder) { .hasMessageStartingWith("Invalid topology:"); } + @Test + void shouldSetProcessingExceptionHandler() { + final ConfiguredStreamsApp configuredApp = + new ConfiguredStreamsApp<>(new TestApplication(), + new StreamsAppConfiguration(StreamsTopicConfig.builder() + .errorTopic("error") + .build())); + assertThat(configuredApp.getKafkaProperties(RuntimeConfiguration.create("fake"))) + .containsEntry(PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG, FilteringProcessingExceptionHandler.class); + } + + @Test + void shouldNotSetProcessingExceptionHandler() { + final ConfiguredStreamsApp configuredApp = + new ConfiguredStreamsApp<>(new TestApplication(), + new StreamsAppConfiguration(StreamsTopicConfig.builder() + .build())); + assertThat(configuredApp.getKafkaProperties(RuntimeConfiguration.create("fake"))) + .doesNotContainKey(PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG); + } + @RequiredArgsConstructor private static class TestApplication implements StreamsApp {