认识线程池

image-1672469941354

为了避免频繁的创建和销毁线程,带来的性能影响;我们一般在使用线程的时候,把线程资源池化,通过采用线程池的方式来使用多线程。

java 本身提供Executors来帮助我们快速的创建线程池;不过这种方式在阿里的开发规约中不被推荐,我们只做简单的介绍。

//创建单线程的线程池,只有一个工作线程来执行任务;超出的任务会按照指定的顺序执行。
Executors.newSingleThreadExecutor();

//创建一个固定大小的线程池
Executors.newFixedThreadPool(5);

//创建一个可缓冲的线程池
Executors.newCachedThreadPool();

//创建一个固定大小,可以执行周期性任务的线程池
Executors.newScheduledThreadPool(5);

SingleThreadExecutor 和 FixedThreadPool 可能会堆积大量的任务,消耗大量的内存,导致OOM

CachedThreadPool 和 ScheduledThreadPool 最大线程数是Integer.MAX_VALUE,可能创建大量的线程,可能会OOM。

ThreadPoolExecutor 构造函数

ThreadPoolExecutor 构造函数的七个参数解释:

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)
  • corePoolSize 核心线程数

  • maximumPoolSize 最大线程池数

  • keepAliveTime 最大空闲时间

  • TimeUnit 时间单位

  • BlockingQueue 工作队列

  • ThreadFactory 线程工厂

  • RejectedExecutionHandler 拒绝策略

RejectedExecutionHandler 默认拒绝策略

RejectedExecutionHandler 默认有4中拒绝策略,用户也可以根据自己的实际需求去自定义实现

// 拒绝,抛出异常		
ThreadPoolExecutor.AbortPolicy;
// 调用者去执行
ThreadPoolExecutor.CallerRunsPolicy;
//直接丢弃,不做任何处理
ThreadPoolExecutor.DiscardPolicy;
//丢弃最老的任务
ThreadPoolExecutor.DiscardOldestPolicy;
  • AbortPolicy 拒绝策略,抛出异常

  • CallerRunsPolicy 调用者执行策略,去执行任务

  • DiscardPolicy 丢弃策略,不做任何处理

  • DiscardOldestPolicy 丢弃最早策略

线程池的使用

public static void main(String[] args) {
    // 线程池的核心线程数如何设置
    // 任务可以分为两种:CPU密集,IO密集。
    ThreadPoolExecutor executor = new ThreadPoolExecutor(
            1,
            2,
            1,
            TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(1),
            new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    Thread t = new Thread(r);
                    // ...
                    return t;
                }
            },
            new ThreadPoolExecutor.AbortPolicy()
    );

    executor.execute(任务);
    executor.submit(有返回结果的任务);
}

调用时有两种方式:

  • execute 执行任务

  • submit 执行有返回结果的任务

ThreadPoolExecutor 源码分析

execute 方法 VS submit 方法

   //submit 方法

    public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }
    
      protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return new FutureTask<T>(runnable, value);
    }  

submit 就是通过将Runnable 构建出RunnableFuture ,并调用 execute 实现由返回值的;所以我们线程池执行的过程,研究execute 即可

线程池基础知识

在解读execute 方法之前;我们先了解一些线程池的基础知识。

线程池状态转换

image-1672466049391

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// COUNT_BITS=29
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

// 拿到线程池状态
private static int runStateOf(int c)     { return c & ~CAPACITY; }

// 拿到工作线程个数
private static int workerCountOf(int c)  { return c & CAPACITY; }
  • AtomicInteger ctl 用CAS 实现的int ,保证了并发环境下的原子性;

  • ctl 主要维护了两个状态;用高3位维持线程池状态,用低29位维持线程池工作线程个数(核心线程+非核心线程)。

execute 方法

public void execute(Runnable command) {

if (command == null)
        throw new NullPointerException();
int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
       if (addWorker(command, true))
          return;
       c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
    int recheck = ctl.get();
    if (! isRunning(recheck) && remove(command))
        reject(command);
    else if (workerCountOf(recheck) == 0)
        addWorker(null, false);
}
    else if (!addWorker(command, false))
reject(command);
}

execute 方法描述:

  1. 检查任务是否为空,如果为空则抛出NullPointerException

  2. 如果不为空,获取ctl 的值;判断工作线程个数是否小于核心线程数

  3. 如果小于核心线程数,则调用addWorker 添加核心线程数,添加成功就return

  4. 添加失败,重新获取ctl;判断是不是运行状态(即小于shutdown 状态),正常的化,执行添加任务到工作队列

  5. 再次判断工作线程状态,如果不是RUNNING 就移除任务 并执行拒绝策略

  6. 判断工作线程数是不是为0,如果为0;则添加空任务和非核心线程(因为 核心线程也可以通过keepAlived超时被销毁,所以如果恰巧核心线程被销毁,也会出现当前效果)

  7. 如果不为0,就添加任务非核心线程执行;添加失败就执行拒绝策略。

addWorker方法

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                        firstTask == null &&
                        ! workQueue.isEmpty()))
            return false;

        for (;;) {
            int wc = workerCountOf(c);
            if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    ThreadPoolExecutor.Worker w = null;
    try {
        w = new ThreadPoolExecutor.Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());

                if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

// 如果添加工作线程失败,执行
private void addWorkerFailed(Worker w) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // 说明worker可能存放到了workers的hashSet中。
        if (w != null)
            // 移除!
            workers.remove(w);
        // 减掉workerCount的数值 -1
        decrementWorkerCount();
        // 尝试干掉自己
        tryTerminate();
    } finally {
        mainLock.unlock();
    }
}

addWorker 方法描述:

  1. 获取ctl,获取运行状态runState

  2. 如果运行状态大于SHUTDOWN ,并且不等于SHUTDOWN状态(状态图描述过,SHUTDOWN 状态还可以继续执行完工作队列的任务,大于SHUTDOWN的不可以) 就返回false 添加失败

  3. 计算工作线程的个数,如果工作线程个数大于CAPACITY 或者 添加核心线程已大于等于设置核心线程数 或者 添加非核心线程大于等于线程池设定的max 数;就返回false 添加失败

  4. 否则可以添加,就执行CAS添加,添加成功就调出双层循环

  5. 添加失败就重新获取运行状态,如果运行状态改变也跳出双重循环。

  6. 创建工作线程,将任务传到Worker中

  7. 获取锁资源并加锁,加锁的目的是为了方式线程池的状态出现变化

  8. 判断线程池状态是不是SHUTDOWN 并且为空任务,如果线程启动就抛出异常

  9. 否则就把worker 添加到队列中;如果woker数量大于曾经最大的数量,就更新记录

  10. 如果woker 添加成功,就启动线程。如果添加失败就把自己干掉。(回滚)

woker 的run 方法

addWoker 的过程中,添加成功后,状态也正常会执行woker 的 run 方法;run 方法会调用 runWoker 。

 public void run() {
  runWorker(this);
 }
 
 final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                w.lock();
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

runWoker 方法描述:(时已经进入到工作线程的执行流程)

  1. 获取task ,如果task 为空,就去队列中take 一个任务

  2. 加锁,并判断线程池状态是否大于等于STOP,如果是要中断当前线程

  3. 否则就实际执行,执行前后有预留的 beforeExecute 和 afterExecute 可以自定义去实现

  4. 任务执行完就更新任务完成数量+1,解锁

  5. 执行processWorkerExit

getTask 方法

工作线程排队拿任务

private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // Are workers subject to culling?
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

getTask 方法描述

  1. 如果线程池状态为SHUTDOWN && 工作队列为空;工作线程数-1

  2. 获取核心线程数,判断核心线程是否允许超时 或者 线程数是否大于核心线程数

  3. 如果成立就执行CAS 减少核心线程数

  4. 如果是非核心,走poll,拉取工作队列任务,

  5. 如果是核心线程,走take一直阻塞,拉取工作队列任务

  6. 获取到任务以后返回。

take 流程补充:

当工作队列没有任务时,这时就会被Condition通过await阻塞线程

当有任务添加到工作线程后,这是添加完任务后,就会用过Condition.signal唤醒阻塞的线程

processWorkerExit 方法

工作线程告辞~

private void processWorkerExit(ThreadPoolExecutor.Worker w, boolean completedAbruptly) {
    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
        decrementWorkerCount();

    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        completedTaskCount += w.completedTasks;
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }

    tryTerminate();

    int c = ctl.get();
    if (runStateLessThan(c, STOP)) {
        if (!completedAbruptly) {
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        addWorker(null, false);
    }
}

processWorkerExit 方法描述

  1. 如果是不正常操作,需要先对工作线程数-- (如果正常情况,getTask就–了)

  2. 将当前工作线程完整的任务个数赋值给整个线程池中的任务数;然后remove woker

  3. 线程池是否可以中止,线程池状态是否发生变化。

  4. 判断队列中是否还有任务,判断工作线程是否还在

  5. 如果还有任务为完成,则继续添加非核心线程去处理任务。