Skip to content

Instantly share code, notes, and snippets.

@muhdkhokhar
Created March 16, 2025 19:24
Show Gist options
  • Save muhdkhokhar/76435a2265d157062cc9809cf0b6fc8f to your computer and use it in GitHub Desktop.
Save muhdkhokhar/76435a2265d157062cc9809cf0b6fc8f to your computer and use it in GitHub Desktop.
import java.util.concurrent.*;
public class InternalQueue {
private final String queueName;
private final BlockingQueue<Message> queue = new LinkedBlockingQueue<>();
private final ExecutorService executorService;
private final MessageListener listener;
private final int concurrentConsumers;
public InternalQueue(String queueName, MessageListener listener, int concurrentConsumers) {
this.queueName = queueName;
this.listener = listener;
this.concurrentConsumers = concurrentConsumers;
this.executor = Executors.newFixedThreadPool(concurrentConsumers);
startConsumers();
}
private final MessageListener listener;
private final ExecutorService executor;
public void send(Message message) {
queue.offer(message);
}
private final BlockingQueue<Message> queue = new LinkedBlockingQueue<>();
private void startConsumers() {
for (int i = 0; i < concurrentConsumers; i++) {
executor.execute(() -> {
while (!Thread.currentThread().isInterrupted()) {
try {
Message msg = queue.take();
listener.onMessage(msg);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (Exception e) {
System.err.println("Listener processing failed: " + e.getMessage());
}
}
});
});
}
public void send(Message message) {
queue.offer(message);
}
public void stop() {
executor.shutdownNow();
}
public int getQueueSize() {
return queue.size();
}
public String getQueueName() {
return queueName;
}
private final ExecutorService executor;
}
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
 * Registry of {@link InternalQueue} instances keyed by queue name.
 *
 * <p>Queues are stored in a {@link ConcurrentHashMap}.
 */
public class QueueManager {

    private final Map<String, InternalQueue> queues = new ConcurrentHashMap<>();

    /**
     * Creates and registers a queue under {@code queueName} if none is registered yet.
     * If a queue with that name already exists, it is kept and no new queue is created.
     *
     * @param queueName           name to register the queue under
     * @param listener            listener passed to the new queue
     * @param concurrentConsumers number of consumers passed to the new queue
     */
    public void createQueue(String queueName, MessageListener listener, int concurrentConsumers) {
        queues.computeIfAbsent(queueName, name ->
                new InternalQueue(name, listener, concurrentConsumers));
    }

    /**
     * Looks up a queue by name.
     *
     * @param queueName name of the queue
     * @return the registered queue, or {@code null} if no queue has that name
     */
    public InternalQueue getQueue(String queueName) {
        return queues.get(queueName);
    }

    /**
     * Sends a message to the named queue.
     *
     * @param queueName name of the target queue
     * @param message   message to enqueue
     * @throws IllegalArgumentException if no queue is registered under {@code queueName}
     */
    public void sendMessage(String queueName, Message message) {
        InternalQueue queue = queues.get(queueName);
        if (queue == null) {
            throw new IllegalArgumentException("Queue not found: " + queueName);
        }
        // Go through the queue's public API; its internal fields are private.
        queue.send(message);
    }

    /** Stops every registered queue. The queues remain registered afterwards. */
    public void shutdownAll() {
        queues.values().forEach(InternalQueue::stop);
    }
}
/**
 * Demo entry point: registers two queues with different listeners, sends a few
 * messages to each, waits briefly, then shuts everything down.
 */
public class Main {

    /**
     * Runs the demo.
     *
     * @param args unused
     * @throws InterruptedException if the main thread is interrupted while waiting
     */
    public static void main(String[] args) throws InterruptedException {
        QueueManager queueManager = new QueueManager();

        // Create a queue "orderQueue" with 3 concurrent consumers and custom listener
        queueManager.createQueue("orderQueue",
                message -> {
                    System.out.println("[OrderQueue Listener] Thread: " + Thread.currentThread().getName() +
                            " processing message: " + message.getPayload());
                    // simulate processing time
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        // Restore the flag instead of swallowing it, so the thread
                        // running this listener still sees the interruption.
                        Thread.currentThread().interrupt();
                    }
                }, 3);

        // Create another queue "notificationQueue" with 2 concurrent consumers and a different listener
        queueManager.createQueue("notificationQueue",
                message -> System.out.println("Notification Received: " + message.getPayload()),
                2);

        // Sending messages dynamically
        queueManager.sendMessage("orderQueue", new Message("Order #123"));
        queueManager.sendMessage("orderQueue", new Message("Order #124"));
        queueManager.sendMessage("notificationQueue", new Message("New user signed up!"));
        queueManager.sendMessage("notificationQueue", new Message("System maintenance at midnight"));

        // Allow processing for demonstration purposes
        Thread.sleep(5000);

        System.out.println("Shutting down queues...");
        queueManager.shutdownAll();
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment