Skip to content

Instantly share code, notes, and snippets.

@drmalex07
Created May 8, 2017 10:28
Show Gist options
  • Save drmalex07/26a04e6dc4eb69dd75308aef8d962c3c to your computer and use it in GitHub Desktop.
Save drmalex07/26a04e6dc4eb69dd75308aef8d962c3c to your computer and use it in GitHub Desktop.
Use a simple blocking queue in Java. #java #producer-consumer #queue #blocking-queue
import static java.lang.System.out;
import static java.lang.System.err;
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class HelloBlockingQueue {
private static final Random random = new Random();
/**
* Represent a incoming message (business-specific!)
*/
public static class Message
{
private final int identifier;
private final String text;
public Message(int identifier, String text)
{
this.identifier = identifier;
this.text = text;
}
public int getIdentifier()
{
return identifier;
}
public String getText()
{
return text;
}
@Override
public String toString()
{
return String.format("Message [id=%d, '%s']", identifier, text);
}
}
/**
* A consumer that handles incoming messages from a queue.
*/
public static class MessageHandler implements Runnable
{
private final BlockingQueue<Message> queue;
public MessageHandler(BlockingQueue<Message> queue)
{
this.queue = queue;
}
@Override
public void run()
{
// Consume messages from queue until interrupted
boolean interrupted = false;
while (!interrupted) {
try {
Message message = queue.take();
handle(message);
} catch (InterruptedException ex) {
err.println("The message handler was interrupred: " +
ex.getMessage());
interrupted = true;
}
}
}
private void handle(Message message)
{
// Process a new message
out.println(Thread.currentThread().getName() + " >> " + message.toString());
}
}
/**
* A producer that fetches messages from an external queue (e.g a broker).
*/
public static class MessageProducer implements Runnable
{
private final BlockingQueue<Message> queue;
public MessageProducer(BlockingQueue<Message> queue)
{
this.queue = queue;
}
@Override
public void run()
{
// Fetch messages from external source
boolean interrupted = false, full = false;
int n = 0;
while (!interrupted && !full) {
Message message = null;
// Simulate events coming from external source
int r = 500 + 1000 * random.nextInt(5); // a random delay (millis)
try {
Thread.sleep(r);
message = new Message(++n, "Hello World");
} catch (InterruptedException ex) {
err.println("The message producer was interrupred: " +
ex.getMessage());
message = null;
interrupted = true; // stops further processing
}
// If a new message arrived, add it to queue
if (message != null) {
try {
queue.add(message);
} catch (IllegalStateException ex) {
err.println("Too many messages! " + ex.getMessage());
full = true; // stops further processing
}
}
}
}
}
private static final int QUEUE_SIZE = 1000;
public static void main(String[] args) throws Exception
{
ExecutorService executor = Executors.newFixedThreadPool(2);
BlockingQueue<Message> queue = new ArrayBlockingQueue<>(QUEUE_SIZE);
// Spawn producer/consumer threads
executor.submit(new MessageProducer(queue));
executor.submit(new MessageHandler(queue));
out.println("Processing messages...");
// Wait for graceful termination
executor.shutdown(); // do not receive more tasks
// Note: executor.awaitTermination if we want main() to block here
// waiting for tasks to complete
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment