Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/workflows/build-and-publish.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ jobs:
build-and-publish:
name: Java Gradle
uses: bakdata/ci-templates/.github/workflows/java-gradle-library.yaml@1.81.2
with:
gradle-refresh-dependencies: true
secrets:
sonar-token: ${{ secrets.SONARCLOUD_TOKEN }}
sonar-organization: ${{ secrets.SONARCLOUD_ORGANIZATION }}
Expand Down
4 changes: 2 additions & 2 deletions gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ kafka-schema-serializer = { group = "io.confluent", name = "kafka-schema-seriali
kafka-schema-registry-client = { group = "io.confluent", name = "kafka-schema-registry-client" }
kafka-streams-avro-serde = { group = "io.confluent", name = "kafka-streams-avro-serde" }
kafka-protobuf-provider = { group = "io.confluent", name = "kafka-protobuf-provider" }
largeMessage-bom = { group = "com.bakdata.kafka", name = "large-message-bom", version = "3.1.1" }
largeMessage-bom = { group = "com.bakdata.kafka", name = "large-message-bom", version = "3.1.2-SNAPSHOT" }
largeMessage-core = { group = "com.bakdata.kafka", name = "large-message-core" }
errorHandling-bom = { group = "com.bakdata.kafka", name = "error-handling-bom", version = "2.1.2" }
errorHandling-bom = { group = "com.bakdata.kafka", name = "error-handling-bom", version = "2.2.0" }
errorHandling-core = { group = "com.bakdata.kafka", name = "error-handling-core" }
picocli = { group = "info.picocli", name = "picocli", version = "4.7.7" }
slf4j = { group = "org.slf4j", name = "slf4j-api", version = "2.0.17" }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

import com.bakdata.kafka.ConfiguredApp;
import com.bakdata.kafka.EnvironmentKafkaConfigParser;
import com.bakdata.kafka.FilteringProcessingExceptionHandler;
import com.bakdata.kafka.KafkaPropertiesFactory;
import com.bakdata.kafka.RuntimeConfiguration;
import com.bakdata.kafka.streams.kstream.StreamsBuilderX;
Expand All @@ -42,6 +43,7 @@

/**
* A {@link StreamsApp} with a corresponding {@link StreamsAppConfiguration}
*
* @param <T> type of {@link StreamsApp}
*/
@RequiredArgsConstructor
Expand All @@ -50,24 +52,6 @@ public class ConfiguredStreamsApp<T extends StreamsApp> implements ConfiguredApp
private final @NonNull T app;
private final @NonNull StreamsAppConfiguration configuration;

private static Map<String, Object> createBaseConfig() {
final Map<String, Object> kafkaConfig = new HashMap<>();

// exactly once and order
kafkaConfig.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
kafkaConfig.put(StreamsConfig.producerPrefix(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION), 1);

kafkaConfig.put(StreamsConfig.producerPrefix(ProducerConfig.ACKS_CONFIG), "all");

// compression
kafkaConfig.put(StreamsConfig.producerPrefix(ProducerConfig.COMPRESSION_TYPE_CONFIG),
CompressionType.GZIP.toString());

kafkaConfig.put(StreamsConfig.ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG, true);

return kafkaConfig;
}

/**
* <p>This method creates the configuration to run a {@link StreamsApp}.</p>
* Configuration is created in the following order
Expand All @@ -82,6 +66,13 @@ private static Map<String, Object> createBaseConfig() {
* </pre>
* </li>
* <li>
* Dead letter queue is configured if an error topic is set:
* <pre>
* errors.dead.letter.queue.topic.name={@link StreamsTopicConfig#getErrorTopic()}
* processing.exception.handler={@link FilteringProcessingExceptionHandler}
* </pre>
* </li>
* <li>
* Configs provided by {@link StreamsApp#createKafkaProperties()}
* </li>
* <li>
Expand Down Expand Up @@ -114,10 +105,11 @@ public Map<String, Object> getKafkaProperties(final RuntimeConfiguration runtime

/**
* Get unique application identifier of {@link StreamsApp}
*
* @return unique application identifier
* @see StreamsApp#getUniqueAppId(StreamsAppConfiguration)
* @throws IllegalArgumentException if unique application identifier of {@link StreamsApp} is different from
* provided application identifier in {@link StreamsAppConfiguration}
* @see StreamsApp#getUniqueAppId(StreamsAppConfiguration)
*/
public String getUniqueAppId() {
final String uniqueAppId =
Expand All @@ -128,17 +120,9 @@ public String getUniqueAppId() {
return uniqueAppId;
}

/**
* Get topic configuration
*
* @return topic configuration
*/
public StreamsTopicConfig getTopics() {
return this.configuration.getTopics();
}

/**
* Create an {@link ExecutableStreamsApp} using the provided {@link RuntimeConfiguration}
*
* @return {@link ExecutableStreamsApp}
*/
@Override
Expand All @@ -153,6 +137,15 @@ public ExecutableStreamsApp<T> withRuntimeConfiguration(final RuntimeConfigurati
.build();
}

/**
* Get topic configuration
*
* @return topic configuration
*/
public StreamsTopicConfig getTopics() {
return this.configuration.getTopics();
}

/**
* Create the topology of the Kafka Streams app
*
Expand All @@ -170,8 +163,34 @@ public void close() {
this.app.close();
}

private Map<String, Object> createBaseConfig() {
final Map<String, Object> kafkaConfig = new HashMap<>();

// exactly once and order
kafkaConfig.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
kafkaConfig.put(StreamsConfig.producerPrefix(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION), 1);

kafkaConfig.put(StreamsConfig.producerPrefix(ProducerConfig.ACKS_CONFIG), "all");

// compression
kafkaConfig.put(StreamsConfig.producerPrefix(ProducerConfig.COMPRESSION_TYPE_CONFIG),
CompressionType.GZIP.toString());

kafkaConfig.put(StreamsConfig.ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG, true);

// built-in dead-letter queue
final String errorTopic = this.getTopics().getErrorTopic();
if (errorTopic != null) {
kafkaConfig.put(StreamsConfig.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG, errorTopic);
kafkaConfig.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG,
FilteringProcessingExceptionHandler.class);
}

return kafkaConfig;
}

private KafkaPropertiesFactory createPropertiesFactory(final RuntimeConfiguration runtimeConfig) {
final Map<String, Object> baseConfig = createBaseConfig();
final Map<String, Object> baseConfig = this.createBaseConfig();
return KafkaPropertiesFactory.builder()
.baseConfig(baseConfig)
.app(this.app)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,11 @@
import static java.util.Collections.emptyMap;
import static org.apache.kafka.streams.StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

import com.bakdata.kafka.FilteringProcessingExceptionHandler;
import com.bakdata.kafka.RuntimeConfiguration;
import com.bakdata.kafka.streams.ConfiguredStreamsApp;
import com.bakdata.kafka.streams.SerdeConfig;
Expand Down Expand Up @@ -243,6 +245,27 @@ public void buildTopology(final StreamsBuilderX builder) {
.hasMessageStartingWith("Invalid topology:");
}

@Test
void shouldSetProcessingExceptionHandler() {
final ConfiguredStreamsApp<StreamsApp> configuredApp =
new ConfiguredStreamsApp<>(new TestApplication(),
new StreamsAppConfiguration(StreamsTopicConfig.builder()
.errorTopic("error")
.build()));
assertThat(configuredApp.getKafkaProperties(RuntimeConfiguration.create("fake")))
.containsEntry(PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG, FilteringProcessingExceptionHandler.class);
}

@Test
void shouldNotSetProcessingExceptionHandler() {
final ConfiguredStreamsApp<StreamsApp> configuredApp =
new ConfiguredStreamsApp<>(new TestApplication(),
new StreamsAppConfiguration(StreamsTopicConfig.builder()
.build()));
assertThat(configuredApp.getKafkaProperties(RuntimeConfiguration.create("fake")))
.doesNotContainKey(PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG);
}

@RequiredArgsConstructor
private static class TestApplication implements StreamsApp {

Expand Down
Loading