原创

深入理解线程池

温馨提示:
本文最后更新于 2020年05月27日,已超过 1,391 天没有更新。若文章内的图片失效(无法正常加载),请留言反馈或直接联系我

使用线程池的好处

  1. 降低资源消耗
    可以重复利用已创建的线程降低线程创建和销毁造成的消耗。
  2. 提高响应速度
    当任务到达时,任务可以不需要等到线程创建就能立即执行。
  3. 提高线程的可管理性
    线程是稀缺资源,如果无限制地创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一分配、调优和监控。

不使用线程池的坏处

  1. 频繁的线程创建和销毁会占用更多的CPU和内存。
  2. 频繁的线程创建和销毁会对GC产生比较大的压力。
  3. 线程太多,线程切换带来的开销将不可忽视。
  4. 线程太少,多核CPU得不到充分利用,是一种浪费。

线程池的工作原理

当一个新的任务提交到线程池之后:

  1. 线程池判断核心线程池里的线程是否都在执行任务。如果不是,则创建一个新的工作线程来执行任务。如果核心线程池里的线程都在执行任务,则执行第二步。
  2. 线程池判断工作队列是否已经满。如果工作队列没有满,则将新提交的任务存储在这个工作队列里进行等待。如果工作队列满了,则执行第三步。
  3. 线程池判断线程池的线程是否都处于工作状态。如果没有,则创建一个新的工作线程来执行任务。如果已经满了,则交给饱和策略来处理这个任务。
    线程池的工作流程.png

ThreadPoolExecutor的处理流程

ThreadPoolExecutor的处理流程.png

Executors

Executors是一个线程池工厂,提供了很多的工厂方法,我们来看看它大概能创建哪些线程池。

// 创建单一线程的线程池
public static ExecutorService newSingleThreadExecutor();
// 创建固定数量的线程池
public static ExecutorService newFixedThreadPool(int nThreads);
// 创建带缓存的线程池
public static ExecutorService newCachedThreadPool();
// 创建定时调度的线程池
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize);
// 创建流式(fork-join)线程池
public static ExecutorService newWorkStealingPool();

创建单一线程的线程池

故名思意,这个线程池只有一个线程。若多个任务被提交到此线程池,那么会被缓存到队列(队列长度为Integer.MAX_VALUE)。当线程空闲的时候,按照FIFO的方式进行处理。

创建固定数量的线程池

和创建单一线程的线程池类似,只是这儿可以并行处理任务的线程数更多一些罢了。若多个任务被提交到此线程池,会有下面的处理过程。

如果线程的数量未达到指定数量,则创建线程来执行任务
如果线程池的数量达到了指定数量,并且有线程是空闲的,则取出空闲线程执行任务
如果没有线程是空闲的,则将任务缓存到队列(队列长度为Integer.MAX_VALUE)。当线程空闲的时候,按照FIFO的方式进行处理.

创建带缓存的线程池

这种方式创建的线程池,核心线程池的长度为0,线程池最大长度为Integer.MAX_VALUE。由于本身使用SynchronousQueue作为等待队列的缘故,导致往队列里面每插入一个元素,必须等待另一个线程从这个队列删除一个元素。

创建定时调度的线程池

和上面3个工厂方法返回的线程池类型有所不同,它返回的是ScheduledThreadPoolExecutor类型的线程池。平时我们实现定时调度功能的时候,可能更多的是使用第三方类库,比如:quartz等。但是对于更底层的功能,我们仍然需要了解

手动创建线程池

ThreadPoolExecutor构造方法有7个参数.png

ThreadPoolExecutor源码:

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

ThreadPoolExecutor构造方法有7个参数

  1. corePoolSize:线程池中的核心线程数。
  2. maximumPoolSize:线程池中的最大线程数。
  3. keepAliveTime:空闲时间,当线程池数量超过核心线程数时,多余的空闲线程存活的时间,即:这些线程多久被销毁。
  4. unit:空闲时间的单位,可以是毫秒、秒、分钟、小时和天,等等。
  5. workQueue:等待队列,线程池中的线程数超过核心线程数时,任务将放在等待队列,它是一个BlockingQueue类型的对象。
  6. threadFactory:线程工厂,我们可以使用它来创建一个线程。
  7. handler:拒绝策略,当线程池和等待队列都满了之后,需要通过该对象的回调函数进行回调处理。

为什么阿里Java规约禁止使用Java内置Executors创建线程池?

阿里巴巴Java规约中让我们手动创建线程池效果更好哦!
其实可以从ThreadPoolExecutor构造方法的7个参数出发。
规约中的原话:线程池不允许使用Executors去创建,而是通过ThreadPoolExecutor的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。

等待队列-workQueue

等待队列是BlockingQueue类型的,理论上只要是它的子类,都可以用来作为等待队列。

JDK中自带的一些阻塞队列

  1. ArrayBlockingQueue:队列是有界的,基于数组实现的阻塞队列。
  2. LinkedBlockingQueue:队列可以有界,也可以无界。基于链表实现的阻塞队列。
  3. SynchronousQueue:不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则插入操作将一直处于阻塞状态。该队列也是Executors.newCachedThreadPool()的默认队列。
  4. PriorityBlockingQueue:带优先级的无界阻塞队列。

通常情况下,我们需要指定阻塞队列的上界(比如1024)。另外,如果执行的任务很多,我们可能需要将任务进行分类,然后将不同分类的任务放到不同的线程池中执行。

线程工厂-threadFactory

ThreadFactory接口

ThreadFactory是一个接口,只有一个方法。
ThreadFactory是一个接口,只有一个方法。.png

Executors的实现使用了默认的线程工厂-DefaultThreadFactory。它的实现主要用于创建一个线程,线程的名字为pool-{poolNum}-thread-{threadNum}。
Executors采用了默认的DefaultThreadFactory线程工厂.png
源代码:

    /**
     * The default thread factory
     */
    static class DefaultThreadFactory implements ThreadFactory {
        private static final AtomicInteger poolNumber = new AtomicInteger(1);
        private final ThreadGroup group;
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;

        DefaultThreadFactory() {
            SecurityManager s = System.getSecurityManager();
            group = (s != null) ? s.getThreadGroup() :
                                  Thread.currentThread().getThreadGroup();
            namePrefix = "pool-" +
                          poolNumber.getAndIncrement() +
                         "-thread-";
        }

        public Thread newThread(Runnable r) {
            Thread t = new Thread(group, r,
                                  namePrefix + threadNumber.getAndIncrement(),
                                  0);
            if (t.isDaemon())
                t.setDaemon(false);
            if (t.getPriority() != Thread.NORM_PRIORITY)
                t.setPriority(Thread.NORM_PRIORITY);
            return t;
        }
    }

自定义线程名称就是实现ThreadFactory

/**
 * 带有名称的线程工厂
 * <p>为什么需要定义线程的名称?
 * 因为,如果在线程很多的时候,定义线程的名称有助于我们调试和定位问题。
 */
class MyThreadFactory implements ThreadFactory {

    /**
     * 线程名称
     */
    private final String threadName;

    /**
     * 构造器:传入线程名称,设置线程名称
     */
    MyThreadFactory(String threadName) {
        this.threadName = threadName;
    }

    @Override
    public Thread newThread(Runnable r) {
        Thread t = new Thread(r, threadName);
        t.setDaemon(true);
        return t;
    }
}

拒绝策略/线程池饱和策略-handler

什么是拒绝策略?

就是当线程池满了、队列也满了的时候,我们对任务采取的措施。或者丢弃、或者执行、或者其他…

JDK有哪些拒绝策略?

JDK自带4种拒绝策略

JDK自带4种拒绝策略,分别是:
1.CallerRunsPolicy:在调用者线程执行。
自实现CallerRunsPolicy类似:

/**
 * 线程池拒绝策略:CallerRunsPolicy => CallerRunsPolicy在任务被拒绝添加后,会调用当前线程池的所在的线程去执行被拒绝的任务。
 */
class MyCallerRunsPolicy implements RejectedExecutionHandler {
    public MyCallerRunsPolicy() {
    }

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        if (!executor.isShutdown()) {
            r.run();
        }
    }
}

2.AbortPolicy:直接抛出RejectedExecutionException异常。
自实现AbortPolicy类似:

/**
 * 线程池拒绝策略:AbortPolicy => ThreadPoolExecutor中默认的拒绝策略就是AbortPolicy。直接抛出异常。
 * <p>
 * 很简单粗暴,直接抛出个RejectedExecutionException异常,也不执行这个任务了。
 */
class MyAbortPolicy implements RejectedExecutionHandler {
    public MyAbortPolicy() {
    }

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        throw new RejectedExecutionException("Task " + r.toString() +
                " rejected from " +
                e.toString());
    }
}

3.DiscardPolicy:任务直接丢弃,不做任何处理。

/**
 * 线程池拒绝策略:DiscardPolicy => 啥都不干,对于线程池的任务不抛弃也不会执行。
 */
class MyDiscardPolicy implements RejectedExecutionHandler {
    public MyDiscardPolicy() {
    }

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
    }
}

4.DiscardOldestPolicy:丢弃队列里最旧的那个任务,再尝试执行当前任务。

/**
 * 线程池拒绝策略:DiscardOldestPolicy => 抛弃线程池中老的任务,再把新的任务加进去
 */
class MyDiscardOldestPolicy implements RejectedExecutionHandler {
    public MyDiscardOldestPolicy() {
    }

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        if (!executor.isShutdown()) {
            executor.getQueue().poll();
            executor.execute(r);
        }
    }
}

如何使用?

        // 线程池拒绝策略:DiscardPolicy => 直接丢弃
//        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
        // 自实现DiscardPolicy
//        executor.setRejectedExecutionHandler(new MyDiscardPolicy());

        // 线程池拒绝策略:AbortPolicy => 直接抛异常
//        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
        // 自实现MyAbortPolicy
//        executor.setRejectedExecutionHandler(new MyAbortPolicy());

        // 线程池拒绝策略:CallerRunsPolicy => CallerRunsPolicy在任务被拒绝添加后,会调用当前线程池的所在的线程去执行被拒绝的任务。
//        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        // 自实现MyCallerRunsPolicy
//        executor.setRejectedExecutionHandler(new MyCallerRunsPolicy());

        // 线程池拒绝策略:DiscardOldestPolicy => 对于线程池中的任务不抛弃也不拒绝,啥也不干
//        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());
        // 自实现MyDiscardOldestPolicy
//        executor.setRejectedExecutionHandler(new MyDiscardOldestPolicy());

        // 自定义线程池拒绝策略:比如现在想让被拒绝的任务在一个新的线程中执行。
        executor.setRejectedExecutionHandler(new MyRejectedExecutionHandler());

提交任务的两种方式

提及任务的方式有两种,分别是:submit和execute

这两个方法的区别:

  • submit:submit()用于提交一个需要返回果的任务。该方法返回一个Future对象,通过调用这个对象的get()方法,我们就能获得返回结果。get()方法会一直阻塞,直到返回结果返回。另外,我们也可以使用它的重载方法get(long timeout, TimeUnit unit),这个方法也会阻塞,但是在超时时间内仍然没有返回结果时,将抛出异常TimeoutException。
    submit(Runnable task)源代码:
      public Future<?> submit(Runnable task) {
          if (task == null) throw new NullPointerException();
          RunnableFuture<Void> ftask = newTaskFor(task, null);
          execute(ftask);
          return ftask;
      }
    
  • execute:execute()用于提交不需要返回结果的任务。
    execute源代码:
      public void execute(Runnable command) {
          if (command == null)
              throw new NullPointerException();
          int c = ctl.get();
          if (workerCountOf(c) < corePoolSize) {
              if (addWorker(command, true))
                  return;
              c = ctl.get();
          }
          if (isRunning(c) && workQueue.offer(command)) {
              int recheck = ctl.get();
              if (! isRunning(recheck) && remove(command))
                  reject(command);
              else if (workerCountOf(recheck) == 0)
                  addWorker(null, false);
          }
          else if (!addWorker(command, false))
              reject(command);
      }
    

关闭线程池的两种方式

可以调用线程池对象的shutdown()和shutdownNow()方法来关闭线程池。

这两个方法的区别:

  • shutdown()会将线程池状态置为SHUTDOWN,不再接受新的任务,同时会等待线程池中已有的任务执行完成再结束。
    shutdown源代码:
      public void shutdown() {
          final ReentrantLock mainLock = this.mainLock;
          mainLock.lock();
          try {
              // 确保安全关闭
              checkShutdownAccess();
              // 将线程池状态置为SHUTDOWN
              advanceRunState(SHUTDOWN);
              // 不再接受新任务
              interruptIdleWorkers();
              onShutdown(); // hook for ScheduledThreadPoolExecutor
          } finally {
              mainLock.unlock();
          }
          tryTerminate();
      }
    
  • shutdownNow()会将线程池状态置为STOP,对所有线程执行interrupt()操作,清空队列,并将队列中的任务返回回来。
    shutdownNow源代码:
      public List<Runnable> shutdownNow() {
          List<Runnable> tasks;
          final ReentrantLock mainLock = this.mainLock;
          mainLock.lock();
          try {
              // 确保安全关闭
              checkShutdownAccess();
              // 将线程池状态置为STOP
              advanceRunState(STOP);
              // 打断所有线程
              interruptWorkers();
              // 清空队列
              tasks = drainQueue();
          } finally {
              mainLock.unlock();
          }
          tryTerminate();
          // 并将队列中的任务返回回来
          return tasks;
      }
    

另外,关闭线程池涉及到两个返回boolean的方法,isShutdown()和isTerminated,分别表示是否关闭和是否终止。

如何正确配置线程池的参数?

  1. 任务的性质:CPU密集型、IO密集型和混杂型。
  2. 任务的优先级:高中低。
  3. 任务执行的时间:长中短。
  4. 任务的依赖性:是否依赖数据库或者其他系统资源。

通常来说,如果任务属于CPU密集型,那么我们可以将线程池数量设置成CPU的个数,以减少线程切换带来的开销。如果任务属于IO密集型,我们可以将线程池数量设置得更多一些,比如CPU个数*2。

可以通过Runtime.getRuntime().availableProcessors()来获取CPU的个数。

线程池监控

如果系统中大量用到了线程池,那么我们是不是有必要对线程池进行监控。
这样子有助于我们定位出现的问题。

ThreadPoolExecutor自带了一些方法:

  1. long getTaskCount():获取已经执行或正在执行的任务数。
  2. long getCompletedTaskCount():获取已经执行的任务数。
  3. int getLargestPoolSize():获取线程池曾经创建过的最大线程数,根据这个参数,我们可以知道线程池是否满过。
  4. int getPoolSize():获取线程池线程数。
  5. int getActiveCount():获取活跃线程数(正在执行任务的线程数)。

其它:

  1. protected void beforeExecute(Thread t, Runnable r):任务执行之前调用。
  2. protected void afterExecute(Runnable r, Throwable t):任务执行之后调用。
  3. protected void terminated():线程池结束之后调用。

遇到的一个问题

package com.lzhpo.threadpool.demo3;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * 遇到的一个问题
 *
 * @author lzhpo
 */
public class AProblem {

    static class DivTask implements Runnable {
        int a,b;

        public DivTask(int a, int b) {
            this.a = a;
            this.b = b;
        }

        @Override
        public void run() {
            double result = a / b;
            System.out.println(result);
        }
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(5);
        for (int i = 0; i < 5; i++) {
            executor.submit(new DivTask(100, i));
        }
    }
}

运行结果:

100.0
25.0
33.0
50.0

疑问:

  1. 我明明第一次的时候除数为0,为什么不报错?
  2. 按理论来说,应该是有5次输出的,为什么只有三次?
    线程池submit的问题.png

解决办法:
对submit的返回值进行处理。
因为submit是一个非阻塞的方法,就是不管你发生什么错误,我都会执行下去。
线程池submit的问题-解决办法.png

所以:

  1. 尽量使用手动的方式创建线程池,避免使用Executors工厂类。
  2. 根据场景,合理设置线程池的各个参数,包括线程池数量、队列、线程工厂和拒绝策略。
  3. 在调线程池submit()方法的时候,一定要尽量避免任务执行异常被吞掉的问题。

示例

HandCreateThreadPoolDemo1

package com.lzhpo.threadpool.demo3;

import java.util.Random;
import java.util.concurrent.*;

/**
 * 手动创建线程池
 *
 * @author lzhpo
 */
public class HandCreateThreadPoolDemo1 {

    public static void main(String[] args) {

        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                1,
                1,
                1,
                TimeUnit.SECONDS,
                // 线程池缓冲队列
                new LinkedBlockingDeque<>(10),
                // 自定义ThreadFactory线程工厂
                new MyThreadFactory("HandCreateThreadPoolDemo1")) {
            @Override
            protected void beforeExecute(Thread t, Runnable r) {
                System.out.println("I'm beforeExecute.");
            }

            @Override
            protected void afterExecute(Runnable r, Throwable t) {
                System.out.println("I'm afterExecute.");
            }

            @Override
            protected void terminated() {
                System.out.println("I'm terminated.");
            }
        };

        /**
         * 线程池拒绝策略
         */
        // 线程池拒绝策略:DiscardPolicy => 直接丢弃
//        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
        // 自实现DiscardPolicy
//        executor.setRejectedExecutionHandler(new MyDiscardPolicy());

        // 线程池拒绝策略:AbortPolicy => 直接抛异常
//        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
        // 自实现MyAbortPolicy
//        executor.setRejectedExecutionHandler(new MyAbortPolicy());

        // 线程池拒绝策略:CallerRunsPolicy => CallerRunsPolicy在任务被拒绝添加后,会调用当前线程池的所在的线程去执行被拒绝的任务。
//        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        // 自实现MyCallerRunsPolicy
//        executor.setRejectedExecutionHandler(new MyCallerRunsPolicy());

        // 线程池拒绝策略:DiscardOldestPolicy => 对于线程池中的任务不抛弃也不拒绝,啥也不干
//        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());
        // 自实现MyDiscardOldestPolicy
//        executor.setRejectedExecutionHandler(new MyDiscardOldestPolicy());

        // 自定义线程池拒绝策略:比如现在想让被拒绝的任务在一个新的线程中执行。
        executor.setRejectedExecutionHandler(new MyRejectedExecutionHandler());

        /**
         * 提交任务
         */
        // 方法1:submit,非阻塞方法,有返回结果,也就是Future对象。
        executor.submit(() -> {
            System.out.println("This is a task.");
            System.out.println(Thread.currentThread().getName());
            ;
        });
        // 方法2:execute。没有返回结果。
//        executor.execute(() -> {
//            System.out.println("This is a task.");
//        });

        /**
         * 关闭线程池
         */
        // 方法1:shutdown。shutdown()会将线程池状态置为SHUTDOWN,不再接受新的任务,同时会等待线程池中已有的任务执行完成再结束。
        executor.shutdown();
        // 方法2:立马结束,并且清空任务队列
//        executor.shutdownNow();

    }
}

//--------------------自定义线程名称------------------------

/**
 * 带有名称的线程工厂
 * <p>为什么需要定义线程的名称?
 * 因为,如果在线程很多的时候,定义线程的名称有助于我们调试和定位问题。
 */
class MyThreadFactory implements ThreadFactory {

    /**
     * 线程名称
     */
    private final String threadName;

    /**
     * 构造器:传入线程名称,设置线程名称
     */
    MyThreadFactory(String threadName) {
        this.threadName = threadName;
    }

    @Override
    public Thread newThread(Runnable r) {
        Thread t = new Thread(r, threadName);
        t.setDaemon(true);
        return t;
    }
}

//--------------------线程池拒绝策略------------------------

/**
 * 线程池拒绝策略:AbortPolicy => ThreadPoolExecutor中默认的拒绝策略就是AbortPolicy。直接抛出异常。
 * <p>
 * 很简单粗暴,直接抛出个RejectedExecutionException异常,也不执行这个任务了。
 */
class MyAbortPolicy implements RejectedExecutionHandler {
    public MyAbortPolicy() {
    }

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        throw new RejectedExecutionException("Task " + r.toString() +
                " rejected from " +
                e.toString());
    }
}

/**
 * 线程池拒绝策略:CallerRunsPolicy => CallerRunsPolicy在任务被拒绝添加后,会调用当前线程池的所在的线程去执行被拒绝的任务。
 */
class MyCallerRunsPolicy implements RejectedExecutionHandler {
    public MyCallerRunsPolicy() {
    }

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        if (!executor.isShutdown()) {
            r.run();
        }
    }
}

/**
 * 线程池拒绝策略:DiscardPolicy => 啥都不干,对于线程池的任务不抛弃也不会执行。
 */
class MyDiscardPolicy implements RejectedExecutionHandler {
    public MyDiscardPolicy() {
    }

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
    }
}

/**
 * 线程池拒绝策略:DiscardOldestPolicy => 抛弃线程池中老的任务,再把新的任务加进去
 */
class MyDiscardOldestPolicy implements RejectedExecutionHandler {
    public MyDiscardOldestPolicy() {
    }

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        if (!executor.isShutdown()) {
            executor.getQueue().poll();
            executor.execute(r);
        }
    }
}

/**
 * 自定义线程池拒绝策略:比如现在想让被拒绝的任务在一个新的线程中执行。
 */
class MyRejectedExecutionHandler implements RejectedExecutionHandler {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        new Thread(r, "新线程" + new Random().nextInt(10)).start();
    }
}
本文目录