Skip to content

Instantly share code, notes, and snippets.

@sumanmaity112
Created October 22, 2021 15:19
Show Gist options
  • Save sumanmaity112/2b57985a57c7b3f86dad90b590c66d8e to your computer and use it in GitHub Desktop.
Save sumanmaity112/2b57985a57c7b3f86dad90b590c66d8e to your computer and use it in GitHub Desktop.
Manual acknowledgement with an SQS listener
package com.suman.demo.listener;
import com.suman.demo.Processor;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.aws.messaging.listener.Acknowledgment;
import org.springframework.cloud.aws.messaging.listener.SqsMessageDeletionPolicy;
import org.springframework.cloud.aws.messaging.listener.annotation.SqsListener;
import org.springframework.stereotype.Component;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Mono;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
/**
 * SQS listener that hands each incoming {@link Event} to the {@link Processor} and acknowledges
 * the message manually once processing has produced a value.
 *
 * <p>The listener is registered with {@link SqsMessageDeletionPolicy#NEVER}, so the only place
 * this class acknowledges a message is the explicit {@code acknowledgment.acknowledge()} call in
 * {@link #receiveMessage(Event, Acknowledgment)}.
 */
@Slf4j
@Component
@RequiredArgsConstructor
public class EventListener {

  private final Processor processor;

  /**
   * Processes one message from the queue configured under {@code queue.url} and acknowledges it
   * after {@code processor.process} has emitted a value.
   *
   * <p>The outcome is only logged; nothing is returned or rethrown to the caller. When the
   * pipeline fails or exceeds the 25 second timeout, the message is not acknowledged by this
   * method.
   *
   * @param event the converted message payload
   * @param acknowledgment handle used to acknowledge the message manually
   */
  @SqsListener(value = "${queue.url}", deletionPolicy = SqsMessageDeletionPolicy.NEVER)
  public void receiveMessage(Event event, Acknowledgment acknowledgment) {
    Mono.just(event)
        .doOnNext(message -> log.info("Incoming message {}", message))
        .flatMap(processor::process)
        // NOTE(review): this step only runs when process(...) emits a value. If it completes
        // empty, the acknowledgement is skipped while the completion hook below still logs
        // success - confirm that Processor always emits.
        .flatMap(
            unused ->
                // acknowledge() hands back a Future which is emitted as-is and not awaited, so
                // reaching onComplete means the acknowledgement was submitted, not that it
                // finished.
                Mono.fromFuture(CompletableFuture.supplyAsync(acknowledgment::acknowledge)))
        .timeout(Duration.ofSeconds(25))
        .subscribe(
            new BaseSubscriber<Future<?>>() {
              @Override
              protected void hookOnError(Throwable throwable) {
                // This hook is invoked by BaseSubscriber.onError itself, so the terminal signal
                // must not be re-dispatched from here: a second onError on an already
                // terminated subscriber is routed to Reactor's dropped-error handling.
                log.error("Failed to process event", throwable);
              }

              @Override
              protected void hookOnComplete() {
                // Invoked by BaseSubscriber.onComplete; calling onComplete again would be a
                // no-op, so only the log statement is needed.
                log.info("Successfully processed event");
              }
            });
  }
}
package com.suman.demo.config;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.aws.messaging.config.QueueMessageHandlerFactory;
import org.springframework.cloud.aws.messaging.config.annotation.EnableSqs;
import org.springframework.cloud.aws.messaging.listener.support.AcknowledgmentHandlerMethodArgumentResolver;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.converter.MappingJackson2MessageConverter;
import org.springframework.messaging.handler.annotation.support.PayloadMethodArgumentResolver;
import org.springframework.validation.beanvalidation.LocalValidatorFactoryBean;
/**
 * SQS messaging configuration: exposes a {@link QueueMessageHandlerFactory} whose argument
 * resolvers support an acknowledgment parameter and a Jackson-converted, validated payload.
 */
@Configuration
@EnableSqs
@Slf4j
public class SqsConfig {

  /**
   * Builds the handler factory with two argument resolvers: one for the acknowledgment argument
   * and one for the message payload.
   *
   * @param objectMapper mapper installed on the payload message converter
   * @param validator validator passed to the payload argument resolver
   * @return the configured factory
   */
  @Bean
  public QueueMessageHandlerFactory queueMessageHandlerFactory(
      ObjectMapper objectMapper, LocalValidatorFactoryBean validator) {
    final PayloadMethodArgumentResolver payloadResolver =
        new PayloadMethodArgumentResolver(jsonConverter(objectMapper), validator);
    final AcknowledgmentHandlerMethodArgumentResolver ackResolver =
        new AcknowledgmentHandlerMethodArgumentResolver("Acknowledgment");

    final QueueMessageHandlerFactory handlerFactory = new QueueMessageHandlerFactory();
    // Resolver order matters: the acknowledgment resolver must stay ahead of the payload
    // resolver (presumably so the acknowledgment parameter is not resolved as the payload).
    handlerFactory.setArgumentResolvers(List.of(ackResolver, payloadResolver));
    return handlerFactory;
  }

  /** Creates the Jackson message converter with strict content-type matching switched off. */
  private static MappingJackson2MessageConverter jsonConverter(ObjectMapper objectMapper) {
    final MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
    converter.setObjectMapper(objectMapper);
    converter.setStrictContentTypeMatch(false);
    return converter;
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment