-
-
Save javatrending/b106a3d506f0f5b3b29473068ba5ed32 to your computer and use it in GitHub Desktop.
Example of configuring Kafka Streams within a Spring Boot application with an example of SSL configuration
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# With SSL configured | |
spring: | |
application: | |
name: my-stream-app | |
kafka: | |
bootstrap-servers: | |
- server1:9092 | |
- server2:9092 | |
ssl: | |
truststore-location: file:ca-truststore-client.jks | |
truststore-password: | |
keystore-location: file:client-keystore.p12 | |
keystore-password: | |
key-password: | |
properties: | |
security.protocol: SSL |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import com.fasterxml.jackson.databind.JsonNode; | |
import com.fasterxml.jackson.databind.node.ObjectNode; | |
import org.apache.kafka.common.serialization.Serdes; | |
import org.apache.kafka.streams.KafkaStreams; | |
import org.apache.kafka.streams.StreamsBuilder; | |
import org.apache.kafka.streams.StreamsConfig; | |
import org.apache.kafka.streams.Topology; | |
import org.apache.kafka.streams.kstream.GlobalKTable; | |
import org.apache.kafka.streams.kstream.KStream; | |
import org.apache.kafka.streams.kstream.Materialized; | |
import org.springframework.beans.factory.annotation.Autowired; | |
import org.springframework.beans.factory.annotation.Value; | |
import org.springframework.boot.autoconfigure.kafka.KafkaProperties; | |
import org.springframework.context.annotation.Bean; | |
import org.springframework.context.annotation.Configuration; | |
import org.springframework.kafka.support.serializer.JsonDeserializer; | |
import org.springframework.kafka.support.serializer.JsonSerde; | |
import java.util.Properties; | |
@Configuration | |
public class KafkaStreamsConfig { | |
@Bean | |
public KafkaStreams kafkaStreams(KafkaProperties kafkaProperties, | |
@Value("${spring.application.name}") String appName) { | |
final Properties props = new Properties(); | |
// inject SSL related properties | |
props.putAll(kafkaProperties.getSsl().buildProperties()); | |
props.putAll(kafkaProperties.getProperties()); | |
// stream config centric ones | |
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers()); | |
props.put(StreamsConfig.APPLICATION_ID_CONFIG, appName); | |
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); | |
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class); | |
props.put(StreamsConfig.STATE_DIR_CONFIG, "data"); | |
// others | |
props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, JsonNode.class); | |
final KafkaStreams kafkaStreams = new KafkaStreams(kafkaStreamTopology(), props); | |
kafkaStreams.start(); | |
return kafkaStreams; | |
} | |
@Bean | |
public Topology kafkaStreamTopology() { | |
final StreamsBuilder streamsBuilder = new StreamsBuilder(); | |
// streamsBuilder.stream("some_topic") etc ... | |
return streamsBuilder.build(); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment