Currently, the Service Bus binder does not support creating a topic or queue with any creation options. See the customer-reported issue: Azure/azure-sdk-for-java#35435
We plan to support the following two high-priority properties:
- Default message time-to-live value.
- Maximum size of the topic or queue in megabytes, which is the size of memory allocated for the queue.
Next, we will compare the RabbitMQ binder and the Kafka binder, and then draw a conclusion.
On the RabbitMQ binder side, developers can define the following similar properties to create an exchange or queue with options:
spring.cloud.stream.rabbit.bindings.<channelName>.producer.ttl=300000
spring.cloud.stream.rabbit.bindings.<channelName>.producer.max-length-bytes=1024
spring.cloud.stream.rabbit.bindings.<channelName>.consumer.ttl=300000
spring.cloud.stream.rabbit.bindings.<channelName>.consumer.max-length-bytes=1024
On the RabbitMQ broker side, these are the time-to-live and length limit configurations:
- The x-message-ttl parameter is used to set a TTL for the exchange or queue.
- The x-max-length-bytes parameter is used to limit the size of the exchange or queue in bytes.
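To make the mapping concrete, here is a minimal sketch of how the two binder values could be turned into RabbitMQ queue arguments. The class and method names (RabbitQueueArgsSketch, toQueueArgs) are hypothetical and not part of the binder; only the argument keys x-message-ttl (a value in milliseconds) and x-max-length-bytes come from RabbitMQ.

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper for illustration; not part of the RabbitMQ binder.
class RabbitQueueArgsSketch {

    // Builds the optional queue arguments. Null inputs are skipped,
    // similar to the null checks in the binder's additionalArgs method.
    static Map<String, Object> toQueueArgs(Duration ttl, Long maxLengthBytes) {
        Map<String, Object> args = new HashMap<>();
        if (ttl != null) {
            // RabbitMQ expects the message TTL in milliseconds.
            args.put("x-message-ttl", ttl.toMillis());
        }
        if (maxLengthBytes != null) {
            args.put("x-max-length-bytes", maxLengthBytes);
        }
        return args;
    }
}
```

For example, toQueueArgs(Duration.ofMinutes(5), 1024L) yields a map with a TTL of 300000 milliseconds and a length limit of 1024 bytes.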
See the conversion in the source code of RabbitExchangeQueueProvisioner#provisionProducerDestination:
@Override
public ProducerDestination provisionProducerDestination(String name,
ExtendedProducerProperties<RabbitProducerProperties> producerProperties) {
Exchange exchange = buildExchange(producerProperties.getExtension(),
exchangeName);
if (producerProperties.getExtension().isDeclareExchange()) {
declareExchange(exchangeName, exchange);
}
for (String requiredGroupName : producerProperties.getRequiredGroups()) {
String baseQueueName = producerProperties.getExtension()
.isQueueNameGroupOnly() ? requiredGroupName
: (exchangeName + "." + requiredGroupName);
if (!producerProperties.isPartitioned()) {
if (producerProperties.getExtension().isBindQueue()) {
Queue queue = new Queue(baseQueueName, true, false, false, queueArgs(
baseQueueName, producerProperties.getExtension(), false));
declareQueue(baseQueueName, queue);
...
}
}
else {
for (int i = 0; i < producerProperties.getPartitionCount(); i++) {
if (producerProperties.getExtension().isBindQueue()) {
Queue queue = new Queue(partitionQueueName, true, false, false,
queueArgs(partitionQueueName,
producerProperties.getExtension(), false));
declareQueue(queue.getName(), queue);
...
}
}
}
}
return new RabbitProducerDestination(exchange, binding);
}
private Map<String, Object> queueArgs(String queueName,
RabbitCommonProperties properties, boolean isDlq) {
Map<String, Object> args = new HashMap<>();
...
additionalArgs(args, properties, isDlq);
return args;
}
private void additionalArgs(Map<String, Object> args, RabbitCommonProperties properties, boolean isDlq) {
...
Integer ttl = isDlq ? properties.getDlqTtl() : properties.getTtl();
if (maxLengthBytes != null) {
args.put("x-max-length-bytes", maxLengthBytes);
}
if (ttl != null) {
args.put("x-message-ttl", ttl);
}
...
}
On the Kafka binder side, developers can define the following similar properties to create a topic with options:
spring.cloud.stream.kafka.bindings.output.producer.topic.properties.cleanup.policy=delete
spring.cloud.stream.kafka.bindings.output.producer.topic.properties.retention.ms=6000
spring.cloud.stream.kafka.bindings.output.producer.topic.properties.retention.bytes=1048576
spring.cloud.stream.kafka.bindings.output.consumer.topic.properties.cleanup.policy=delete
spring.cloud.stream.kafka.bindings.output.consumer.topic.properties.retention.ms=6000
spring.cloud.stream.kafka.bindings.output.consumer.topic.properties.retention.bytes=1048576
On the Kafka broker side, these correspond to the topic-level configurations:
- The cleanup.policy configuration designates the retention policy to use on log segments.
- The retention.ms configuration sets the maximum time a log is retained before old segments are deleted (only applies when cleanup.policy=delete).
- The retention.bytes configuration limits the size a log can grow to before old segments are deleted (only applies when cleanup.policy=delete).
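As an illustration of the shape these settings take, the sketch below builds the three topic-level configs as plain string pairs. The class and method names (KafkaTopicConfigSketch, retentionConfigs) are hypothetical; the sketch only assumes that topic configs are passed as a Map of String to String, so numeric values must already be rendered as literal numbers.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper for illustration; not part of the Kafka binder.
class KafkaTopicConfigSketch {

    // Topic-level configs are string key/value pairs, so any arithmetic
    // (for example 1024 * 1024) has to be evaluated before it is stored.
    static Map<String, String> retentionConfigs(long retentionMs, long retentionBytes) {
        Map<String, String> configs = new LinkedHashMap<>();
        configs.put("cleanup.policy", "delete");
        configs.put("retention.ms", Long.toString(retentionMs));
        configs.put("retention.bytes", Long.toString(retentionBytes));
        return configs;
    }
}
```

A map of this shape is what NewTopic#configs accepts in the Kafka admin client.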
See the conversion in the source code of KafkaTopicProvisioner#provisionProducerDestination:
@Override
public ProducerDestination provisionProducerDestination(final String name,
ExtendedProducerProperties<KafkaProducerProperties> properties) {
try (AdminClient adminClient = createAdminClient()) {
createTopic(adminClient, name, properties.getPartitionCount(), false,
properties.getExtension().getTopic());
...
return new KafkaProducerDestination(name, partitions);
}
}
private void createTopic(AdminClient adminClient, String name, int partitionCount,
boolean tolerateLowerPartitionsOnBroker, KafkaTopicProperties properties) {
try {
createTopicIfNecessary(adminClient, name, partitionCount,
tolerateLowerPartitionsOnBroker, properties);
}catch (Throwable throwable) {
}
}
private void createTopicIfNecessary(AdminClient adminClient, final String topicName,
final int partitionCount, boolean tolerateLowerPartitionsOnBroker,
KafkaTopicProperties properties) throws Throwable {
if (this.configurationProperties.isAutoCreateTopics()) {
createTopicAndPartitions(adminClient, topicName, partitionCount,
tolerateLowerPartitionsOnBroker, properties);
}
}
private void createTopicAndPartitions(AdminClient adminClient, final String topicName,
final int partitionCount, boolean tolerateLowerPartitionsOnBroker,
KafkaTopicProperties topicProperties) throws Throwable {
ListTopicsResult listTopicsResult = adminClient.listTopics();
KafkaFuture<Set<String>> namesFutures = listTopicsResult.names();
Set<String> names = namesFutures.get(this.operationTimeout, TimeUnit.SECONDS);
if (names.contains(topicName)) {
}
else {
// always consider minPartitionCount for topic creation
final int effectivePartitionCount = Math.max(
this.configurationProperties.getMinPartitionCount(), partitionCount);
this.metadataRetryOperations.execute((context) -> {
NewTopic newTopic;
Map<Integer, List<Integer>> replicasAssignments = topicProperties
.getReplicasAssignments();
if (replicasAssignments != null && replicasAssignments.size() > 0) {
newTopic = new NewTopic(topicName,
topicProperties.getReplicasAssignments());
}
else {
newTopic = new NewTopic(topicName, effectivePartitionCount,
topicProperties.getReplicationFactor() != null
? topicProperties.getReplicationFactor()
: this.configurationProperties
.getReplicationFactor());
}
if (topicProperties.getProperties().size() > 0) {
newTopic.configs(topicProperties.getProperties());
}
CreateTopicsResult createTopicsResult = adminClient
.createTopics(Collections.singletonList(newTopic));
try {
createTopicsResult.all().get(this.operationTimeout, TimeUnit.SECONDS);
}
catch (Exception ex) {
}
return null;
});
}
}
We have a properties structure similar to that of the RabbitMQ binder and the Kafka binder, but we need to consider how the other binders answer the following questions:
- In the channel namespace, do the topic or queue creation options need to be defined in a single properties namespace?
- Since some common properties are inherited in the Spring Messaging, Spring Integration, and Spring Cloud Stream implementations, what are the side effects of the properties in non-Spring Cloud Stream modules?
- How should these two properties be named?
The following are my personal answers:
- Q1: The RabbitMQ binder does not use a single namespace, and the Kafka binder uses a single namespace (the topic namespace).
- Q2: From a quick check, it seems that neither RabbitMQ nor Kafka has the kind of inheritance relationship in the properties structure that Spring Cloud Azure adopts.
- Q3: Regarding the options to create a topic or queue, the naming in each channel namespace is based on the server-side property key.
Definition in MS learn:
Resource manager API definition for Service Bus:
- Topic.DefinitionStages.WithDefaultMessageTTL#withDefaultMessageTTL
- Topic.DefinitionStages.WithSize#withSizeInMB
Administration model API definition for Service Bus:
public final class CreateTopicOptions {
private Duration defaultMessageTimeToLive;
private long maxMessageSizeInKilobytes;
private long maxSizeInMegabytes;
...
public CreateTopicOptions(TopicProperties topic) {
Objects.requireNonNull(topic, "'topic' cannot be null.");
this.defaultMessageTimeToLive = topic.getDefaultMessageTimeToLive();
this.maxSizeInMegabytes = topic.getMaxSizeInMegabytes();
this.maxMessageSizeInKilobytes = topic.getMaxMessageSizeInKilobytes();
}
public CreateTopicOptions setDefaultMessageTimeToLive(Duration defaultMessageTimeToLive) {
this.defaultMessageTimeToLive = defaultMessageTimeToLive;
return this;
}
public CreateTopicOptions setMaxSizeInMegabytes(long maxSizeInMegabytes) {
this.maxSizeInMegabytes = maxSizeInMegabytes;
return this;
}
public CreateTopicOptions setMaxMessageSizeInKilobytes(long maxMessageSizeInKilobytes) {
this.maxMessageSizeInKilobytes = maxMessageSizeInKilobytes;
return this;
}
}
Azure CLI usage for Service Bus queue or topic creation: both use the same parameters, --default-message-time-to-live, --max-message-size-in-kilobytes and --max-size-in-megabytes.
So it is more appropriate to put the topic or queue creation options only in the Service Bus channel namespace. This does not affect the non-Spring Cloud Stream modules for now and leaves room for later changes.
We can follow the same naming principle and keep the same names as the Azure SDK client:
- DefaultMessageTimeToLive for the default time-to-live property.
- MaxSizeInMegabytes for the max size property.
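A minimal sketch of what such a properties holder could look like is shown below. The class name ServiceBusEntityOptionsSketch is hypothetical and is not the binder's actual class; only the two field names are taken from CreateTopicOptions. With Spring Boot relaxed binding, fields named this way would typically bind from the kebab-case keys default-message-time-to-live and max-size-in-megabytes.

```java
import java.time.Duration;

// Hypothetical properties holder; the class name is an assumption.
// Field names mirror the Azure SDK's CreateTopicOptions.
class ServiceBusEntityOptionsSketch {

    // Null means "not configured", so the service default applies.
    private Duration defaultMessageTimeToLive;
    private Long maxSizeInMegabytes;

    public Duration getDefaultMessageTimeToLive() {
        return defaultMessageTimeToLive;
    }

    public void setDefaultMessageTimeToLive(Duration defaultMessageTimeToLive) {
        this.defaultMessageTimeToLive = defaultMessageTimeToLive;
    }

    public Long getMaxSizeInMegabytes() {
        return maxSizeInMegabytes;
    }

    public void setMaxSizeInMegabytes(Long maxSizeInMegabytes) {
        this.maxSizeInMegabytes = maxSizeInMegabytes;
    }
}
```

Using nullable wrapper types keeps "unset" distinguishable from an explicit value, which matters when deciding whether to pass an option to the service at all.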
The configs include not only the two properties mentioned above but also other, less commonly used attributes, such as the other properties defined in class CreateTopicOptions or CreateQueueOptions.
These other configs are the ones that the current properties configuration does not support, or that may be supported in the future. The binder implementation needs a mechanism that lets users extend provisioning with these configs.
The Kafka binder provides a map, topic.properties, to store less-used properties; not all supported attributes are listed in class KafkaTopicProperties.
Since the Kafka binder uses the Kafka admin module to create topics, it depends on Spring Kafka properties and Kafka binder properties. It also provides an extension point to customize the Kafka admin client properties: before the Kafka admin client is used, you can update the admin client properties.
public interface AdminClientConfigCustomizer {
void configure(Map<String, Object> adminClientProperties);
}
Provide an AdminClientConfigCustomizer bean, and the KafkaTopicProvisioner picks it up.
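The following sketch shows how such a customizer could be written and applied. To stay self-contained it declares a local copy of the AdminClientConfigCustomizer interface quoted above; the helper class AdminClientConfigCustomizerSketch and its buildAdminProperties method are hypothetical stand-ins for what the provisioner does internally, not the binder's actual code.

```java
import java.util.HashMap;
import java.util.Map;

// Local copy of the binder's interface so the sketch compiles on its own.
interface AdminClientConfigCustomizer {
    void configure(Map<String, Object> adminClientProperties);
}

// Hypothetical helper that mimics applying the customizer before the
// admin client is created.
class AdminClientConfigCustomizerSketch {

    static Map<String, Object> buildAdminProperties(Map<String, Object> base,
            AdminClientConfigCustomizer customizer) {
        // Copy first so the caller's map is never mutated.
        Map<String, Object> props = new HashMap<>(base);
        if (customizer != null) {
            customizer.configure(props);
        }
        return props;
    }
}
```

A customizer can then be a one-line lambda, for example props -> props.put("request.timeout.ms", 10000), registered as a bean in a real application.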
The Rabbit binder provides two maps, queueBindingArguments and dlqBindingArguments, to store less-used properties; I guess that not all supported attributes are listed in class RabbitCommonProperties.
Before creating the exchange or queue, it lets you update the Exchange or Queue instance directly.
public interface DeclarableCustomizer extends Function<Declarable, Declarable> {
}
Provide DeclarableCustomizer beans, and the RabbitExchangeQueueProvisioner picks them up.
Since the Service Bus binder depends on the Azure Resource Manager client, we can expose the resource manager interfaces as the extension point. The interfaces Topic.Definition and Queue.Definition are exposed to customize the creation options, so customizers can be applied before the topic or queue is created.
These customizer beans will be picked up by com.azure.spring.cloud.stream.binder.servicebus.provisioning.ServiceBusChannelResourceManagerProvisioner.
public interface AzureServiceBusTopicCustomizer {
void customize(Topic.Definition definition);
}
The interface Topic.Definition provides all the topic options that the Service Bus resource manager SDK supports.
For queue creation, it's the same process in Service Bus resource manager client.
public interface AzureServiceBusQueueCustomizer {
void customize(Queue.Definition definition);
}
The interface Queue.Definition provides all the queue options that the Service Bus resource manager SDK supports.
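The sketch below illustrates how a provisioner might apply customizer beans in order before creating a topic. It does not use the real resource manager types: TopicDefinitionStandIn is a hypothetical stand-in for Topic.Definition that models only the two fluent methods named earlier (withDefaultMessageTTL and withSizeInMB), and TopicCustomizerSketch and TopicProvisionerSketch are likewise assumptions made for illustration.

```java
import java.time.Duration;
import java.util.List;

// Stand-in for the resource manager's Topic.Definition; only the two
// fluent methods referenced in this document are modeled.
class TopicDefinitionStandIn {
    Duration defaultMessageTtl;
    long sizeInMb;

    TopicDefinitionStandIn withDefaultMessageTTL(Duration ttl) {
        this.defaultMessageTtl = ttl;
        return this;
    }

    TopicDefinitionStandIn withSizeInMB(long sizeInMb) {
        this.sizeInMb = sizeInMb;
        return this;
    }
}

// Same shape as AzureServiceBusTopicCustomizer, but over the stand-in type.
interface TopicCustomizerSketch {
    void customize(TopicDefinitionStandIn definition);
}

// Hypothetical provisioner step; the actual create call is omitted.
class TopicProvisionerSketch {

    // Applies every customizer in list order, so later ones can override
    // values set by earlier ones.
    static TopicDefinitionStandIn prepare(List<TopicCustomizerSketch> customizers) {
        TopicDefinitionStandIn definition = new TopicDefinitionStandIn();
        for (TopicCustomizerSketch customizer : customizers) {
            customizer.customize(definition);
        }
        return definition;
    }
}
```

The same pattern would apply to queues with a corresponding stand-in for Queue.Definition.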
RabbitMQ:
- https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream-binder-rabbit.html#rabbit-prod-props
- https://www.rabbitmq.com/ttl.html#overview
- https://www.rabbitmq.com/maxlength.html
Kafka:
What's the MaxSizeInMegabytes in Service Bus? Is there any reference for these two properties? What are they called in the Azure Portal, Azure CLI, and Azure SDK admin client?