Skip to content

Instantly share code, notes, and snippets.

@sterlp
Last active May 16, 2024 11:34
Show Gist options
  • Save sterlp/e82a9d6030ced76c4ab82c7be5a300b3 to your computer and use it in GitHub Desktop.
Save sterlp/e82a9d6030ced76c4ab82c7be5a300b3 to your computer and use it in GitHub Desktop.
Read topic message from an Azure ServiceBus
@Test
void testServiceBusReceiverClient() throws Exception {
String connectionString = "xxxx";
String topicName = "xxxxx";
String subName = "xxxx";
final ServiceBusReceiverClient client = new ServiceBusClientBuilder()
.connectionString(connectionString)
.receiver()
.topicName(topicName)
.subscriptionName(subName)
.buildClient();
try (client) {
final var messages = client.peekMessages(10);
messages.forEach(msg -> {
System.err.println(msg.getBody());
});
} finally {
client.close();
}
}
/**
* Alternative using ServiceBusProcessorClient
* https://learn.microsoft.com/en-us/azure/service-bus-messaging/service-bus-java-how-to-use-queues?tabs=connection-string
*/
@RequiredArgsConstructor
@Slf4j
static class AzureServiceBusListener {
private final String connectionString;
private final String topicName;
private final String subName;
private final CountDownLatch countDownLatch = new CountDownLatch(1);
private final CompletableFuture<ServiceBusReceivedMessage> result = new CompletableFuture<>();
public ServiceBusReceivedMessage listen(Duration duration) throws InterruptedException, ExecutionException, TimeoutException {
ServiceBusProcessorClient processorClient = new ServiceBusClientBuilder()
.connectionString(connectionString)
.processor().topicName(topicName)
.subscriptionName(subName)
.processMessage(this::processMessage)
.processError(this::processError)
.buildProcessorClient();
log.debug("Starting EventListener for={}", subName);
processorClient.start();
try {
countDownLatch.await(duration.toMillis(), TimeUnit.MILLISECONDS);
} catch (Exception e) {
System.err.println("Error while waiting for message " + e);
result.completeExceptionally(e);
} finally {
processorClient.close();
}
log.debug("Stopped EventListener for={}", subName);
return result.get(1, TimeUnit.MILLISECONDS);
}
private void processError(ServiceBusErrorContext context) {
result.completeExceptionally(context.getException());
countDownLatch.countDown();
}
private void processMessage(ServiceBusReceivedMessageContext context) {
result.complete(context.getMessage());
context.complete();
countDownLatch.countDown();
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment