In-depth analysis: thread pools in Java concurrent programming, so the interview no longer causes panic!

Posted Jun 16, 2020, 16 min read

Benefits of thread pools

The thread pool is the most widely used concurrency framework in Java; almost any program that needs to execute tasks asynchronously or concurrently can use one. Compared with single-threaded serial processing (Serial Processing) and allocating a new thread for each task (One Task One New Thread), sensible use of a thread pool brings three benefits.

  1. Reduce resource consumption. Reusing already created threads reduces the cost of thread creation and destruction.
  2. Improve response speed. When a task arrives, it can be executed immediately without waiting for a thread to be created.
  3. Improve thread manageability. Threads are a scarce resource. Creating them without limit not only consumes system resources but also reduces the stability of the system. A thread pool allows unified allocation, tuning, and monitoring. To use a thread pool properly, however, you need a good understanding of how it is implemented.

How the thread pool is implemented

All the following descriptions are based on the JDK 1.8 source code.

Architecture Design

The core implementation class of the thread pool in Java is ThreadPoolExecutor. It extends the abstract class AbstractExecutorService, which implements the ExecutorService interface, which in turn extends the Executor interface. The relationship is roughly as shown in the following figure:

The four interfaces and classes will be introduced one by one from top to bottom.

Executor

The top-level interface Executor decouples task submission from the mechanics of how each task is run (including the details of thread usage, thread scheduling, and so on). Using an Executor avoids creating threads explicitly. For example, for a series of tasks, you can use the following instead of calling new Thread(new RunnableTask()).start() for each one:

Executor executor = anExecutor;
executor.execute(new RunnableTask1());
executor.execute(new RunnableTask2());

The Executor interface provides a single method that executes the given task at some time in the future. The submitted task:

  1. May be executed by a newly created thread;
  2. May be executed by an idle thread in a thread pool;
  3. It may also be executed by the calling thread of the method.

Which of these happens depends on the design of the class that implements the Executor interface.

public interface Executor {
    void execute(Runnable command);
}

Serial Processing

In fact, the Executor interface does not strictly require execution to be asynchronous. The simplest implementation runs every task on the thread that calls the method.

class DirectExecutor implements Executor {
   public void execute(Runnable r) {
     r.run();
   }
}

This is the Serial Processing approach mentioned above. Suppose we implement a server application that responds to requests this way. The implementation is theoretically correct, but:

  1. Its performance is very poor because it can only handle one request at a time. If a large number of requests arrive, they can only be answered serially.
  2. If the response logic contains file I/O or database operations, the server has to wait for them to complete before continuing. While it is blocked, the CPU sits idle, so server resource utilization is very low.

In summary, Serial Processing suffers from slow response and low throughput.

One Task One New Thread

However, a more typical implementation runs the task on some thread other than the caller's. For example, the following Executor implementation creates a new thread for each task.

class ThreadPerTaskExecutor implements Executor {
   public void execute(Runnable r) {
     new Thread(r).start();
   }
}

This is the One Task One New Thread approach mentioned above. Creating threads without limit also has many problems.

  1. Thread life cycle overhead is high. If a large number of tasks need to be executed, a large number of threads must be created and then destroyed, and that cost adds up.
  2. Resource consumption. Active threads consume system resources, especially memory. If there are already enough threads to keep all CPUs busy, creating more threads reduces performance. The simplest example is a 4-core machine that creates 100 threads to execute 100 tasks.
  3. Stability. There is a limit on the number of threads that can be created. The limit depends on factors such as JVM startup parameters, stack size, and the thread limits of the underlying operating system. If it is exceeded, an OutOfMemoryError may be thrown.
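
To make the difference between the two Executor styles concrete, here is a minimal sketch that checks on which thread a task actually runs. The class name ExecutorThreadDemo and the method runsOnCallerThread are made up for this illustration; the two executors are lambda versions of DirectExecutor and ThreadPerTaskExecutor shown above.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;

final class ExecutorThreadDemo {
    /** Runs one task through the executor and reports whether it ran on the calling thread. */
    static boolean runsOnCallerThread(Executor executor) {
        AtomicReference<Thread> ranOn = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);
        executor.execute(() -> {
            ranOn.set(Thread.currentThread());
            done.countDown();
        });
        try {
            done.await(); // wait, because the task may be running on another thread
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return ranOn.get() == Thread.currentThread();
    }

    public static void main(String[] args) {
        Executor direct = Runnable::run;                       // Serial Processing
        Executor threadPerTask = r -> new Thread(r).start();   // One Task One New Thread
        System.out.println(runsOnCallerThread(direct));        // true
        System.out.println(runsOnCallerThread(threadPerTask)); // false
    }
}
```

The calling code is identical in both cases; only the Executor implementation decides where the task runs, which is exactly the decoupling the interface is meant to provide.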

ExecutorService

The ExecutorService interface extends the Executor interface and adds some methods of its own. Interfaces can extend other interfaces too, which is easy to overlook. The semantics of interface inheritance are as follows:

  1. The interface Executor declares the execute(Runnable) method. The interface ExecutorService extends Executor, so it does not need to redeclare Executor's method; it only declares its own.
  2. When a class such as ThreadPoolExecutor implements ExecutorService, it must implement the methods of both interfaces, ExecutorService and Executor.
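
The two rules above can be sketched with a pair of toy interfaces. Runner and StoppableRunner are hypothetical stand-ins for Executor and ExecutorService, invented only for this example; they are not part of the JDK.

```java
import java.util.ArrayList;
import java.util.List;

final class InterfaceInheritanceDemo {
    // Hypothetical stand-in for Executor.
    interface Runner {
        void run(Runnable task);
    }

    // Hypothetical stand-in for ExecutorService: it declares only its own method.
    interface StoppableRunner extends Runner {
        void stop();
    }

    // A class implementing the sub-interface must implement the methods of both interfaces.
    static final class SimpleRunner implements StoppableRunner {
        private boolean stopped;
        final List<String> log = new ArrayList<>();

        @Override
        public void run(Runnable task) {
            if (stopped) {
                log.add("rejected");
                return;
            }
            task.run();
            log.add("ran");
        }

        @Override
        public void stop() {
            stopped = true;
        }
    }

    static List<String> demo() {
        SimpleRunner runner = new SimpleRunner();
        runner.run(() -> { });
        runner.stop();
        runner.run(() -> { });
        return runner.log;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [ran, rejected]
    }
}
```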

ExecutorService adds roughly two kinds of methods:

  1. Methods for shutting down the ExecutorService. For the thread pool, these are implemented in ThreadPoolExecutor.
  2. Methods that extend asynchronous task execution. For the thread pool, these are template methods implemented in the abstract class AbstractExecutorService.

AbstractExecutorService

The abstract class AbstractExecutorService implements the various submit methods of the ExecutorService interface. Unlike Executor.execute(Runnable), these methods all return a value. Each of them ultimately calls the execute(Runnable) method implemented in the ThreadPoolExecutor class.

Although submit returns a Future for the task, only a Callable produces a real result. For a plain Runnable, the result is null (or the fixed result value passed to submit(Runnable, T)).

    public Future<?> submit(Runnable task) {
        if(task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }

    public <T> Future<T> submit(Runnable task, T result) {
        if(task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task, result);
        execute(ftask);
        return ftask;
    }

    public <T> Future<T> submit(Callable<T> task) {
        if(task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }
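
As a quick check of the three overloads above, the following sketch submits one task through each of them and collects what Future.get() returns. The class name SubmitDemo and the values "done" and 42 are arbitrary choices for the illustration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class SubmitDemo {
    /** Returns what Future.get() yields for each of the three submit overloads. */
    static Object[] submitResults() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Runnable noop = () -> { };
            Callable<Integer> answer = () -> 42;

            Future<?> plain = pool.submit(noop);               // submit(Runnable)
            Future<String> fixed = pool.submit(noop, "done");  // submit(Runnable, T)
            Future<Integer> computed = pool.submit(answer);    // submit(Callable<T>)

            return new Object[] { plain.get(), fixed.get(), computed.get() };
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        for (Object result : submitResults()) {
            System.out.println(result); // null, then done, then 42
        }
    }
}
```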

In addition, this abstract class also implements the invokeAny and invokeAll methods of the ExecutorService interface. Here is a brief introduction to the semantics of these two methods.

invokeAny

  1. invokeAny() takes a collection of Callable objects as its parameter. It does not return a Future; it returns the result of one of the Callable objects in the collection.
  2. There is no guarantee which Callable's result is returned, only that it comes from one of the Callables that completed successfully.

invokeAll

  1. invokeAll() takes a collection of Callable objects as its parameter. It returns a list of Future objects that hold the results of the submitted Callable objects.
  2. The returned Future list corresponds one-to-one, in order, with the submitted task collection.
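
The semantics of both methods can be tried out with a small sketch like the one below. InvokeDemo and its three trivial tasks are invented for the illustration; a real task list would of course do more work.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class InvokeDemo {
    private static List<Callable<Integer>> tasks() {
        return Arrays.asList(() -> 1, () -> 2, () -> 3);
    }

    /** invokeAll: one Future per task, in the same order as the input list. */
    static List<Integer> invokeAllResults() {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        try {
            List<Integer> results = new ArrayList<>();
            for (Future<Integer> future : pool.invokeAll(tasks())) {
                results.add(future.get());
            }
            return results;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    /** invokeAny: the plain result of some task that completed successfully. */
    static int invokeAnyResult() {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        try {
            return pool.invokeAny(tasks());
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(invokeAllResults()); // [1, 2, 3]
        System.out.println(invokeAnyResult());  // 1, 2 or 3, not guaranteed which
    }
}
```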

ThreadPoolExecutor

execute(Runnable) method

How the thread pool handles a submitted task is the core logic of the whole implementation, so we start with this method. The code is as follows:

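    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        // Fewer workers than corePoolSize: try to start a new core thread with this task.
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        // The pool is running: try to put the task in the work queue.
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            // Re-check: if the pool stopped in the meantime, roll back and reject.
            if (!isRunning(recheck) && remove(command))
                reject(command);
            // If no worker is left, start one so the queued task gets executed.
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        // The task could not be queued: try a non-core thread, otherwise reject.
        else if (!addWorker(command, false))
            reject(command);
    }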

When a new task is submitted to the thread pool, the processing flow is as follows:

  1. Check whether the number of worker threads is less than the number of core threads (corePoolSize). If so, create a new worker thread to run the task (this requires acquiring the global lock). Otherwise, go to the next step.
  2. Check whether the work queue (BlockingQueue) is full. If not, put the new task in the work queue. Otherwise, go to the next step.
  3. Check whether the number of worker threads is less than the maximum number of threads (maximumPoolSize). If so, create a new worker thread to run the task (this requires acquiring the global lock).
  4. If the number of worker threads is already greater than or equal to maximumPoolSize, hand the task to the saturation policy.
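
The four steps can be observed with a deliberately tiny pool. The sketch below assumes corePoolSize = 1, maximumPoolSize = 2 and a queue of capacity 1 (values chosen only for the demonstration), submits four tasks that block on a latch, and records "pool size/queue size" after each submission. ExecuteFlowDemo is a made-up name.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class ExecuteFlowDemo {
    private static String snapshot(ThreadPoolExecutor pool) {
        return pool.getPoolSize() + "/" + pool.getQueue().size();
    }

    static String describe() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 30L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
        CountDownLatch release = new CountDownLatch(1);
        // Every task blocks until released, so no thread becomes free during the demo.
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        StringBuilder out = new StringBuilder();
        try {
            pool.execute(blocker);                   // step 1: new core thread
            out.append(snapshot(pool)).append(' ');
            pool.execute(blocker);                   // step 2: task goes to the queue
            out.append(snapshot(pool)).append(' ');
            pool.execute(blocker);                   // step 3: queue full, new non-core thread
            out.append(snapshot(pool)).append(' ');
            try {
                pool.execute(blocker);               // step 4: saturated, default AbortPolicy
                out.append("accepted");
            } catch (RejectedExecutionException e) {
                out.append("rejected");
            }
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return out.toString();
    }

    public static void main(String[] args) {
        System.out.println(describe()); // 1/0 1/1 2/1 rejected
    }
}
```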

Flow chart of newly submitted task

To illustrate with a flowchart, the process of the thread pool processing a newly submitted task is shown in the following figure:

ThreadPoolExecutor execution diagram

From the above, we can find that the thread pool has four kinds of processing possibilities for a new task, corresponding to the four steps of the above processing flow.

ThreadPoolExecutor follows these steps in order to avoid acquiring the global lock as far as possible when executing execute() (that would be a serious scalability bottleneck). After ThreadPoolExecutor finishes warming up (the number of currently running threads is greater than or equal to corePoolSize), almost all execute() calls end at step 2, and step 2 does not need to acquire the global lock.

Worker thread

From the code of execute(Runnable) above, we can see that when the thread pool creates a thread, it wraps the thread in a Worker object. After the Worker finishes its first task, it keeps taking tasks from the work queue and running them.

The schematic diagram of the thread execution task in ThreadPoolExecutor is as follows:

There are two situations in which threads in the thread pool perform tasks:

  1. When a thread is created in the execute() method, it runs the task that was just submitted.
  2. After finishing the task in case 1, the thread repeatedly takes tasks from the BlockingQueue and runs them.

ctl variable of ThreadPoolExecutor

ctl is an AtomicInteger, so updates to the int it holds are atomic, which ensures thread safety. Its high 3 bits represent the thread pool state, and its low 29 bits represent the number of worker threads.
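
The packing of state and worker count into one int can be reproduced in a few lines. The sketch below mirrors the constants and helper methods as they appear in the JDK 1.8 source of ThreadPoolExecutor (there they are private); the wrapper class CtlBitsDemo is only for this illustration.

```java
final class CtlBitsDemo {
    static final int COUNT_BITS = Integer.SIZE - 3;       // 29 bits for the worker count
    static final int CAPACITY   = (1 << COUNT_BITS) - 1;  // low 29 bits all set

    // The run state lives in the high 3 bits.
    static final int RUNNING    = -1 << COUNT_BITS;
    static final int SHUTDOWN   =  0 << COUNT_BITS;
    static final int STOP       =  1 << COUNT_BITS;
    static final int TIDYING    =  2 << COUNT_BITS;
    static final int TERMINATED =  3 << COUNT_BITS;

    static int runStateOf(int c)     { return c & ~CAPACITY; } // keep the high 3 bits
    static int workerCountOf(int c)  { return c & CAPACITY; }  // keep the low 29 bits
    static int ctlOf(int rs, int wc) { return rs | wc; }       // combine both parts

    public static void main(String[] args) {
        int c = ctlOf(RUNNING, 5);
        System.out.println(runStateOf(c) == RUNNING); // true
        System.out.println(workerCountOf(c));         // 5
        System.out.println(CAPACITY);                 // 536870911
    }
}
```

Because the states are ordered numerically (RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED), a state check can be a simple integer comparison on the packed value.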

Status of ThreadPoolExecutor

There are five states of the thread pool:

  1. Running: In the RUNNING state, the thread pool accepts new tasks and processes queued tasks. RUNNING is the initial state: once the thread pool is created it is in the RUNNING state, and the worker count recorded in ctl is 0.

    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

  2. Shutdown: In the SHUTDOWN state, the thread pool does not accept new tasks but still processes tasks that were already added (both running ones and those in the BlockingQueue). Calling shutdown() moves the thread pool from RUNNING -> SHUTDOWN.

  3. Stop: In the STOP state, the thread pool does not accept new tasks, does not process queued tasks, and interrupts running tasks. Calling shutdownNow() moves the thread pool from (RUNNING or SHUTDOWN) -> STOP.

  4. Tidying: When all tasks have terminated and the worker count recorded in ctl is 0, the thread pool enters the TIDYING state. On entering TIDYING, the hook method terminated() is executed. terminated() is empty in the ThreadPoolExecutor class. To do some processing when the pool reaches TIDYING, override terminated() in a subclass.

  5. Terminated: The thread pool is completely terminated. Once terminated() has finished executing in the TIDYING state, the pool moves from TIDYING -> TERMINATED.
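
A minimal sketch of the state transitions, assuming a single-thread pool and an overridden terminated() hook. LifecycleDemo and the event strings are invented for the illustration; the hook runs while the pool is in TIDYING, so isTerminated() is still false inside it.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class LifecycleDemo {
    static List<String> lifecycle() {
        List<String> events = Collections.synchronizedList(new ArrayList<>());
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>()) {
            @Override
            protected void terminated() {
                // Called while the pool is in the TIDYING state.
                events.add("hook: isTerminated=" + isTerminated());
            }
        };
        pool.execute(() -> events.add("task ran"));
        pool.shutdown();                                 // RUNNING -> SHUTDOWN
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);  // wait for TIDYING -> TERMINATED
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        events.add("after: isTerminated=" + pool.isTerminated());
        return events;
    }

    public static void main(String[] args) {
        // [task ran, hook: isTerminated=false, after: isTerminated=true]
        System.out.println(lifecycle());
    }
}
```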

Use of thread pool

Thread pool creation

We can create a thread pool through the constructor of ThreadPoolExecutor.

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)

  1. corePoolSize (the number of core threads): the number of threads to keep in the pool even if they are idle. When a task is submitted, the thread pool creates a new thread to run it even if other idle core threads could run it, and it stops creating core threads once the number of threads reaches corePoolSize. If the thread pool's prestartAllCoreThreads() method is called, the thread pool creates and starts all core threads in advance.
  2. maximumPoolSize (the maximum number of threads): the maximum number of threads allowed in the pool. If the queue is full and the number of created threads is less than the maximum, the thread pool creates new threads to run tasks. Note that this parameter has no effect when an unbounded task queue is used.
  3. keepAliveTime (thread keep-alive time): when the number of threads is greater than corePoolSize, this is the maximum time that the excess idle threads wait for new tasks before terminating. If there are many tasks and each one runs for a short time, increasing this value improves thread utilization.
  4. unit (the unit of keepAliveTime): possible units include days (DAYS), hours (HOURS), minutes (MINUTES), seconds (SECONDS), milliseconds (MILLISECONDS), microseconds (MICROSECONDS, one thousandth of a millisecond), and nanoseconds (NANOSECONDS, one thousandth of a microsecond).
  5. workQueue (task queue): the blocking queue that holds tasks waiting to be executed. The following blocking queues can be used.
  • ArrayBlockingQueue: a bounded blocking queue backed by an array. It orders elements by the FIFO (First In First Out) principle.
  • LinkedBlockingQueue: a blocking queue backed by a linked list; it is effectively unbounded unless a capacity is given. It orders elements FIFO, and its throughput is usually higher than ArrayBlockingQueue. The static factory method Executors.newFixedThreadPool() uses this queue.
  • SynchronousQueue: a blocking queue that stores no elements. Each insert operation must wait until another thread calls a remove operation, otherwise the insert stays blocked. Its throughput is usually higher than LinkedBlockingQueue. The static factory method Executors.newCachedThreadPool() uses this queue.
  • PriorityBlockingQueue: an unbounded blocking queue with priority ordering.
  6. threadFactory (ThreadFactory): the factory used to create threads. Through it you can give each created thread a more meaningful name.

  7. handler (RejectedExecutionHandler, the saturation policy): when the ThreadPoolExecutor has been shut down, or is saturated (the maximum pool size is reached and the work queue is full), execute() hands the newly submitted task to this handler, which must apply some policy to deal with it. The default policy is AbortPolicy. The Java thread pool framework provides the following 4 policies:

    • AbortPolicy: throws a RejectedExecutionException
    • CallerRunsPolicy: runs the task on the caller's thread
    • DiscardOldestPolicy: discards the oldest task in the queue and then retries executing the current task
    • DiscardPolicy: silently discards the task
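
Putting the seven parameters together, a pool might be built roughly like this. All concrete values (2 core threads, 4 maximum, a queue of 100, the "order-worker-" name prefix) and the class name CustomPoolDemo are assumptions made up for the example, not recommendations.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class CustomPoolDemo {
    static ThreadPoolExecutor newPool() {
        AtomicInteger sequence = new AtomicInteger();
        // A thread factory that gives every worker a readable name (hypothetical prefix).
        ThreadFactory named = r -> new Thread(r, "order-worker-" + sequence.incrementAndGet());
        return new ThreadPoolExecutor(
                2,                                          // corePoolSize
                4,                                          // maximumPoolSize
                60L, TimeUnit.SECONDS,                      // keepAliveTime and unit
                new ArrayBlockingQueue<>(100),              // bounded workQueue
                named,                                      // threadFactory
                new ThreadPoolExecutor.CallerRunsPolicy()); // saturation policy
    }

    /** Returns the name of the thread that ran the first submitted task. */
    static String firstWorkerName() {
        ThreadPoolExecutor pool = newPool();
        try {
            Callable<String> whoAmI = () -> Thread.currentThread().getName();
            return pool.submit(whoAmI).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(firstWorkerName()); // order-worker-1
    }
}
```

A bounded queue combined with CallerRunsPolicy slows the submitter down when the pool is saturated instead of dropping work or failing, which is one common trade-off; whether it suits a given application depends on its load.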

Commonly used ThreadPoolExecutor

The following three types of ThreadPoolExecutor can be created through the Executors utility class of the Executor framework. Looking at the source code, we can see that all three are in essence ThreadPoolExecutor instances with different constructor arguments.

FixedThreadPool

FixedThreadPool is a thread pool that reuses a fixed number of threads. The following is the source code implementation of FixedThreadPool.

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }

Note that:

  1. The corePoolSize and maximumPoolSize of FixedThreadPool are both set to the parameter nThreads specified at creation time.
  2. The task queue is an unbounded LinkedBlockingQueue.
  3. keepAliveTime is set to 0.
  4. ThreadFactory and RejectedExecutionHandler both use their default values.

The schematic diagram of the execute() method of FixedThreadPool is as follows:

It works as follows:

  1. If the number of currently running threads is less than corePoolSize, create a new thread to execute the task.
  2. After the thread pool has finished warming up (the number of currently running threads is equal to corePoolSize), add the task to the LinkedBlockingQueue.
  3. After a thread finishes the task from step 1, it loops, repeatedly taking tasks from the LinkedBlockingQueue and running them.
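
A small experiment shows this behavior: with 2 threads and 10 blocking tasks, the pool holds 2 threads and the other 8 tasks wait in the queue. The sketch assumes that newFixedThreadPool returns a plain ThreadPoolExecutor, as in the JDK 1.8 source shown above, so that the cast works; FixedPoolDemo is a made-up name.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

final class FixedPoolDemo {
    /** Returns {pool size, queue size} after submitting taskCount blocking tasks. */
    static int[] poolAndQueueSize(int nThreads, int taskCount) {
        // Assumption: the factory returns a ThreadPoolExecutor (true for the JDK 1.8 source above).
        ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newFixedThreadPool(nThreads);
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < taskCount; i++) {
                pool.execute(() -> {
                    try {
                        release.await(); // keep the worker busy until the snapshot is taken
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            return new int[] { pool.getPoolSize(), pool.getQueue().size() };
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] sizes = poolAndQueueSize(2, 10);
        System.out.println(sizes[0] + " threads, " + sizes[1] + " queued"); // 2 threads, 8 queued
    }
}
```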

FixedThreadPool uses the unbounded LinkedBlockingQueue as its work queue (the capacity of the queue is Integer.MAX_VALUE). This affects the thread pool as follows:

  1. Once the number of threads in the pool reaches corePoolSize, new tasks wait in the unbounded queue. Since the unbounded queue is never full, the number of threads in the pool never exceeds corePoolSize.
  2. Because of 1, maximumPoolSize has no effect when an unbounded queue is used.
  3. Because of 1 and 2, keepAliveTime has no effect either: there are never more threads than corePoolSize.
  4. Because the queue is unbounded, a running FixedThreadPool (one on which neither shutdown() nor shutdownNow() has been called) never rejects a task (the RejectedExecutionHandler.rejectedExecution method is never called).

SingleThreadExecutor

SingleThreadExecutor is an Executor that uses a single worker thread. SingleThreadExecutor is similar to FixedThreadPool except that its corePoolSize and maximumPoolSize are set to 1. The following is the source code implementation of SingleThreadExecutor.

    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
           (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

CachedThreadPool

CachedThreadPool is a thread pool that creates new threads as needed. Below is the source code for creating CachedThreadPool.

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

Notes:

  1. The corePoolSize of CachedThreadPool is set to 0, that is, the corePool is empty; the maximumPoolSize is set to Integer.MAX_VALUE, that is, the maximumPool is unbounded.
  2. keepAliveTime is set to 60L, which means idle threads in CachedThreadPool wait at most 60 seconds for new tasks; an idle thread is terminated once it has been idle for more than 60 seconds.
  3. CachedThreadPool uses SynchronousQueue with no capacity as the work queue of the thread pool, but the maximumPool of CachedThreadPool is unbounded. This means that if the main thread submits tasks faster than the threads in the maximumPool process tasks, CachedThreadPool will continue to create new threads. In extreme cases, CachedThreadPool will exhaust CPU and memory resources because it creates too many threads.

The execution process of the execute() method of CachedThreadPool is shown below:

The description of its implementation process is as follows:

  1. First, SynchronousQueue.offer(Runnable task) is executed. If an idle thread in the maximumPool is currently executing SynchronousQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS), the offer by the main thread is paired with the poll by the idle thread, and the main thread hands the task to the idle thread for execution. Otherwise, go to step 2.
  2. When the maximumPool is initially empty, or there is currently no idle thread in the maximumPool, no thread is executing SynchronousQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS), so step 1 fails. In this case, CachedThreadPool creates a new thread to run the task.
  3. After the thread created in step 2 finishes the task, it executes SynchronousQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS). This poll makes the idle thread wait at most 60 seconds on the SynchronousQueue. If the main thread submits a new task within 60 seconds (the main thread performs step 1), the idle thread runs that task; otherwise, the idle thread terminates. Since threads that stay idle for 60 seconds are terminated, a CachedThreadPool that remains idle for a long time does not hold any resources.
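
Step 2 is what makes the pool grow: while every existing thread is busy, each new submission gets its own thread and nothing is ever stored in the queue. A minimal sketch, again assuming that the factory returns a ThreadPoolExecutor as in the JDK 1.8 source above (CachedPoolDemo and burst are made-up names):

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

final class CachedPoolDemo {
    /** Returns {pool size, queue size} after a burst of taskCount blocking tasks. */
    static int[] burst(int taskCount) {
        // Assumption: the factory returns a ThreadPoolExecutor (true for the JDK 1.8 source above).
        ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newCachedThreadPool();
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < taskCount; i++) {
                // No thread is idle, so offer() on the SynchronousQueue fails and a thread is created.
                pool.execute(() -> {
                    try {
                        release.await();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            return new int[] { pool.getPoolSize(), pool.getQueue().size() };
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] sizes = burst(5);
        System.out.println(sizes[0] + " threads, " + sizes[1] + " queued"); // 5 threads, 0 queued
    }
}
```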

Submit tasks to the thread pool

You can submit tasks to the thread pool with two methods, execute() and submit().

  1. The execute() method is used to submit tasks that do not need a return value, so the caller cannot tell whether the task was executed successfully by the thread pool. The task passed to execute() is an instance of Runnable.
  2. The submit() method is used to submit tasks that need a return value. The thread pool returns a Future object, through which you can tell whether the task executed successfully and obtain the return value with get(). get() blocks the current thread until the task completes. get(long timeout, TimeUnit unit) blocks the current thread for at most the given time; if the task has not completed by then, it throws a TimeoutException, and the task may still be running.
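
The two flavors of get() can be compared with a task that is held back by a latch. FutureGetDemo, the 50 ms timeout and the result strings are arbitrary choices for this sketch.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class FutureGetDemo {
    static String demo() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch release = new CountDownLatch(1);
        try {
            // The task cannot finish until the latch is released.
            Callable<String> slow = () -> {
                release.await();
                return "finished";
            };
            Future<String> future = pool.submit(slow);

            String first;
            try {
                first = future.get(50, TimeUnit.MILLISECONDS); // bounded wait
            } catch (TimeoutException e) {
                first = "timed out";                           // the task is still running
            }

            release.countDown();
            return first + ", then " + future.get();           // unbounded wait
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // timed out, then finished
    }
}
```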

Close thread pool

You can close the thread pool by calling its shutdown or shutdownNow method. Both work by traversing the worker threads in the pool and calling each thread's interrupt method, so a task that does not respond to interruption may never terminate. There are some differences, though.

  1. shutdown first sets the state of the thread pool to SHUTDOWN and then refuses newly submitted tasks: if the state is not RUNNING when a task is submitted, the task is rejected via rejectedExecution. Tasks already submitted (running or in the task queue) are not affected. At the same time, idle threads (idleWorkers) are interrupted.
  2. shutdownNow first sets the state of the thread pool to STOP and then refuses newly submitted tasks in the same way. It also interrupts the threads that are currently running tasks, removes the tasks still in the BlockingQueue, and returns them in a list.
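
The effects of shutdownNow can be observed with one running task and two queued tasks. ShutdownDemo and the result string format are made up for this sketch; the running task uses Thread.sleep so that it responds to the interrupt.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

final class ShutdownDemo {
    static String demo() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        CountDownLatch started = new CountDownLatch(1);
        AtomicBoolean interrupted = new AtomicBoolean();
        pool.execute(() -> {
            started.countDown();
            try {
                Thread.sleep(10_000);      // a task that responds to interruption
            } catch (InterruptedException e) {
                interrupted.set(true);
            }
        });
        pool.execute(() -> { });           // stays in the queue
        pool.execute(() -> { });           // stays in the queue
        try {
            started.await();
            List<Runnable> pending = pool.shutdownNow();  // -> STOP, interrupts the worker
            boolean terminated = pool.awaitTermination(5, TimeUnit.SECONDS);
            boolean rejected = false;
            try {
                pool.execute(() -> { });   // the pool is no longer RUNNING
            } catch (RejectedExecutionException e) {
                rejected = true;
            }
            return "pending=" + pending.size() + " interrupted=" + interrupted.get()
                    + " terminated=" + terminated + " rejected=" + rejected;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // pending=2 interrupted=true terminated=true rejected=true
    }
}
```

With shutdown() instead of shutdownNow(), the sleeping task would not be interrupted and the two queued tasks would still run before the pool terminates.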

Thread pool monitoring

The thread pool exposes several values that can be used for monitoring:

  1. taskCount: the number of tasks the thread pool has been asked to execute.
  2. completedTaskCount: the number of tasks the thread pool has completed so far, which is less than or equal to taskCount.
  3. largestPoolSize: the largest number of threads the pool has ever held. From this value you can tell whether the thread pool has ever been full: if it equals the maximum size of the thread pool, the pool was once full.
  4. getPoolSize: the current number of threads in the pool. While the pool is alive, its core threads are not reclaimed by default, so for a pool such as FixedThreadPool this value does not decrease.
  5. getActiveCount: the number of threads that are actively executing tasks.

In addition, you can monitor by extending the thread pool. Create a custom pool by extending ThreadPoolExecutor and overriding its beforeExecute, afterExecute, and terminated methods to run some code before a task executes, after it executes, and when the pool terminates. For example, you can track the average, maximum, and minimum execution time of tasks. These methods are empty in ThreadPoolExecutor.
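
One possible shape for such an extension is sketched below: a pool that measures the total execution time of its tasks. TimingPool, its fields and the 5 ms sleep are invented for the illustration; a real monitor would probably also track the maximum and minimum and export the numbers somewhere.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

final class TimingPoolDemo {
    static final class TimingPool extends ThreadPoolExecutor {
        // beforeExecute and afterExecute run on the worker thread, so a ThreadLocal is enough.
        private final ThreadLocal<Long> startNanos = new ThreadLocal<>();
        final AtomicLong totalNanos = new AtomicLong();
        final AtomicLong finishedTasks = new AtomicLong();

        TimingPool(int threads) {
            super(threads, threads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        }

        @Override
        protected void beforeExecute(Thread t, Runnable r) {
            super.beforeExecute(t, r);
            startNanos.set(System.nanoTime());
        }

        @Override
        protected void afterExecute(Runnable r, Throwable t) {
            totalNanos.addAndGet(System.nanoTime() - startNanos.get());
            finishedTasks.incrementAndGet();
            super.afterExecute(r, t);
        }
    }

    /** Returns {tasks counted by the hook, getCompletedTaskCount(), total nanoseconds}. */
    static long[] run(int taskCount) {
        TimingPool pool = new TimingPool(2);
        for (int i = 0; i < taskCount; i++) {
            pool.execute(() -> {
                try {
                    Thread.sleep(5);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new long[] {
            pool.finishedTasks.get(), pool.getCompletedTaskCount(), pool.totalNanos.get() };
    }

    public static void main(String[] args) {
        long[] stats = run(6);
        System.out.println(stats[0] + " tasks, average " + stats[2] / stats[0] + " ns");
    }
}
```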