线程池源码分析

发布于 2023-05-22  336 次阅读


线程池源码分析

Java构建线程的方式(常识)

  • 继承Thread
  • 实现Runnable
  • 实现Callable
  • 线程池
    • Executors:提供的线程池,参数都写死了,不好控制~~~
    • 手动构建(ThreadPoolExecutor):阿里规范要求必须手动构建线程池,更好的管理线程池中的参数,并且需要给线程设置名称,方便后期排查问题。

线程池的7个参数(常识)

public ThreadPoolExecutor(int corePoolSize, // 核心线程数 
                          int maximumPoolSize, // 最大线程数 
                          long keepAliveTime, // 最大线程数的空闲时间 
                          TimeUnit unit, // 活跃时间的单位 
                          BlockingQueue<Runnable> workQueue, // 阻塞/工作队列 
                          ThreadFactory threadFactory, // 线程工厂(构建Thread对象) 
                          RejectedExecutionHandler handler) {} // 拒绝策

线程池的执行流程(常识)

image.png

线程池属性标识&线程池状态

线程池状态

image.png

核心参数

// ctl是一个int类型,int类型占用32个bit位,而低29位,用来描述工作线程的个数! 
// 高3位,用来表示线程池的状态 
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); 
private static final int COUNT_BITS = Integer.SIZE - 3; // 29 
// 工作线程的最大值!!!00011111 11111111 11111111 11111111 
private static final int CAPACITY = (1 << COUNT_BITS) - 1; 

00011111 11111111 11111111 11111111 
RUNNING:线程池正常干活,没毛病~~~ 
SHUTDOWN:线程池执行了shutdown()方法 
STOP:线程池执行了shutdownNow()方法 
TIDYING:过渡状态,线程池快凉凉了。 
TERMINATED:真的凉凉了~~执行了terminated()方法,只会在TIDYING状态下执行 

private static final int RUNNING = -1 << COUNT_BITS; //111 表示RUNNING状态 
private static final int SHUTDOWN = 0 << COUNT_BITS; //000 表示SHUTDOWN状态 
private static final int STOP = 1 << COUNT_BITS; //001 表示STOP 
private static final int TIDYING = 2 << COUNT_BITS; //010 表示TIDYING 
private static final int TERMINATED = 3 << COUNT_BITS; //011 表示TERMINATED 
// 获取高三位的值,得到线程池的状态 
private static int runStateOf(int c) { return c & ~CAPACITY; } 
// 获取低29位的值,得到线程池中工作线程的个数 
private static int workerCountOf(int c) { return c & CAPACITY; } 
// 如果你对位运算,&^~|不太了解的,看一下雪花算法(分布式id生成方式),对位运算很了解

线程池的execute方法执行

// 查看execute方法,向线程池提交任务时,会执行execute
public void execute(Runnable command) {、
    // 健壮性判断,任务为null,扔异常!
    if (command == null)
        throw new NullPointerException();

    // 获取到ctl的值!
    int c = ctl.get();

    // 如果工作线程数小于核心线程数  5 < 10
    if (workerCountOf(c) < corePoolSize) {
        // 添加核心线程,执行任务,传入true,代表构建核心线程~
        if (addWorker(command, true))
            return;
        // 因为并发操作,可能会失败如果添加核心线程失败,重新获取ctl,走下面的业务
        c = ctl.get();
    }

    // 判断线程池是不是RUNNING,将任务扔到工作队列中,如果成功
    if (isRunning(c) && workQueue.offer(command)) {
        // 任务已经扔到队列中了!再次拿到ctl
        int recheck = ctl.get();
        // 判断线程池状态不是RUNNING,就移除掉当前任务
        if (!isRunning(recheck) && remove(command))
            // 拒绝策略
            reject(command);
        else if (workerCountOf(recheck) == 0)  // 到这,说明任务添加到工作队列了,但是工作线程数为0
            // 添加一个空任务的非核心线程,去处理队列中的任务!
            addWorker(null, false);

    // 如果任务扔工作队列失败,添加非核心线程
    } else if (!addWorker(command, false))
        // 如果失败,执行reject拒绝策略~ (默认提供了4中拒绝策略、)抛出异常Abort,调用者执行的Caller,
        // 丢掉队列前面的任务,再次执行的DiscardOldest,直接扔当前任务的Discard
        reject(command);
}

// 核心线程也会自动关闭吗
/**
 * 可以给核心线程设置允许被干掉,他会使用非核心线程的空闲时间
 * If false (default), core threads stay alive even when idle.
 * If true, core threads use keepAliveTime to time out waiting
 * for work.
 */
private volatile boolean allowCoreThreadTimeOut;

Worker的操作

private boolean addWorker(Runnable firstTask, boolean core) {
    // --------------------------------判断能否添加工作线程---------------------------------------
    // 加个标记
    retry:
    for (;;) {
        // 获取ctl
        int c = ctl.get();
        // 获取线程池状态
        int rs = runStateOf(c);

        // rs >= SHUTDOWN,说明线程池状态不是RUNNING
        if (rs >= SHUTDOWN &&
            // 线程池状态为SHUTDOWN,并且任务为空,并且工作队列不为空!
            // 说明现在要添加一个空任务的非核心线程,去处理工作队列中的任务
            !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty())
            )
            // 不是RUNNING,并且当前不是要添加非核心空任务线程处理队列~~
            return false;

        // 循环,判断线程池中工作线程的个数
        for (;;) {
            // 拿到线程池中工作线程的个数
            int wc = workerCountOf(c);
            // 当前工作线程,是否超过了最大工作线程数
            if (wc >= CAPACITY ||
                // 如果添加核心线程,不能超过核心线程数最大值
                // 如果添加非核心线程,不能超过非核心线程数最大值
                wc >= (core ? corePoolSize : maximumPoolSize))
                // 超过了,就凉凉~~~,添加工作线程失败!
                return false;
            // 用CAS的方式,将工作线程数 + 1
            if (compareAndIncrementWorkerCount(c))
                // 如果CAS成功,跳出外层循环
                break retry;
            // 再次获取ctl
            c = ctl.get();  
            // 如果重新获取ctl的状态和之前的有变化
            if (runStateOf(c) != rs)
                // 从外层循环重新判断
                continue retry;
        }
    }

    // --------------------------------构建工作线程,并执行任务---------------------------------------
    // 两个标识
    boolean workerStarted = false;
    boolean workerAdded = false;
    // 声明工作线程
    Worker w = null;
    try {
        // 创建工作线程,传入任务
        w = new Worker(firstTask);
        // 拿到线程
        final Thread t = w.thread;
        if (t != null) {
            // 加锁
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // 再次获取线程池状态
                int rs = runStateOf(ctl.get());
                // 线程池状态为RUNNING
                if (rs < SHUTDOWN ||
                    // 如果是SHUTDOWN,并且任务为空
                    (rs == SHUTDOWN && firstTask == null)) {
                    // 将创建好的Worker添加到workers中
                    workers.add(w);
                    // 获取当前工作线程数
                    int s = workers.size();
                    // 如果当前工作线程数,已经大于之前记录的最大值,就替换
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    // 工作线程添加成功
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                // 如果工作线程添加成功,这边就是运行
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            // 工作线程启动失败!,从HashSet里移除工作线程,并尝试terminated
            addWorkerFailed(w);
    }
    return workerStarted;
}
知道的越多 不知道的越多
最后更新于 2023-05-22