ThreadPoolExecutor

Posted Jun 26, 2020 · 4 min read

Creation of ThreadPoolExecutor

ThreadPoolExecutor provides four constructors. The one that takes the most parameters is shown here as an example:

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)
No.  Parameter        Type                      Meaning
1    corePoolSize     int                       Number of core threads
2    maximumPoolSize  int                       Maximum number of threads
3    keepAliveTime    long                      Maximum idle time before a non-core thread is terminated
4    unit             TimeUnit                  Time unit of keepAliveTime
5    workQueue        BlockingQueue<Runnable>   Queue holding tasks that are waiting to run
6    threadFactory    ThreadFactory             Factory used to create new threads
7    handler          RejectedExecutionHandler  Policy applied when a task is rejected
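
As a minimal sketch of how the seven parameters fit together, the following builds a pool with the full constructor. The class name `PoolCreationExample` and all of the numeric values are illustrative assumptions, not recommendations.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PoolCreationExample {
    // Builds a pool with the seven-argument constructor; every value is illustrative.
    static ThreadPoolExecutor newPool() {
        return new ThreadPoolExecutor(
                2,                                      // corePoolSize
                4,                                      // maximumPoolSize
                30L,                                    // keepAliveTime
                TimeUnit.SECONDS,                       // unit of keepAliveTime
                new ArrayBlockingQueue<>(10),           // workQueue (bounded, capacity 10)
                Executors.defaultThreadFactory(),       // threadFactory
                new ThreadPoolExecutor.AbortPolicy());  // handler
    }

    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor pool = newPool();
        pool.execute(() ->
                System.out.println("running on " + Thread.currentThread().getName()));
        pool.shutdown();                             // stop accepting new tasks
        pool.awaitTermination(5, TimeUnit.SECONDS);  // wait for the submitted task
    }
}
```
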
workQueue

See BlockingQueue.

threadFactory

New threads are created through the specified ThreadFactory. If not specified, the default factory Executors.defaultThreadFactory is used, and the created threads will all belong to the same ThreadGroup.

DefaultThreadFactory() {
            SecurityManager s = System.getSecurityManager();
            group =(s != null)? s.getThreadGroup():
                                  Thread.currentThread().getThreadGroup();
            namePrefix = "pool-" +
                          poolNumber.getAndIncrement() +
                         "-thread-";
        }

All threads have the same NORM_PRIORITY priority and non-daemon status.

public Thread newThread(Runnable r) {
            Thread t = new Thread(group, r,
                                  namePrefix + threadNumber.getAndIncrement(),
                                  0);
            if(t.isDaemon())
                t.setDaemon(false);
            if(t.getPriority() != Thread.NORM_PRIORITY)
                t.setPriority(Thread.NORM_PRIORITY);
            return t;
        }

By supplying your own ThreadFactory, you can control the thread name, thread group, priority, daemon status, and so on.
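
A custom factory might look like the sketch below. `NamedThreadFactory` and its constructor parameters are hypothetical names chosen for illustration; only `ThreadFactory.newThread` comes from the JDK.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical factory that sets the name, daemon flag, and priority of each thread.
class NamedThreadFactory implements ThreadFactory {
    private final String prefix;
    private final boolean daemon;
    private final int priority;
    private final AtomicInteger counter = new AtomicInteger(1);

    NamedThreadFactory(String prefix, boolean daemon, int priority) {
        this.prefix = prefix;
        this.daemon = daemon;
        this.priority = priority;
    }

    @Override
    public Thread newThread(Runnable r) {
        // Threads are named prefix-1, prefix-2, ...
        Thread t = new Thread(r, prefix + "-" + counter.getAndIncrement());
        t.setDaemon(daemon);
        t.setPriority(priority);
        return t;
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(
                2, new NamedThreadFactory("worker", true, Thread.NORM_PRIORITY));
        // Wait for the task so its output appears before the pool is shut down.
        pool.submit(() -> System.out.println(Thread.currentThread().getName())).get();
        pool.shutdown();
    }
}
```

Descriptive thread names make thread dumps and log output much easier to attribute to a particular pool.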

handler

ThreadPoolExecutor.AbortPolicy throws java.util.concurrent.RejectedExecutionException. This is the default policy.

ThreadPoolExecutor.CallerRunsPolicy runs the rejected task directly in the thread that called execute; if the executor has been shut down, the task is discarded.

ThreadPoolExecutor.DiscardPolicy silently discards the rejected task.

ThreadPoolExecutor.DiscardOldestPolicy discards the task at the head of the work queue and then retries execute, unless the executor has been shut down, in which case the rejected task is discarded.
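
The four policies can be compared with a small experiment. The sketch below is an illustration only: `RejectionPolicyDemo` and its `run` helper are hypothetical names. It saturates a pool that has one thread and a one-slot queue, submits a third task, and records which tasks ran.

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class RejectionPolicyDemo {
    // Task A blocks the only worker, task B fills the queue, task C is rejected.
    static List<String> run(RejectedExecutionHandler handler) {
        List<String> log = new CopyOnWriteArrayList<>();
        CountDownLatch release = new CountDownLatch(1);
        Thread caller = Thread.currentThread();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1), handler);
        try {
            pool.execute(() -> {                      // A: occupies the single worker
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                log.add("A");
            });
            pool.execute(() -> log.add("B"));         // B: waits in the queue
            try {
                pool.execute(() -> log.add(           // C: pool and queue are both full
                        Thread.currentThread() == caller ? "C on caller" : "C on pool"));
            } catch (RejectedExecutionException e) {
                log.add("C rejected");
            }
            release.countDown();                      // let A finish
            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run(new ThreadPoolExecutor.AbortPolicy()));         // [C rejected, A, B]
        System.out.println(run(new ThreadPoolExecutor.CallerRunsPolicy()));    // [C on caller, A, B]
        System.out.println(run(new ThreadPoolExecutor.DiscardPolicy()));       // [A, B]
        System.out.println(run(new ThreadPoolExecutor.DiscardOldestPolicy())); // [A, C on pool]
    }
}
```

Note that with DiscardOldestPolicy it is the queued task B that is lost, not the newly submitted task C.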

ThreadPoolExecutor workflow

[Figure: ThreadPoolExecutor workflow (Workflow.png)]

When a task is added to the thread pool through the execute(Runnable) method:

If the number of threads in the pool is less than corePoolSize, a new thread is created to handle the task, even if other threads in the pool are idle.
If the number of threads in the pool is greater than or equal to corePoolSize and the buffer queue workQueue is not full, the task is put into the queue.
If the number of threads in the pool is greater than or equal to corePoolSize, the buffer queue workQueue is full, and the number of threads is less than maximumPoolSize, a new thread is created to handle the task.
If the buffer queue workQueue is full and the number of threads in the pool has reached maximumPoolSize, the task is rejected and handled by the policy specified by handler.
When the number of threads in the pool is greater than corePoolSize, any thread that stays idle for longer than keepAliveTime is terminated. In this way, the thread pool dynamically adjusts the number of threads in the pool.
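
The steps above can be traced with a deliberately tiny pool. In this sketch (`WorkflowDemo` is a hypothetical name; the sizes 1, 2, and 1 are chosen only to make each branch visible), every task blocks until released, so the pool state after each submission is stable.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class WorkflowDemo {
    // corePoolSize = 1, maximumPoolSize = 2, queue capacity = 1, default AbortPolicy.
    static List<String> trace() {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 1L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
        Runnable blocker = () -> {
            try {
                release.await();                 // keep the worker busy until released
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        List<String> steps = new ArrayList<>();
        for (int i = 1; i <= 4; i++) {
            try {
                pool.execute(blocker);
                steps.add("task " + i + ": threads=" + pool.getPoolSize()
                        + ", queued=" + pool.getQueue().size());
            } catch (RejectedExecutionException e) {
                steps.add("task " + i + ": rejected");
            }
        }
        release.countDown();                     // let all accepted tasks finish
        pool.shutdown();
        return steps;
    }

    public static void main(String[] args) {
        trace().forEach(System.out::println);
        // task 1: threads=1, queued=0   (below corePoolSize: new core thread)
        // task 2: threads=1, queued=1   (core threads busy: task is queued)
        // task 3: threads=2, queued=1   (queue full: new non-core thread)
        // task 4: rejected              (queue full and maximumPoolSize reached)
    }
}
```
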
Predefined thread pools provided by Executors
newFixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

corePoolSize is equal to maximumPoolSize, so all threads are core threads and the thread pool size is fixed;
keepAliveTime = 0; this parameter has no effect, because every thread in a FixedThreadPool is a core thread and core threads do not time out by default;
workQueue = LinkedBlockingQueue; created with its default constructor, the queue has a capacity of Integer.MAX_VALUE, so in practice its length is limited only by the available heap memory. If tasks keep being submitted faster than they are processed, they pile up in the queue, and the JVM may run out of memory (OOM) before the rejection policy is ever triggered;

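Both putLock and takeLock in LinkedBlockingQueue are non-fair locks, so threads contending to enqueue or dequeue are not served in arrival order. The queue itself is FIFO, but with several worker threads running concurrently, a FixedThreadPool gives no guarantee about the order in which tasks start or complete;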
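
One common way to avoid the unbounded queue is to call the ThreadPoolExecutor constructor directly with a bounded queue and an explicit rejection policy. The following is a minimal sketch under that assumption; `BoundedFixedPool`, the capacity, and the choice of CallerRunsPolicy are illustrative, not part of the Executors API.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class BoundedFixedPool {
    // Same shape as newFixedThreadPool, but the queue has a fixed capacity.
    static ThreadPoolExecutor create(int nThreads, int queueCapacity) {
        return new ThreadPoolExecutor(
                nThreads, nThreads,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(queueCapacity),
                // When the queue is full, the submitting thread runs the task itself,
                // which slows down submission instead of growing the queue.
                new ThreadPoolExecutor.CallerRunsPolicy());
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = create(4, 1000);
        System.out.println("queue capacity left: " + pool.getQueue().remainingCapacity());
        pool.shutdown();
    }
}
```
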

newSingleThreadExecutor
public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
           (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

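A special case of FixedThreadPool: the pool size is fixed at 1, so tasks are executed sequentially on a single thread. Because the pool is wrapped in a delegating ExecutorService, it cannot be reconfigured afterwards to use more threads.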
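
The sequential behaviour is easy to observe. In this sketch (`SingleThreadOrderDemo` is a hypothetical name) the tasks record their own ids, and the ids come back in submission order because one thread works through the FIFO queue.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class SingleThreadOrderDemo {
    // Submits n tasks and returns the order in which they actually ran.
    static List<Integer> runInOrder(int n) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        List<Integer> seen = Collections.synchronizedList(new ArrayList<>());
        for (int i = 0; i < n; i++) {
            final int id = i;
            executor.execute(() -> seen.add(id));
        }
        executor.shutdown();                          // queued tasks still run
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(runInOrder(5)); // [0, 1, 2, 3, 4]
    }
}
```
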

newCachedThreadPool
public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

corePoolSize = 0 and maximumPoolSize = Integer.MAX_VALUE, so the number of threads is effectively unlimited (in practice it is limited by the memory available to the virtual machine);
keepAliveTime = 60s, so a thread is reclaimed after it has been idle for 60 seconds;
workQueue = SynchronousQueue, a hand-off queue with no capacity: each insert must wait for a matching remove by another thread, and vice versa. Because the queue never stores elements, tasks in a CachedThreadPool never wait in a queue; each task is either handed to an idle thread or a new thread is created for it;
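
To see the hand-off behaviour, the sketch below builds a pool with the same parameters as the listing above (constructed directly so that getPoolSize is available without a cast) and submits tasks that all block. `CachedPoolDemo` and `threadsFor` are hypothetical names used for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class CachedPoolDemo {
    // Returns how many threads the pool created for the given number of blocked tasks.
    static int threadsFor(int tasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<>());
        CountDownLatch release = new CountDownLatch(1);
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                try {
                    release.await();             // keep every worker busy
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        // No idle thread was available for any hand-off, so each task got its own thread.
        int size = pool.getPoolSize();
        release.countDown();
        pool.shutdown();
        return size;
    }

    public static void main(String[] args) {
        System.out.println(threadsFor(5)); // 5
    }
}
```

This is also why a CachedThreadPool can exhaust memory under a burst of long-running tasks: nothing caps the number of threads.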

newScheduledThreadPool
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }

public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE,
              DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
              new DelayedWorkQueue());
    }

DelayedWorkQueue keeps the tasks added to the queue sorted by their delay, so the task whose delay expires earliest is taken first.
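
The ordering can be illustrated by scheduling tasks out of order on a single-threaded scheduler. This is a sketch with a hypothetical class name (`ScheduledOrderDemo`) and arbitrary delays; it relies on the default behaviour that already-scheduled delayed tasks still run after shutdown().

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class ScheduledOrderDemo {
    // Schedules three tasks out of order and returns the order in which they ran.
    static List<String> run() {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        List<String> order = Collections.synchronizedList(new ArrayList<>());
        scheduler.schedule(() -> { order.add("300ms"); }, 300, TimeUnit.MILLISECONDS);
        scheduler.schedule(() -> { order.add("100ms"); }, 100, TimeUnit.MILLISECONDS);
        scheduler.schedule(() -> { order.add("200ms"); }, 200, TimeUnit.MILLISECONDS);
        scheduler.shutdown();                         // pending delayed tasks still run
        try {
            scheduler.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return order;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [100ms, 200ms, 300ms]
    }
}
```
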

newWorkStealingPool
public static ExecutorService newWorkStealingPool(int parallelism) {
        return new ForkJoinPool
           (parallelism,
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
    }

newWorkStealingPool creates a ForkJoinPool. The parameter is the target parallelism level, that is, the number of threads the pool tries to keep actively running. Unlike the previous four thread pools, this pool makes no guarantee about the order in which submitted tasks are executed. With work stealing, each worker thread has its own task queue, and a worker that runs out of tasks takes tasks from the queues of busy workers. The pool keeps enough threads alive to maintain the requested level of parallelism, so the cores of a multi-core CPU are not left idle while work remains.
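
Since execution order is not guaranteed, a work-stealing pool suits independent tasks whose results are combined afterwards. A minimal sketch, assuming a parallelism of 4 and a hypothetical `sumOfSquares` helper:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class WorkStealingDemo {
    // Computes 1^2 + 2^2 + ... + n^2 with one independent task per term.
    static long sumOfSquares(int n) {
        ExecutorService pool = Executors.newWorkStealingPool(4);
        try {
            List<Callable<Long>> tasks = new ArrayList<>();
            for (int i = 1; i <= n; i++) {
                final long v = i;
                tasks.add(() -> v * v);
            }
            long total = 0;
            // invokeAll waits for all tasks; the sum does not depend on execution order.
            for (Future<Long> f : pool.invokeAll(tasks)) {
                total += f.get();
            }
            return total;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(sumOfSquares(10)); // 385
    }
}
```

The worker threads of a ForkJoinPool are daemon threads, so the program has to wait for results (as invokeAll does here) rather than rely on the pool to keep the JVM alive.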