Skip to content

Instantly share code, notes, and snippets.

@StephenFlavin
Last active October 10, 2024 10:12
Show Gist options
  • Save StephenFlavin/f4c7dc7758a09e84fb7185ac2d44bcf8 to your computer and use it in GitHub Desktop.
Save StephenFlavin/f4c7dc7758a09e84fb7185ac2d44bcf8 to your computer and use it in GitHub Desktop.
spring.kafka.topics autoconfiguration
# Sample application configuration for the spring.kafka.topics auto-configuration.
# NOTE(review): the nesting indentation appears to have been lost in this copy;
# restore it before using this as a real YAML file.
spring:
application.name: spring-kafka-auto-multi-topic
kafka:
listener:
ack-mode: record
concurrency: 1
# Everything under "topics" is bound to TopicsConfig (prefix spring.kafka.topics).
topics:
# Gates the KafkaAdmin.NewTopics bean and is passed to the retry topic auto-creation.
create-topics: true
# Bound to RetriesConfig.
retries-config:
enabled: true
# When true, retry/DLT topic names are derived from shared-topic-name instead of each main topic.
use-shared-topic: true
shared-topic-name: ${spring.application.name}
retry-topic-suffix: -retry
dlt-suffix: -dlt
dlt-strategy: always_retry_on_error
# Presumably retries-config.partitions (partition count for the retry topics) — nesting lost, confirm.
partitions: 1
# Presumably the topics-level defaults used when a topic does not set its own partitions/replicas.
partitions: 1
replicas: 1
# One entry per topic; "name" is the Kafka topic name used for topic creation and serializer lookup.
by-topic-config:
sensor-events:
name: reactor-sensor-events
enabled: true
# Fully qualified (de)serializer class names applied to this topic.
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
producer:
properties:
batch.size: 65536
linger.ms: 3
compression.type: lz4
consumer:
auto-offset-reset: earliest
client-id: ${spring.application.name}
group-id: ${spring.application.name}
bootstrap-servers: localhost:9092
package com.example.kafka.helper;
import com.example.kafka.domain.TopicConfig;
import com.example.kafka.domain.TopicsConfig;
import com.example.kafka.serialization.DltAwareDelegatingByTopicDeserializer;
import com.example.kafka.serialization.DltAwareDelegatingByTopicSerializer;
import java.util.Map;
import java.util.StringJoiner;
import java.util.function.Function;
import java.util.regex.Pattern;
import static org.springframework.kafka.support.serializer.DelegatingByTopicSerialization.KEY_SERIALIZATION_TOPIC_CONFIG;
import static org.springframework.kafka.support.serializer.DelegatingByTopicSerialization.VALUE_SERIALIZATION_TOPIC_CONFIG;
/**
 * Translates {@link TopicsConfig} into the comma-separated {@code topicPattern:className}
 * strings consumed by Spring Kafka's delegating-by-topic serializers and deserializers.
 */
public class ByTopicConfigHelper {

    /**
     * Builds the producer-side by-topic configuration.
     *
     * @param topicsConfig the bound {@code spring.kafka.topics} configuration
     * @return a map holding the key and value by-topic serializer configuration strings
     */
    public static Map<String, Object> toByTopicSerializerConfig(final TopicsConfig topicsConfig) {
        return toByTopicSerializationConfig(topicsConfig,
                TopicConfig::keySerializer,
                TopicConfig::valueSerializer,
                DltAwareDelegatingByTopicSerializer.class);
    }

    /**
     * Builds the consumer-side by-topic configuration.
     *
     * @param topicsConfig the bound {@code spring.kafka.topics} configuration
     * @return a map holding the key and value by-topic deserializer configuration strings
     */
    public static Map<String, Object> toByTopicDeserializerConfig(final TopicsConfig topicsConfig) {
        return toByTopicSerializationConfig(topicsConfig,
                TopicConfig::keyDeserializer,
                TopicConfig::valueDeserializer,
                DltAwareDelegatingByTopicDeserializer.class);
    }

    /**
     * Removes every {@code pattern:className} entry that targets the given class from a
     * comma-separated by-topic configuration string.
     *
     * <p>An entry is removed only when the text after its last {@code ':'} (ignoring
     * surrounding whitespace) equals the class name exactly, so a class whose name merely
     * starts with the given name is left untouched. All other entries are kept verbatim and
     * in their original order.
     *
     * @param clazz          the class whose entries must be dropped
     * @param byTopicPattern the comma-separated configuration; may be {@code null}
     * @return the configuration without the entries for {@code clazz}; an empty string when
     *         {@code byTopicPattern} is {@code null}
     */
    public static String removeConfiguredFor(final Class<?> clazz,
                                             final String byTopicPattern) {
        if (byTopicPattern == null) {
            return "";
        }
        final var className = clazz.getName();
        final var remainingEntries = new StringJoiner(",");
        // limit -1 keeps empty trailing entries so unrelated text is never silently dropped
        for (final var entry : byTopicPattern.split(",", -1)) {
            final var separatorIndex = entry.lastIndexOf(':');
            final var configuredClass = separatorIndex < 0 ? "" : entry.substring(separatorIndex + 1).trim();
            if (!className.equals(configuredClass)) {
                remainingEntries.add(entry);
            }
        }
        return remainingEntries.toString();
    }

    /**
     * Builds an entry whose pattern matches the main topic as well as its retry topics
     * (optionally numbered) and its dead-letter topic.
     */
    private static String toDltTopicPattern(final String sharedTopicName,
                                            final String retryTopicSuffix,
                                            final String dltSuffix,
                                            final String serializationClassName) {
        return toByTopicPattern("%s(%s(-\\d+)?|%s)?".formatted(sharedTopicName, retryTopicSuffix, dltSuffix), serializationClassName);
    }

    /**
     * Builds an entry whose pattern matches only the retry topics (optionally numbered) and
     * the dead-letter topic derived from the shared topic name, not the bare name itself.
     */
    private static String toSharedDltTopicPattern(final String sharedTopicName,
                                                  final String retryTopicSuffix,
                                                  final String dltSuffix,
                                                  final Class<?> serializationClass) {
        return toByTopicPattern("%s(%s(-\\d+)?|%s)".formatted(sharedTopicName, retryTopicSuffix, dltSuffix), serializationClass);
    }

    /** Joins a topic pattern and a class name into one {@code pattern:className} entry. */
    private static String toByTopicPattern(final String topic,
                                           final String serializationClass) {
        return topic + ":" + serializationClass;
    }

    /** Same as {@link #toByTopicPattern(String, String)} using the class's binary name. */
    private static String toByTopicPattern(final String topic,
                                           final Class<?> serializationClass) {
        return toByTopicPattern(topic, serializationClass.getName());
    }

    /**
     * Builds the entry for one topic: a retry/DLT-aware pattern when every topic has its own
     * retry topics, otherwise a plain entry for the topic itself.
     */
    private static String toTopicEntry(final String topic,
                                       final String serializationClassName,
                                       final boolean perTopicRetries,
                                       final String retryTopicSuffix,
                                       final String dltSuffix) {
        return perTopicRetries
                ? toDltTopicPattern(topic, retryTopicSuffix, dltSuffix, serializationClassName)
                : toByTopicPattern(topic, serializationClassName);
    }

