- Three-node cluster, version 4.1.0
- load balancer
- I am using this one
- Three clients: Java, Go, .NET
- 1 producer, 1 consumer
- .NET client
- Go Client
- Java client, using the following code:
// Force every connection through a single entry point (localhost:5553).
Address entryPoint = new Address("localhost", 5553);
Environment environment = Environment.builder()
        .host("localhost")
        .port(5553)
        .addressResolver(address -> entryPoint)
        .build();

AtomicLong consumed = new AtomicLong();
AtomicLong published = new AtomicLong();
AtomicLong publishedNotConfirmed = new AtomicLong();

// Despite its name, this thread only prints the counters every 2 seconds.
Thread consumerThread = new Thread(() -> {
    try {
        while (true) {
            Thread.sleep(2000);
            System.out.println(Instant.now() + " [Java] Received: " + customFormat("###,###.###", consumed.get()) + " messages");
            System.out.println(Instant.now() + " [Java] Published: " + customFormat("###,###.###", published.get()) + " messages");
            System.out.println(Instant.now() + " [Java] Published Not Confirmed: " + customFormat("###,###.###", publishedNotConfirmed.get()) + " messages");
            System.out.println("Total Published: " + customFormat("###,###.###", published.get() + publishedNotConfirmed.get()) + " messages");
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
});
consumerThread.start();

// Consume from the beginning of the stream and count every message received.
environment.consumerBuilder()
        .stream("reliable-Test-0")
        .offset(OffsetSpecification.first())
        .messageHandler((context, message) -> {
            consumed.incrementAndGet();
        })
        .build();

Producer producer = environment.producerBuilder().stream("reliable-Test-0").build();
System.out.println("Starting publishing...");
int messageCount = 50_000_000;
for (int i = 0; i < messageCount; i++) {
    try {
        producer.send(
                producer.messageBuilder().addData(String.valueOf(i).getBytes()).build(),
                confirmationStatus -> {
                    if (confirmationStatus.isConfirmed()) {
                        published.incrementAndGet();
                    } else {
                        publishedNotConfirmed.incrementAndGet();
                    }
                });
    } catch (Exception e) {
        // A send that throws is counted as not confirmed.
        publishedNotConfirmed.incrementAndGet();
    }
}

// Give outstanding confirmations time to arrive before closing the producer.
try {
    Thread.sleep(15_000);
} catch (InterruptedException e) {
    e.printStackTrace();
}
producer.close();

// Keep the process (and the consumer) alive until input arrives on stdin.
try {
    System.in.read();
} catch (Exception e) {
    e.printStackTrace();
}
}
}
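The fixed `Thread.sleep(15_000)` before `producer.close()` is a guess at how long outstanding confirmations take. A minimal sketch of an alternative, assuming every message ends with exactly one outcome (a confirmation callback or an exception from `send`): count the outcomes down with a latch and wait on it. `ConfirmTracker`, `onResult`, `awaitAll` and `balanced` are hypothetical names for illustration and are not part of the stream client.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical helper, not part of the RabbitMQ stream client.
final class ConfirmTracker {
    private final long expected;
    private final AtomicLong confirmed = new AtomicLong();
    private final AtomicLong failed = new AtomicLong();
    private final CountDownLatch pending;

    ConfirmTracker(int expected) {
        this.expected = expected;
        this.pending = new CountDownLatch(expected);
    }

    // Record one outcome; assumed to be called exactly once per message.
    void onResult(boolean isConfirmed) {
        if (isConfirmed) {
            confirmed.incrementAndGet();
        } else {
            failed.incrementAndGet();
        }
        pending.countDown();
    }

    // Wait until every message has an outcome, or until the timeout elapses.
    // Returns true only if all outcomes arrived in time.
    boolean awaitAll(long timeoutMillis) {
        try {
            return pending.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return false;
        }
    }

    long confirmed() { return confirmed.get(); }

    long failed() { return failed.get(); }

    // The invariant checked in the results: failed + confirmed == sent.
    boolean balanced() { return confirmed.get() + failed.get() == expected; }
}
```

In the test above, `onResult(confirmationStatus.isConfirmed())` would go in the confirmation callback and `onResult(false)` in the `catch` block, with `awaitAll(...)` replacing the sleep.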
- Messages each producer has to send:
- Java: 50_000_000
- .NET: 50_000_000
- Go: 100_000_000
During the tests, this script restarts all the Docker containers sequentially:
# Restart every running container, one at a time, for 500 rounds.
for i in {1..500}
do
    for container in $(docker ps -q); do
        echo "Restarting .. $container"
        docker restart "$container"
        echo "Waiting 20s .. $container"
        sleep 20
        echo "Waiting 30s .. $container"
        sleep 30
    done
done
Messages in RabbitMQ:

- Go: Failed + Confirmed = 100,000,000 ✅
- Consumed = 175,563,479 ✅

- .NET: Failed + Confirmed = 50,000,000 ✅
- Consumed = 175,563,479 ✅

- Java: Failed + Confirmed = 50,000,000 ✅
- Consumed = 175,563,498 ✅ (by default the Java client re-consumes the last message, so some duplication is possible; this is acceptable here). For more complex subscriptions, see https://rabbitmq.github.io/rabbitmq-stream-java-client/stable/htmlsingle/#consumer-subscription-listener
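Where the duplicates do matter, one option is to remember the last processed offset and skip anything at or below it. A minimal sketch, assuming offsets within a single stream only increase and start at 0; `OffsetDeduplicator` and `shouldProcess` are hypothetical names, not client API.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical helper: drops messages whose offset was already processed.
final class OffsetDeduplicator {
    // -1 means "nothing processed yet" (stream offsets are assumed to start at 0).
    private final AtomicLong lastProcessed = new AtomicLong(-1);

    // Returns true if the offset is new and records it; false for a duplicate.
    boolean shouldProcess(long offset) {
        long last;
        do {
            last = lastProcessed.get();
            if (offset <= last) {
                return false; // already seen, e.g. re-delivered after a restart
            }
        } while (!lastProcessed.compareAndSet(last, offset));
        return true;
    }

    long lastProcessed() { return lastProcessed.get(); }
}
```

In the message handler of the Java test, the check would wrap the counter, for example `if (dedup.shouldProcess(context.offset())) consumed.incrementAndGet();`. The state lives in memory only, so it covers re-delivery within one process, not across application restarts.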