Java Concurrency and Executors

1. Introduction

Java provides the java.util.concurrent package, which includes many facilities that ease concurrent programming. In this article, we look at Java threads and Executors: what they are and how they work.

2. Runnable Task as a Lambda

Since Java 1.0, Java has provided the Runnable interface, which is implemented to define a task that can run in a separate thread. A sample Runnable task is shown below. Note that rather than creating a class implementing the Runnable interface, the code creates a Runnable lambda (lambdas require Java 8 or later). The task is then executed in the main thread as well as in a second thread.

private static void example_1(String[] args) throws Exception
{
    Runnable task = () -> {
        String threadName = Thread.currentThread().getName();
        System.out.println("Hello " + threadName);
    };

    // Calling run() directly executes the task on the current (main) thread.
    task.run();

    // start() executes the task on a new thread.
    Thread thread = new Thread(task);
    thread.start();
    System.out.println("Done.");
}

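The output is shown below. The main thread prints "Done." without waiting for the second thread, so the order of the last two lines may vary from run to run. The program itself does not terminate until the second thread has finished, because the JVM waits for all non-daemon threads to complete.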

Hello main
Done.
Hello Thread-0
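
When the main thread must explicitly wait for the second thread before continuing, Thread.join() can be used. The following is a minimal sketch; the class name JoinDemo, the method runAndJoin() and the thread name "worker" are hypothetical names introduced for illustration.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

class JoinDemo {
    // Runs a task on a second thread, waits for it with join(), and
    // returns the recorded events in the order they happened.
    static List<String> runAndJoin() {
        List<String> events = new CopyOnWriteArrayList<>();
        Runnable task = () -> events.add("Hello " + Thread.currentThread().getName());

        Thread thread = new Thread(task, "worker");
        thread.start();
        try {
            thread.join(); // block until the worker thread has finished
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
        }
        events.add("Done.");
        return events;
    }

    public static void main(String[] args) {
        runAndJoin().forEach(System.out::println);
    }
}
```

Because of the join() call, "Hello worker" is always recorded before "Done." in this sketch.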

3. Creating an Executor

The Executor framework, introduced in Java 5 as part of java.util.concurrent, provides an Executor interface with a single method, execute(Runnable), for executing a Runnable task. Instead of creating a Thread with a Runnable (as shown above), you can create an Executor and run one or more tasks as follows:

Executor executor = ...;
executor.execute(task1);
executor.execute(task2);

The advantage of this approach is that the Executor implementation takes care of thread creation and management. To this end, Java provides several implementations of this interface that support techniques such as executing tasks in a thread pool or executing tasks sequentially in a single worker thread.
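
To make the idea concrete, here is a minimal sketch of two hand-written Executor implementations, modeled on the examples in the Executor Javadoc. The names ExecutorSketch, DirectExecutor, ThreadPerTaskExecutor and runsOnCallerThread() are illustrative and are not part of the JDK.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;

class ExecutorSketch {
    // Runs each task synchronously on the calling thread.
    static class DirectExecutor implements Executor {
        @Override
        public void execute(Runnable task) {
            task.run();
        }
    }

    // Starts a new thread for every task.
    static class ThreadPerTaskExecutor implements Executor {
        @Override
        public void execute(Runnable task) {
            new Thread(task).start();
        }
    }

    // Returns true if the given executor ran the task on the calling thread.
    static boolean runsOnCallerThread(Executor executor) {
        Thread caller = Thread.currentThread();
        AtomicReference<Thread> ranOn = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);
        executor.execute(() -> {
            ranOn.set(Thread.currentThread());
            done.countDown();
        });
        try {
            done.await(); // wait until the task has actually run
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
        return ranOn.get() == caller;
    }

    public static void main(String[] args) {
        System.out.println("direct: " + runsOnCallerThread(new DirectExecutor()));
        System.out.println("thread per task: " + runsOnCallerThread(new ThreadPerTaskExecutor()));
    }
}
```

The calling code is identical in both cases; only the Executor implementation decides which thread runs the task.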

In addition to the Executor interface, Java also provides ExecutorService, a sub-interface of Executor. It adds facilities for submitting tasks that return a result, executing a collection of tasks, and shutting the executor down.

Executors is a convenience class providing factory methods that create various kinds of ExecutorService implementations. The method newSingleThreadExecutor() returns an ExecutorService that uses a single worker thread to execute pending tasks sequentially. A task is submitted for execution using the submit() method.

private static void example_2(String[] args) throws Exception
{
    ExecutorService esvc = Executors.newSingleThreadExecutor();
    Runnable task = () -> {
        try {
            String threadName = Thread.currentThread().getName();
            System.out.println("Thread " + threadName + " started");
            TimeUnit.SECONDS.sleep(2);
            System.out.println("Thread " + threadName + " ended");
        } catch (InterruptedException ex) {
            System.err.println("Task interrupted.");
        }
    };

    // The executor is never shut down, so its worker thread keeps the JVM alive.
    esvc.submit(task);
}

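On running this example, the ExecutorService executes the submitted task and then continues to wait for more tasks, so the program does not exit. This is typical of the ExecutorService objects created by the Executors factory methods, whose worker threads are non-daemon threads by default; the worker thread here stays alive until the ExecutorService is shut down. Hit Control-C to halt the program.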
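
One way to let the program exit on its own is to shut the executor down once the work has been submitted. The sketch below is a minimal illustration; the class name ShutdownDemo, the method runAndShutdown() and the 5-second timeout are arbitrary choices made for this example.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class ShutdownDemo {
    // Submits one task, then shuts the executor down and waits for it.
    // Returns true if the executor terminated within the timeout.
    static boolean runAndShutdown() {
        ExecutorService esvc = Executors.newSingleThreadExecutor();
        esvc.submit(() -> System.out.println("Hello " + Thread.currentThread().getName()));

        // No new tasks are accepted; tasks already submitted still run.
        esvc.shutdown();
        try {
            return esvc.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("terminated: " + runAndShutdown());
    }
}
```

Note that shutdown() itself does not block; awaitTermination() is what waits for the submitted task to finish.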

4. Serial Execution of Tasks

In addition to executing a Runnable, an ExecutorService can also run a Callable<V>, a task whose call() method returns a value. The following class implements Callable<String>; when it is submitted to an ExecutorService, the caller receives a Future<String> from which the returned String can be retrieved once execution has completed.

private static class DelayTask implements Callable<String>
{
    private final String name;
    private final int secsDelay;

    public DelayTask(String name, int secsDelay) {
        this.name = name;
        this.secsDelay = secsDelay;
    }

    // Sleeps for secsDelay seconds, then returns the task name as the result.
    @Override
    public String call() {
        System.out.println(name + " started");
        try {
            TimeUnit.SECONDS.sleep(secsDelay);
        } catch (InterruptedException ex) {
            System.err.println(name + ": interrupted");
        }
        System.out.println(name + " ended");
        return name;
    }
}

Let us create several of these tasks and execute them using an ExecutorService created from newSingleThreadExecutor(). This ExecutorService executes the tasks sequentially. At the end, we add another task that shuts down the ExecutorService. The invokeAll() method blocks until all the tasks have completed and returns a list of Future objects, so each Future<?>.get() call returns its result without further waiting. Because shutdown() has been called by then, the ExecutorService releases its worker thread and the program exits cleanly.

private static void example_3(String[] args) throws Exception
{
    ExecutorService esvc = Executors.newSingleThreadExecutor();
    List<Callable<String>> tasks =
        Arrays.asList(new DelayTask("task 1", 2),
                      new DelayTask("task 2", 3),
                      new DelayTask("task 3", 1),
                      () -> {
                          esvc.shutdown();
                          return "shutdown";
                      });
    // invokeAll() blocks until every task has completed.
    esvc.invokeAll(tasks)
        .stream()
        .map(future -> {
            try { return future.get(); }
            catch (Exception ex) {
                return "exception: " + ex.getMessage();
            }
        })
        .forEach(System.out::println);
}

Here is the output from the code above. Notice that tasks are executed one after another followed by the shutdown task. This is because we used the newSingleThreadExecutor() method which creates an ExecutorService with that characteristic.

task 1 started
task 1 ended
task 2 started
task 2 ended
task 3 started
task 3 ended
task 1
task 2
task 3
shutdown
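
While invokeAll() is convenient for a batch of tasks, a single Callable can be handed to submit(), which returns a Future for the eventual result. The following is a minimal sketch that uses a trivial lambda as the task so that the block is self-contained; the names CallableDemo and submitAndGet() are hypothetical.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class CallableDemo {
    // Submits one Callable and returns the value it computed.
    static int submitAndGet() {
        ExecutorService esvc = Executors.newSingleThreadExecutor();
        try {
            Callable<Integer> task = () -> 6 * 7;       // call() returns the value itself
            Future<Integer> future = esvc.submit(task); // submit() returns a Future right away
            return future.get();                        // get() blocks until the result is ready
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", ex);
        } catch (ExecutionException ex) {
            // The task itself threw; the original exception is the cause.
            throw new IllegalStateException(ex.getCause());
        } finally {
            esvc.shutdown(); // let the worker thread exit
        }
    }

    public static void main(String[] args) {
        System.out.println(submitAndGet()); // prints 42
    }
}
```

Shutting down in a finally block is one alternative to submitting a dedicated shutdown task; which approach fits better depends on who owns the executor.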

5. Executing Tasks with a Thread Pool

Let us now examine how a thread pool returned from newCachedThreadPool() behaves. The source code is shown below; it uses the DelayTask class defined above.

private static void example_4(String[] args) throws Exception
{
    ExecutorService esvc = Executors.newCachedThreadPool();
    List<Callable<String>> tasks =
        Arrays.asList(new DelayTask("task 1", 2),
                      new DelayTask("task 2", 3),
                      new DelayTask("task 3", 10),
                      () -> {
                          System.err.println("Requesting shutdown ..");
                          esvc.shutdown();
                          return "shutdown";
                      });
    // invokeAll() blocks until every task has completed.
    esvc.invokeAll(tasks)
        .stream()
        .map(future -> {
            try { return future.get(); }
            catch (Exception ex) {
                return "exception: " + ex.getMessage();
            }
        })
        .forEach(System.out::println);
}

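Here is the output from this code. Notice that the tasks start concurrently, each in its own thread, and that a shutdown is requested of the ExecutorService after the tasks have been scheduled. The ExecutorService allows the tasks to complete execution before shutting down. It does not, however, accept any new tasks after shutdown() has been invoked. Since the shutdown message is written to standard error, its position relative to the other lines may vary.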

task 1 started
task 2 started
task 3 started
Requesting shutdown ..
task 1 ended
task 2 ended
task 3 ended
task 1
task 2
task 3
shutdown
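
The refusal of new tasks after shutdown() can be observed directly. In the minimal sketch below, the executors created by the Executors factory methods reject such a task by throwing RejectedExecutionException; the names RejectDemo and rejectedAfterShutdown() are hypothetical.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;

class RejectDemo {
    // Returns true if a task submitted after shutdown() is rejected.
    static boolean rejectedAfterShutdown() {
        ExecutorService esvc = Executors.newCachedThreadPool();
        esvc.shutdown();
        try {
            esvc.submit(() -> System.out.println("never runs"));
            return false; // not reached with the default rejection policy
        } catch (RejectedExecutionException ex) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("rejected: " + rejectedAfterShutdown());
    }
}
```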

6. Shutdown Thread Pool Immediately

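When it is necessary to shut down the thread pool immediately rather than waiting for all tasks to complete, we can use the shutdownNow() method. It attempts to stop the running tasks by interrupting their threads and does not start tasks that are still waiting in the queue.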

private static void example_5(String[] args) throws Exception
{
    ExecutorService esvc = Executors.newCachedThreadPool();
    List<Callable<String>> tasks =
        Arrays.asList(new DelayTask("task 1", 2),
                      new DelayTask("task 2", 3),
                      new DelayTask("task 3", 10),
                      () -> {
                          System.err.println("Requesting shutdown ..");
                          // Interrupts the threads running the other tasks.
                          esvc.shutdownNow();
                          return "shutdown";
                      });
    esvc.invokeAll(tasks)
        .stream()
        .map(future -> {
            try { return future.get(); }
            catch (Exception ex) {
                return "exception: " + ex.getMessage();
            }
        })
        .forEach(System.out::println);
}

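As the output below shows, the tasks that have not yet completed are interrupted and the thread pool is terminated. Because DelayTask catches the InterruptedException and still returns its name, each Future yields a normal result. The order of the interrupted lines may vary between runs.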

task 1 started
task 2 started
task 3 started
Requesting shutdown ..
task 3: interrupted
task 3 ended
task 1: interrupted
task 1 ended
task 2: interrupted
task 2 ended
task 1
task 2
task 3
shutdown
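
shutdownNow() also returns the list of tasks that were queued but never started. The sketch below illustrates this with a single-thread executor, where queued tasks are easy to produce; the names ShutdownNowDemo and countNeverStarted() are hypothetical, and the 10-second sleep is an arbitrary stand-in for long-running work.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class ShutdownNowDemo {
    // Queues three tasks on a single-thread executor and calls shutdownNow()
    // while the first one is running. Returns how many tasks never started.
    static int countNeverStarted() {
        ExecutorService esvc = Executors.newSingleThreadExecutor();
        CountDownLatch firstStarted = new CountDownLatch(1);
        Runnable sleeper = () -> {
            firstStarted.countDown();
            try {
                TimeUnit.SECONDS.sleep(10);
            } catch (InterruptedException ex) {
                System.err.println("interrupted");
            }
        };
        esvc.execute(sleeper); // picked up by the worker thread
        esvc.execute(sleeper); // waits in the queue
        esvc.execute(sleeper); // waits in the queue
        try {
            firstStarted.await(); // make sure the first task is running
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
        // Interrupts the running task and drains the queue.
        List<Runnable> pending = esvc.shutdownNow();
        return pending.size();
    }

    public static void main(String[] args) {
        System.out.println("never started: " + countNeverStarted());
    }
}
```

Here the call returns 2, since one task is running and two are still queued when shutdownNow() is invoked. The running task is interrupted, so the demo does not wait out the full sleep.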

7. Summary

This article provided an introduction to threading in Java and to the Executor framework in java.util.concurrent. The Executor framework removes much of the plumbing code related to thread creation and management. It also provides capabilities such as thread pools and single worker threads that execute tasks sequentially. As the code above demonstrated, it is also possible to shut down the thread pool cleanly after all the tasks have completed.