    /**
     * Shared implementation behind the serializer and deserializer variants.
     *
     * <p>Disabled topics and topics without a configured class are skipped. When retries use
     * a shared topic, one extra entry mapping the shared retry/DLT topics to
     * {@code dltSerializationClass} is appended to both the key and the value configuration.
     * A missing retries configuration is treated as "retries disabled", and a missing
     * by-topic map as "no topics".
     */
    private static Map<String, Object> toByTopicSerializationConfig(final TopicsConfig topicsConfig,
                                                                    final Function<TopicConfig, String> keySerializationProvider,
                                                                    final Function<TopicConfig, String> valueSerializationProvider,
                                                                    final Class<?> dltSerializationClass) {
        final var retriesConfig = topicsConfig.getRetriesConfig();
        final boolean retriesEnabled = retriesConfig != null && retriesConfig.enabled();
        final boolean sharedRetryTopic = retriesEnabled && retriesConfig.useSharedTopic();
        final boolean perTopicRetries = retriesEnabled && !sharedRetryTopic;
        final String retryTopicSuffix = retriesEnabled ? retriesConfig.retryTopicSuffix() : null;
        final String dltSuffix = retriesEnabled ? retriesConfig.dltSuffix() : null;

        final var configuredTopics = topicsConfig.getByTopicConfig();
        final Map<String, TopicConfig> byTopicConfig = configuredTopics == null ? Map.of() : configuredTopics;

        final var keyEntries = new StringJoiner(",");
        final var valueEntries = new StringJoiner(",");
        for (final var topicConfig : byTopicConfig.values()) {
            if (!topicConfig.enabled()) {
                continue;
            }
            final var keyClassName = keySerializationProvider.apply(topicConfig);
            if (keyClassName != null) {
                keyEntries.add(toTopicEntry(topicConfig.name(), keyClassName, perTopicRetries, retryTopicSuffix, dltSuffix));
            }
            final var valueClassName = valueSerializationProvider.apply(topicConfig);
            if (valueClassName != null) {
                valueEntries.add(toTopicEntry(topicConfig.name(), valueClassName, perTopicRetries, retryTopicSuffix, dltSuffix));
            }
        }

        if (sharedRetryTopic) {
            final var sharedEntry = toSharedDltTopicPattern(retriesConfig.sharedTopicName(),
                    retryTopicSuffix,
                    dltSuffix,
                    dltSerializationClass);
            keyEntries.add(sharedEntry);
            valueEntries.add(sharedEntry);
        }

        return Map.of(KEY_SERIALIZATION_TOPIC_CONFIG, keyEntries.toString(),
                VALUE_SERIALIZATION_TOPIC_CONFIG, valueEntries.toString());
    }
}
package com.example.kafka.serialization;
import com.example.kafka.helper.ByTopicConfigHelper;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.support.serializer.DelegatingByTopicDeserializer;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
 * By-topic deserializer that selects its delegate using the topic recorded in the
 * {@link KafkaHeaders#ORIGINAL_TOPIC} header when that header is present, falling back to
 * the topic the record was actually read from.
 */
public class DltAwareDelegatingByTopicDeserializer extends DelegatingByTopicDeserializer {

    public DltAwareDelegatingByTopicDeserializer() {
        super();
    }

    /**
     * Configures the delegates after stripping this class's own entries from the by-topic
     * configuration, so this deserializer never resolves to itself as a delegate.
     *
     * <p>When the by-topic configuration is absent or is not a {@code String}, the
     * configuration is handed to the superclass unchanged.
     *
     * @param configs the consumer configuration
     * @param isKey   {@code true} when configuring the key deserializer
     */
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        final var configKey = isKey ? KEY_SERIALIZATION_TOPIC_CONFIG : VALUE_SERIALIZATION_TOPIC_CONFIG;
        if (!(configs.get(configKey) instanceof String byTopicConfig)) {
            super.configure(configs, isKey);
            return;
        }
        final var newByTopicConfig = ByTopicConfigHelper.removeConfiguredFor(getClass(), byTopicConfig);
        // The replacement entry comes first and the merge function keeps the first value,
        // so it overrides the original entry for the same key.
        final var newConfigs = Stream.concat(Stream.of(Map.entry(configKey, newByTopicConfig)), configs.entrySet().stream())
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, (a, b) -> a));
        super.configure(newConfigs, isKey);
    }

    /**
     * Deserializes using the delegate registered for the record's original topic.
     *
     * <p>The payload is passed to the superclass as-is instead of being wrapped in a
     * {@link ByteBuffer}: wrapping would throw a {@link NullPointerException} for a
     * {@code null} payload.
     */
    @Override
    public Object deserialize(String topic, Headers headers, byte[] data) {
        return super.deserialize(resolveOriginalTopic(topic, headers), headers, data);
    }

    /** Deserializes using the delegate registered for the record's original topic. */
    @Override
    public Object deserialize(String topic, Headers headers, ByteBuffer data) {
        return super.deserialize(resolveOriginalTopic(topic, headers), headers, data);
    }

    /**
     * Returns the UTF-8 decoded value of the last original-topic header, or {@code topic}
     * when there are no headers, no such header, or the header carries no value.
     */
    private static String resolveOriginalTopic(String topic, Headers headers) {
        if (headers == null) {
            return topic;
        }
        final Header originalTopicHeader = headers.lastHeader(KafkaHeaders.ORIGINAL_TOPIC);
        if (originalTopicHeader == null || originalTopicHeader.value() == null) {
            return topic;
        }
        return new String(originalTopicHeader.value(), StandardCharsets.UTF_8);
    }
}
package com.example.kafka.serialization;
import com.example.kafka.helper.ByTopicConfigHelper;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.support.serializer.DelegatingByTopicSerializer;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
 * By-topic serializer that selects its delegate using the topic recorded in the record
 * headers ({@link KafkaHeaders#ORIGINAL_TOPIC}, then {@link KafkaHeaders#RECEIVED_TOPIC})
 * when present, falling back to the topic the record is being sent to.
 */
public class DltAwareDelegatingByTopicSerializer extends DelegatingByTopicSerializer {

    public DltAwareDelegatingByTopicSerializer() {
        super();
    }

    /**
     * Configures the delegates after stripping this class's own entries from the by-topic
     * configuration, so this serializer never resolves to itself as a delegate.
     *
     * <p>When the by-topic configuration is absent or is not a {@code String}, the
     * configuration is handed to the superclass unchanged.
     *
     * @param configs the producer configuration
     * @param isKey   {@code true} when configuring the key serializer
     */
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        final var configKey = isKey ? KEY_SERIALIZATION_TOPIC_CONFIG : VALUE_SERIALIZATION_TOPIC_CONFIG;
        if (!(configs.get(configKey) instanceof String byTopicConfig)) {
            super.configure(configs, isKey);
            return;
        }
        final var newByTopicConfig = ByTopicConfigHelper.removeConfiguredFor(getClass(), byTopicConfig);
        // The replacement entry comes first and the merge function keeps the first value,
        // so it overrides the original entry for the same key.
        final var newConfigs = Stream.concat(Stream.of(Map.entry(configKey, newByTopicConfig)), configs.entrySet().stream())
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, (a, b) -> a));
        super.configure(newConfigs, isKey);
    }

    /**
     * Serializes {@code data} with the delegate registered for the record's original topic.
     *
     * @param topic   the topic the record is being sent to
     * @param headers the record headers; may be {@code null}
     * @param data    the payload; a {@code byte[]} is returned untouched
     * @return the serialized payload
     */
    @Override
    public byte[] serialize(final String topic,
                            final Headers headers,
                            final Object data) {
        // Already-serialized payloads are passed through as-is.
        // NOTE(review): presumably these are records being forwarded to retry/DLT topics
        // whose payload was never deserialized — confirm.
        if (data instanceof byte[] b) {
            return b;
        }
        return super.serialize(resolveOriginalTopic(topic, headers), headers, data);
    }

    /**
     * Returns the UTF-8 decoded value of the first usable header among original-topic and
     * received-topic (in that order), or {@code topic} when neither carries a value.
     */
    private static String resolveOriginalTopic(final String topic, final Headers headers) {
        if (headers == null) {
            return topic;
        }
        for (final var headerName : new String[] {KafkaHeaders.ORIGINAL_TOPIC, KafkaHeaders.RECEIVED_TOPIC}) {
            final Header header = headers.lastHeader(headerName);
            if (header != null && header.value() != null) {
                return new String(header.value(), StandardCharsets.UTF_8);
            }
        }
        return topic;
    }
}
package com.example.kafka.config;
import com.example.kafka.domain.TopicConfig;
import com.example.kafka.domain.TopicsConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.boot.autoconfigure.AutoConfiguration;
import org.springframework.boot.autoconfigure.AutoConfigureBefore;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.autoconfigure.kafka.DefaultKafkaConsumerFactoryCustomizer;
import org.springframework.boot.autoconfigure.kafka.DefaultKafkaProducerFactoryCustomizer;
import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.core.Ordered;
import org.springframework.core.annotation.Order;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.retrytopic.DltStrategy;
import org.springframework.kafka.retrytopic.RetryTopicComponentFactory;
import org.springframework.kafka.retrytopic.RetryTopicConfiguration;
import org.springframework.kafka.retrytopic.RetryTopicConfigurationBuilder;
import org.springframework.kafka.support.serializer.DelegatingByTopicDeserializer;
import org.springframework.kafka.support.serializer.DelegatingByTopicSerializer;
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
import java.util.Collection;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static com.example.kafka.helper.ByTopicConfigHelper.toByTopicDeserializerConfig;
import static com.example.kafka.helper.ByTopicConfigHelper.toByTopicSerializerConfig;
/**
 * Auto-configuration driven by {@link TopicsConfig}: exposes the configured topic names,
 * optionally creates the topics, wires by-topic (de)serialization into the Kafka consumer
 * and producer factories, and sets up retry/dead-letter topics. It is ordered before
 * {@link KafkaAutoConfiguration}.
 */
@AutoConfiguration
@EnableConfigurationProperties(TopicsConfig.class)
@AutoConfigureBefore(KafkaAutoConfiguration.class)
public class KafkaTopicAutoConfiguration {

    private final TopicsConfig topicsConfig;

    KafkaTopicAutoConfiguration(final TopicsConfig topicsConfig) {
        this.topicsConfig = topicsConfig;
    }

    /** Streams every configured topic; empty when no by-topic configuration is bound. */
    private Stream<TopicConfig> allTopics() {
        final var byTopicConfig = topicsConfig.getByTopicConfig();
        return byTopicConfig == null ? Stream.empty() : byTopicConfig.values().stream();
    }

    /** Streams the configured topics that are enabled. */
    private Stream<TopicConfig> enabledTopics() {
        return allTopics().filter(TopicConfig::enabled);
    }

    /** @return the Kafka topic names of all enabled topics */
    @Bean
    public String[] topics() {
        return enabledTopics()
                .map(TopicConfig::name)
                .toArray(String[]::new);
    }

    /**
     * @return the Kafka topic names of all enabled, non-compacted topics. The topic
     *         {@code name} is used (not the configuration map key) so the result agrees
     *         with {@link #topics()}, topic creation and the by-topic serialization config.
     */
    @Bean
    public String[] realtimeTopics() {
        return enabledTopics()
                .filter(topicConfig -> !topicConfig.isCompact())
                .map(TopicConfig::name)
                .toArray(String[]::new);
    }

    /**
     * @return the Kafka topic names of all enabled, compacted topics. The topic
     *         {@code name} is used (not the configuration map key) so the result agrees
     *         with {@link #topics()}, topic creation and the by-topic serialization config.
     */
    @Bean
    public String[] compactTopics() {
        return enabledTopics()
                .filter(TopicConfig::isCompact)
                .map(TopicConfig::name)
                .toArray(String[]::new);
    }

    /**
     * Declares one {@link NewTopic} per enabled topic. Only registered when
     * {@code spring.kafka.topics.create-topics} is {@code true}.
     */
    @Bean
    @ConditionalOnProperty(value = "spring.kafka.topics.create-topics", havingValue = "true")
    public KafkaAdmin.NewTopics newTopics() {
        final var newTopics = enabledTopics()
                .map(this::toNewTopic)
                .toArray(NewTopic[]::new);
        return new KafkaAdmin.NewTopics(newTopics);
    }

    /**
     * Builds the topic definition, using the topic's own partitions/replicas when set and
     * the {@link TopicsConfig} defaults otherwise.
     */
    private NewTopic toNewTopic(final TopicConfig topicConfig) {
        final var topicBuilder = TopicBuilder.name(topicConfig.name())
                .partitions(Optional.ofNullable(topicConfig.partitions())
                        .orElse(topicsConfig.getPartitions()))
                .replicas(Optional.ofNullable(topicConfig.replicas())
                        .orElse(topicsConfig.getReplicas()))
                .configs(mergedTopicProperties(topicConfig));
        if (topicConfig.isCompact()) {
            topicBuilder.compact();
        }
        return topicBuilder.build();
    }

    /**
     * Combines the topic's own properties with the default topic properties; on a key
     * clash the topic's own value wins. Without topic-level properties the defaults (or an
     * empty map) are returned as-is.
     */
    private Map<String, String> mergedTopicProperties(final TopicConfig topicConfig) {
        final var defaultTopicProperties = topicsConfig.getTopicProperties();
        final var topicProperties = topicConfig.topicProperties();
        if (topicProperties == null) {
            return defaultTopicProperties == null ? Map.of() : defaultTopicProperties;
        }
        return Stream.of(topicProperties, defaultTopicProperties)
                .filter(Objects::nonNull)
                .map(Map::entrySet)
                .flatMap(Collection::stream)
                .collect(Collectors.toUnmodifiableMap(Map.Entry::getKey,
                        Map.Entry::getValue,
                        (topicProperty, defaultTopicProperty) -> topicProperty));
    }

    /**
     * Converts the by-topic config into valid configuration for the Spring
     * DelegatingByTopicSerialization and applies it to the consumer factory. Unless the
     * retries configuration is missing or its DLT strategy is {@code NO_DLT}, the
     * delegating deserializers are wrapped in {@link ErrorHandlingDeserializer}.
     */
    @Bean
    public DefaultKafkaConsumerFactoryCustomizer kafkaConsumerFactoryCustomizer() {
        final var byTopicDeserializerConfig = toByTopicDeserializerConfig(topicsConfig);
        final var retriesConfig = topicsConfig.getRetriesConfig();
        final boolean useErrorHandlingDeserializer =
                retriesConfig != null && retriesConfig.dltStrategy() != DltStrategy.NO_DLT;

        final Map<String, Object> deserializerConfig = useErrorHandlingDeserializer
                ? Map.<String, Object>of(
                        "key.deserializer", ErrorHandlingDeserializer.class.getName(),
                        "value.deserializer", ErrorHandlingDeserializer.class.getName(),
                        ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, DelegatingByTopicDeserializer.class.getName(),
                        ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, DelegatingByTopicDeserializer.class.getName())
                : Map.<String, Object>of(
                        "key.deserializer", DelegatingByTopicDeserializer.class.getName(),
                        "value.deserializer", DelegatingByTopicDeserializer.class.getName());

        return consumerFactory -> {
            consumerFactory.updateConfigs(deserializerConfig);
            consumerFactory.updateConfigs(byTopicDeserializerConfig);
        };
    }

    /**
     * Converts the by-topic config into valid configuration for the Spring
     * DelegatingByTopicSerialization and applies it to the producer factory.
     */
    @Bean
    @Order(Ordered.HIGHEST_PRECEDENCE)
    public DefaultKafkaProducerFactoryCustomizer kafkaProducerFactoryCustomizer() {
        final var byTopicSerializerConfig = toByTopicSerializerConfig(topicsConfig);
        final var serializerConfig = Map.<String, Object>of(
                "key.serializer", DelegatingByTopicSerializer.class.getName(),
                "value.serializer", DelegatingByTopicSerializer.class.getName());
        return producerFactory -> {
            producerFactory.updateConfigs(serializerConfig);
            producerFactory.updateConfigs(byTopicSerializerConfig);
        };
    }

    /**
     * Configures retry topics for every topic that has both a key and a value serializer.
     * Only registered when {@code spring.kafka.topics.retries-config.enabled} is
     * {@code true}.
     *
     * <p>Retry topics are created with the configured default replica count rather than a
     * fixed factor, so creation also works on clusters with fewer than three brokers. A
     * {@code null} replica count is passed through as {@code null}.
     *
     * @param kafkaTemplate the template used to publish to the retry and dead-letter topics
     */
    @Bean
    @ConditionalOnProperty(value = "spring.kafka.topics.retries-config.enabled", havingValue = "true")
    public RetryTopicConfiguration retryTopicConfiguration(final KafkaTemplate<Object, Object> kafkaTemplate) {
        final var retriesConfig = topicsConfig.getRetriesConfig();
        final var retryableTopicNames = allTopics()
                .filter(topicConfig -> topicConfig.keySerializer() != null && topicConfig.valueSerializer() != null)
                .map(TopicConfig::name)
                .toList();
        final Short replicationFactor = Optional.ofNullable(topicsConfig.getReplicas())
                .map(Integer::shortValue)
                .orElse(null);
        return RetryTopicConfigurationBuilder.newInstance()
                .includeTopics(retryableTopicNames)
                .dltProcessingFailureStrategy(retriesConfig.dltStrategy())
                .autoCreateTopics(topicsConfig.createTopics(), retriesConfig.partitions(), replicationFactor)
                .create(kafkaTemplate);
    }

    /**
     * Replaces the retry topic component factory so retry/DLT topic names derive from the
     * shared topic name. Only registered when
     * {@code spring.kafka.topics.retries-config.use-shared-topic} is {@code true}.
     */
    @Bean
    @ConditionalOnProperty(value = "spring.kafka.topics.retries-config.use-shared-topic", havingValue = "true")
    public RetryTopicComponentFactory retryTopicComponentFactory() {
        return new SharedRetryTopicComponentFactory(topicsConfig.getRetriesConfig().sharedTopicName());
    }
}
package com.example.kafka.domain;
import org.springframework.kafka.retrytopic.DltStrategy;
/**
 * Retry and dead-letter topic settings.
 *
 * <p>Missing suffixes are replaced by their defaults when the record is constructed, so the
 * accessors, {@code equals}, {@code hashCode} and {@code toString} all see the same values
 * and rebuilding a record from its accessors yields an equal record.
 *
 * @param enabled          whether retry topics are configured at all
 * @param useSharedTopic   whether retry/DLT topic names derive from {@code sharedTopicName}
 *                         instead of each main topic
 * @param sharedTopicName  base name of the shared retry/DLT topics
 * @param retryTopicSuffix suffix of retry topics; {@code "-retry"} when {@code null}
 * @param dltSuffix        suffix of the dead-letter topic; {@code "-dlt"} when {@code null}
 * @param dltStrategy      strategy applied when dead-letter processing fails
 * @param partitions       partition count for the retry topics; may be {@code null}
 */
public record RetriesConfig(boolean enabled,
                            boolean useSharedTopic,
                            String sharedTopicName,
                            String retryTopicSuffix,
                            String dltSuffix,
                            DltStrategy dltStrategy,
                            Integer partitions) {

    private static final String DEFAULT_RETRY_TOPIC_SUFFIX = "-retry";
    private static final String DEFAULT_DLT_SUFFIX = "-dlt";

    /** Applies the default suffixes to components that were left unset. */
    public RetriesConfig {
        if (retryTopicSuffix == null) {
            retryTopicSuffix = DEFAULT_RETRY_TOPIC_SUFFIX;
        }
        if (dltSuffix == null) {
            dltSuffix = DEFAULT_DLT_SUFFIX;
        }
    }
}
package com.example.kafka.config;
import org.springframework.kafka.retrytopic.RetryTopicComponentFactory;
import org.springframework.kafka.retrytopic.RetryTopicNamesProviderFactory;
/**
 * {@link RetryTopicComponentFactory} that swaps in a
 * {@link SharedRetryTopicNamesProviderFactory} built from a fixed shared topic name.
 */
public class SharedRetryTopicComponentFactory extends RetryTopicComponentFactory {

    /** Name handed to every names-provider factory this component factory creates. */
    private final String sharedTopicName;

    /**
     * @param sharedDltTopic the shared topic name to pass to the names-provider factory
     */
    public SharedRetryTopicComponentFactory(String sharedDltTopic) {
        sharedTopicName = sharedDltTopic;
    }

    /** Creates a new names-provider factory bound to the shared topic name. */
    @Override
    public RetryTopicNamesProviderFactory retryTopicNamesProviderFactory() {
        final RetryTopicNamesProviderFactory namesProviderFactory =
                new SharedRetryTopicNamesProviderFactory(sharedTopicName);
        return namesProviderFactory;
    }
}
package com.example.kafka.config;
import org.springframework.kafka.retrytopic.DestinationTopic;
import org.springframework.kafka.retrytopic.RetryTopicNamesProviderFactory;
import org.springframework.kafka.retrytopic.SuffixingRetryTopicNamesProviderFactory;
public class SharedRetryTopicNamesProviderFactory implements RetryTopicNamesProviderFactory {
private final String sharedDltTopic;
public SharedRetryTopicNamesProviderFactory(String sharedDltTopic) {
this.sharedDltTopic = sharedDltTopic;
}
@Override
public RetryTopicNamesProvider createRetryTopicNamesProvider(DestinationTopic.Properties properties) {
if (properties.isMainEndpoint()) {
return new SuffixingRetryTopicNamesProviderFactory.SuffixingRetryTopicNamesProvider(properties);
} else {
return new SuffixingRetryTopicNamesProviderFactory.SuffixingRetryTopicNamesProvider(properties) {
@Override
public String getTopicName(String topic) {
return super.getTopicName(sharedDltTopic);
}
};
}
}
}
package com.example.kafka.domain;
import java.util.Map;
/**
 * Settings for one entry of the by-topic configuration map.
 *
 * @param name              Kafka topic name; used when creating the topic and as the topic
 *                          pattern in the by-topic (de)serialization configuration
 * @param enabled           whether the topic takes part in topic creation, the exposed
 *                          topic lists and the (de)serialization configuration
 * @param keySerializer     class name of the key serializer; may be {@code null}
 * @param valueSerializer   class name of the value serializer; may be {@code null}
 * @param keyDeserializer   class name of the key deserializer; may be {@code null}
 * @param valueDeserializer class name of the value deserializer; may be {@code null}
 * @param partitions        partition count; when {@code null} the topics-level default is used
 * @param replicas          replica count; when {@code null} the topics-level default is used
 * @param isCompact         whether the topic is created as a compacted topic
 * @param topicProperties   topic-level properties; they take precedence over the
 *                          topics-level default properties on a key clash; may be {@code null}
 */
public record TopicConfig(String name,
boolean enabled,
String keySerializer,
String valueSerializer,
String keyDeserializer,
String valueDeserializer,
Integer partitions,
Integer replicas,
boolean isCompact,
Map<String, String> topicProperties) {
}
package com.example.kafka.domain;
import org.springframework.boot.context.properties.ConfigurationProperties;
import java.util.Map;
import java.util.Objects;
/**
 * Mutable holder for everything bound under the {@code spring.kafka.topics} prefix.
 */
@ConfigurationProperties(prefix = "spring.kafka.topics")
public class TopicsConfig {

    /** Whether topics should be created. */
    private boolean createTopics;

    /** Per-topic settings, keyed by the configuration entry's key. */
    private Map<String, TopicConfig> byTopicConfig;

    /** Default partition count; starts at 1. */
    private Integer partitions = 1;

    /** Default replica count; starts at 1. */
    private Integer replicas = 1;

    private boolean isCompact;

    /** Retry/dead-letter settings; {@code null} until set. */
    private RetriesConfig retriesConfig;

    /** Default topic properties; {@code null} until set. */
    private Map<String, String> topicProperties;

    public TopicsConfig() {
    }

    public boolean createTopics() {
        return this.createTopics;
    }

    public void setCreateTopics(boolean createTopics) {
        this.createTopics = createTopics;
    }

    public Map<String, TopicConfig> getByTopicConfig() {
        return this.byTopicConfig;
    }

    public void setByTopicConfig(Map<String, TopicConfig> byTopicConfig) {
        this.byTopicConfig = byTopicConfig;
    }

    public Integer getPartitions() {
        return this.partitions;
    }

    public void setPartitions(Integer partitions) {
        this.partitions = partitions;
    }

    public Integer getReplicas() {
        return this.replicas;
    }

    public void setReplicas(Integer replicas) {
        this.replicas = replicas;
    }

    public boolean isCompact() {
        return this.isCompact;
    }

    public void setCompact(boolean compact) {
        this.isCompact = compact;
    }

    public RetriesConfig getRetriesConfig() {
        return this.retriesConfig;
    }

    public void setRetriesConfig(RetriesConfig retriesConfig) {
        this.retriesConfig = retriesConfig;
    }

    public Map<String, String> getTopicProperties() {
        return this.topicProperties;
    }

    public void setTopicProperties(Map<String, String> topicProperties) {
        this.topicProperties = topicProperties;
    }

    /** Two instances are equal when all seven properties are equal. */
    @Override
    public boolean equals(Object o) {
        if (o == this) {
            return true;
        }
        if (o instanceof TopicsConfig other) {
            return this.createTopics == other.createTopics
                    && this.isCompact == other.isCompact
                    && Objects.equals(this.byTopicConfig, other.byTopicConfig)
                    && Objects.equals(this.partitions, other.partitions)
                    && Objects.equals(this.replicas, other.replicas)
                    && Objects.equals(this.retriesConfig, other.retriesConfig)
                    && Objects.equals(this.topicProperties, other.topicProperties);
        }
        return false;
    }

    @Override
    public int hashCode() {
        return Objects.hash(this.createTopics,
                this.byTopicConfig,
                this.partitions,
                this.replicas,
                this.isCompact,
                this.retriesConfig,
                this.topicProperties);
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment