Created
May 19, 2022 13:54
-
-
Save zachelrath/dca3ffcb169e1e962bcebff62f50cc0c to your computer and use it in GitHub Desktop.
Simple Pulsar message listener
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@Service | |
@Slf4j | |
@ConditionalOnProperty(name = "consumer.enabled", havingValue = "true") | |
public class SimpleMessageListener implements MessageListener<byte[]> { | |
private final int maxConsumeDelaySeconds; | |
private static final String CONSUMER_PREFIX = "(CONSUMER) [{}]: "; | |
public SimpleMessageListener( | |
@Value("${consumer.max.consume.delay.seconds}") int maxConsumeDelaySeconds) { | |
this.maxConsumeDelaySeconds = maxConsumeDelaySeconds; | |
} | |
@Override | |
public void received(Consumer<byte[]> consumer, Message<byte[]> msg) { | |
log.info(CONSUMER_PREFIX + "Received, beginning to process ...", msg.getMessageId()); | |
try { | |
// Sleep for a random delay to simulate delay in message consumption/processing | |
TimeUnit.SECONDS.sleep((int) Math.ceil(Math.random() * maxConsumeDelaySeconds)); | |
consumer.acknowledge(msg); | |
log.info(CONSUMER_PREFIX + "Successfully processed!", msg.getMessageId()); | |
} catch (InterruptedException e) { | |
e.printStackTrace(); | |
log.error("(CONSUMER) THREAD INTERRUPTED --- DID NOT FINISH PROCESSING!"); | |
} catch (PulsarClientException e) { | |
log.error("(CONSUMER) Pulsar client exception", e); | |
consumer.negativeAcknowledge(msg); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment