Java Blocking Queue

1. Introduction

Java provides the BlockingQueue, a Queue which allows waiting for an element to become available for retrieval. It also allows waiting for a slot to become available when storing elements. A BlockingQueue implementation is thread-safe so one thread can attempt to store an element while a different thread can attempt to remove an element.

2. Creating a BlockingQueue

The ArrayBlockingQueue is an implementation of a bounded BlockingQueue backed by an array. The size of the backing array can be specified while creating the BlockingQueue. This refers to the number of elements that can be stored in the queue before the next put() operation results in the thread being put to sleep. Once this happens, elements must be removed from the BlockingQueue before storing any more.

BlockingQueue<String> queue = new ArrayBlockingQueue<>(2);

3. Worker Thread Pool for Processing

One possible use of a BlockingQueue is as an application worker pool. In this scenario, some threads post tasks onto the queue and other threads can pick up the tasks and complete it. We will use an ExecutorService to setup the thread pool.

ExecutorService esvc = Executors.newFixedThreadPool(2);

4. Creating Producers and Consumers

A Producer is an object which produces a task to be executed. The task details are added to the BoundedQueue and is picked up by a Consumer when it is ready to process the task. Using a BoundedQueue for a scenario like this (the producer-consumer problem) means the producer goes to sleep if the queue is full and the consumer sleeps if the queue is empty.

The following is a producer task formulated as a lambda. It tries to store a task (“apple”) into the queue declared above. After the put() succeeds, it returns a status.

Callable<String> producer = () -> {
    String s = "apple";
    queue.put(s);
    return "put -> " + s;
}

And here is the consumer task which attempts to retrieve the next task and process it. Once the processing is done, it returns a status.

Callable<String> consumer = () -> {
    String s = queue.take();
    // consume task here
    return "take -> " + s;
}

In the code below, we declare an array of tasks and submit it to the thread pool. In your code, you do not have to add producers and consumers together as shown.

List<Callable<String>> elements =
    Arrays.asList(() -> {
            String s = "apple";
            queue.put(s);
            return "put -> " + s;
        },
        () -> {
            String s = "orange";
            queue.put(s);
            return "put -> " + s;
        },
        () -> {
            String s = queue.take();
            return "take -> " + s;
        },
        () -> {
            String s = "banana";
            queue.put(s);
            return "put -> " + s;
        },
        () -> {
            String s = queue.take();
            return "take -> " + s;
        },
        () -> {
            String s = queue.take();
            return "take -> " + s;
        });

5. Adding a Shutdown Task

We also add a “shutdown” task at the end to shutdown the thread pool properly after all the tasks complete.

Callable<String> shutdownTask = () -> {
            esvc.shutdown();
            return "shutdown";
        });

Let us now submit the tasks for execution. At this point, the thread pool executes kicks off the tasks using the available threads (2 threads as shown above).

esvc.invokeAll(elements);

6. Waiting for Completion

The invokeAll() method returns a list of Future<String> which we can process using the streams facility to wait for the tasks to complete. The method Future<String>.get() waits for completion of each task.

esvc.invokeAll(elements)
    .forEach(f -> {
            try {
                System.out.println(f.get());
            } catch(Exception ex) {
                System.out.println("get failed: " + ex.getMessage());
            }
        });

The output is shown below. The producer tasks insert the task into the queue and the consumer tasks retrieve the task and process it. Finally the shutdown task closes the thread pool.

put -> apple
put -> orange
take -> apple
put -> banana
take -> banana
take -> orange
shutdown

7. Deadlock

A deadlock might occur in the above case if we use the newSingleThreadExecutor() method which executes the tasks in sequence. If the BoundingQueue is created with less capacity, the producers might be put to sleep waiting for storage. If the consumers fail to clear the queue fast enough, the producers might be left waiting for a longer time. It is important to keep track of such issues and work around them. However, using newFixedThreadPool() to create the ExecutorService spawns more threads and runs the consumers so the producers have space for storage.

Summary

In this article, we saw how to use the BoundedQueue to create a producer-consumer system. In such a system, producers create tasks which are completed in another thread by consumers. Invoking Future<?>.get() waits for the tasks to complete and deliver the status.

Leave a Reply

Your email address will not be published. Required fields are marked *