@davidarchanjo
Created February 3, 2023 05:46
Sample Kafka producer that connects to a cluster with SASL/PLAIN authentication, using the reactor-kafka library
package io.davidarchanjo;

import java.text.MessageFormat;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;

import reactor.core.publisher.Flux;
import reactor.kafka.sender.KafkaSender;
import reactor.kafka.sender.SenderOptions;
import reactor.kafka.sender.SenderRecord;

public class SampleKafkaAuthProducer {

    static String bootstrapServers = "localhost:9092";
    static String topic = "sample-topic";
    static String username = "admin";
    static String password = "admin-secret";
    static Logger logger = Logger.getLogger(SampleKafkaAuthProducer.class.getName());

    KafkaSender<Integer, String> sender;

    public SampleKafkaAuthProducer(String bootstrapServers) {
        var props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.serializer", IntegerSerializer.class);
        props.put("value.serializer", StringSerializer.class);
        props.put("acks", "all");
        // Authenticate against the cluster over SASL/PLAIN (no TLS)
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "PLAIN");
        props.put("sasl.jaas.config", MessageFormat.format(
            "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"{0}\" password=\"{1}\";",
            username, password));

        var senderOptions = SenderOptions.<Integer, String>create(props);
        sender = KafkaSender.create(senderOptions);
    }

    void publish(String topic, int count, CountDownLatch latch) {
        // Each SenderRecord carries the message index as correlation metadata,
        // so every send result can be matched back to the record that produced it.
        sender.<Integer>send(
                Flux.range(1, count)
                    .map(i -> SenderRecord.create(new ProducerRecord<>(topic, i, "Message_" + i), i)))
            .doOnError(e -> logger.log(Level.SEVERE, "Publish failed", e))
            .subscribe(o -> {
                var metadata = o.recordMetadata();
                var correlationId = o.correlationMetadata();
                logger.log(Level.INFO, MessageFormat.format(
                    "Publish success, correlationId={0} topic={1} partition={2} offset={3}",
                    correlationId, metadata.topic(), metadata.partition(), metadata.offset()));
                latch.countDown();
            });
    }

    void close() {
        sender.close();
    }

    public static void main(String... args) throws Exception {
        var count = 20;
        var latch = new CountDownLatch(count);
        var producer = new SampleKafkaAuthProducer(bootstrapServers);
        producer.publish(topic, count, latch);
        // Wait up to 10 seconds for all send results before shutting down
        latch.await(10, TimeUnit.SECONDS);
        producer.close();
    }
}
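
For quick local verification, here is a minimal sketch of a matching reactive consumer built on reactor-kafka's KafkaReceiver, reusing the same SASL/PLAIN settings as the producer above. The class name, the group id "sample-group", and the 10-second sleep are illustrative choices, not part of the original gist.

package io.davidarchanjo;

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverOptions;

public class SampleKafkaAuthConsumer {

    public static void main(String... args) throws Exception {
        var props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "sample-group");                 // illustrative group id
        props.put("key.deserializer", IntegerDeserializer.class);
        props.put("value.deserializer", StringDeserializer.class);
        props.put("auto.offset.reset", "earliest");
        // Same SASL/PLAIN settings as the producer above
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "PLAIN");
        props.put("sasl.jaas.config",
            "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin-secret\";");

        var receiverOptions = ReceiverOptions.<Integer, String>create(props)
            .subscription(Collections.singleton("sample-topic"));

        KafkaReceiver.create(receiverOptions)
            .receive()
            .subscribe(record -> {
                System.out.printf("Received key=%d value=%s partition=%d offset=%d%n",
                    record.key(), record.value(), record.partition(), record.receiverOffset().offset());
                record.receiverOffset().acknowledge(); // mark the offset as processed for commit
            });

        // Keep the JVM alive long enough to drain the test messages
        Thread.sleep(10_000);
    }
}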