线程池源码分析
Java构建线程的方式(常识)
- 继承Thread
- 实现Runnable
- 实现Callable
- 线程池
- Executors:提供的线程池,参数都写死了,不好控制~~~
- 手动构建(ThreadPoolExecutor):阿里规范要求必须手动构建线程池,更好的管理线程池中的参数,并且需要给线程设置名称,方便后期排查问题。
线程池的7个参数(常识)
public ThreadPoolExecutor(int corePoolSize, // 核心线程数
int maximumPoolSize, // 最大线程数
long keepAliveTime, // 最大线程数的空闲时间
TimeUnit unit, // 活跃时间的单位
BlockingQueue<Runnable> workQueue, // 阻塞/工作队列
ThreadFactory threadFactory, // 线程工厂(构建Thread对象)
RejectedExecutionHandler handler) {} // 拒绝策
线程池的执行流程(常识)
线程池属性标识&线程池状态
线程池状态
核心参数
// ctl是一个int类型,int类型占用32个bit位,而低29位,用来描述工作线程的个数!
// 高3位,用来表示线程池的状态
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3; // 29
// 工作线程的最大值!!!00011111 11111111 11111111 11111111
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
00011111 11111111 11111111 11111111
RUNNING:线程池正常干活,没毛病~~~
SHUTDOWN:线程池执行了shutdown()方法
STOP:线程池执行了shutdownNow()方法
TIDYING:过渡状态,线程池快凉凉了。
TERMINATED:真的凉凉了~~执行了terminated()方法,只会在TIDYING状态下执行
private static final int RUNNING = -1 << COUNT_BITS; //111 表示RUNNING状态
private static final int SHUTDOWN = 0 << COUNT_BITS; //000 表示SHUTDOWN状态
private static final int STOP = 1 << COUNT_BITS; //001 表示STOP
private static final int TIDYING = 2 << COUNT_BITS; //010 表示TIDYING
private static final int TERMINATED = 3 << COUNT_BITS; //011 表示TERMINATED
// 获取高三位的值,得到线程池的状态
private static int runStateOf(int c) { return c & ~CAPACITY; }
// 获取低29位的值,得到线程池中工作线程的个数
private static int workerCountOf(int c) { return c & CAPACITY; }
// 如果你对位运算,&^~|不太了解的,看一下雪花算法(分布式id生成方式),对位运算很了解
线程池的execute方法执行
// 查看execute方法,向线程池提交任务时,会执行execute
public void execute(Runnable command) {、
// 健壮性判断,任务为null,扔异常!
if (command == null)
throw new NullPointerException();
// 获取到ctl的值!
int c = ctl.get();
// 如果工作线程数小于核心线程数 5 < 10
if (workerCountOf(c) < corePoolSize) {
// 添加核心线程,执行任务,传入true,代表构建核心线程~
if (addWorker(command, true))
return;
// 因为并发操作,可能会失败如果添加核心线程失败,重新获取ctl,走下面的业务
c = ctl.get();
}
// 判断线程池是不是RUNNING,将任务扔到工作队列中,如果成功
if (isRunning(c) && workQueue.offer(command)) {
// 任务已经扔到队列中了!再次拿到ctl
int recheck = ctl.get();
// 判断线程池状态不是RUNNING,就移除掉当前任务
if (!isRunning(recheck) && remove(command))
// 拒绝策略
reject(command);
else if (workerCountOf(recheck) == 0) // 到这,说明任务添加到工作队列了,但是工作线程数为0
// 添加一个空任务的非核心线程,去处理队列中的任务!
addWorker(null, false);
// 如果任务扔工作队列失败,添加非核心线程
} else if (!addWorker(command, false))
// 如果失败,执行reject拒绝策略~ (默认提供了4中拒绝策略、)抛出异常Abort,调用者执行的Caller,
// 丢掉队列前面的任务,再次执行的DiscardOldest,直接扔当前任务的Discard
reject(command);
}
// 核心线程也会自动关闭吗
/**
* 可以给核心线程设置允许被干掉,他会使用非核心线程的空闲时间
* If false (default), core threads stay alive even when idle.
* If true, core threads use keepAliveTime to time out waiting
* for work.
*/
private volatile boolean allowCoreThreadTimeOut;
Worker的操作
private boolean addWorker(Runnable firstTask, boolean core) {
// --------------------------------判断能否添加工作线程---------------------------------------
// 加个标记
retry:
for (;;) {
// 获取ctl
int c = ctl.get();
// 获取线程池状态
int rs = runStateOf(c);
// rs >= SHUTDOWN,说明线程池状态不是RUNNING
if (rs >= SHUTDOWN &&
// 线程池状态为SHUTDOWN,并且任务为空,并且工作队列不为空!
// 说明现在要添加一个空任务的非核心线程,去处理工作队列中的任务
!(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty())
)
// 不是RUNNING,并且当前不是要添加非核心空任务线程处理队列~~
return false;
// 循环,判断线程池中工作线程的个数
for (;;) {
// 拿到线程池中工作线程的个数
int wc = workerCountOf(c);
// 当前工作线程,是否超过了最大工作线程数
if (wc >= CAPACITY ||
// 如果添加核心线程,不能超过核心线程数最大值
// 如果添加非核心线程,不能超过非核心线程数最大值
wc >= (core ? corePoolSize : maximumPoolSize))
// 超过了,就凉凉~~~,添加工作线程失败!
return false;
// 用CAS的方式,将工作线程数 + 1
if (compareAndIncrementWorkerCount(c))
// 如果CAS成功,跳出外层循环
break retry;
// 再次获取ctl
c = ctl.get();
// 如果重新获取ctl的状态和之前的有变化
if (runStateOf(c) != rs)
// 从外层循环重新判断
continue retry;
}
}
// --------------------------------构建工作线程,并执行任务---------------------------------------
// 两个标识
boolean workerStarted = false;
boolean workerAdded = false;
// 声明工作线程
Worker w = null;
try {
// 创建工作线程,传入任务
w = new Worker(firstTask);
// 拿到线程
final Thread t = w.thread;
if (t != null) {
// 加锁
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 再次获取线程池状态
int rs = runStateOf(ctl.get());
// 线程池状态为RUNNING
if (rs < SHUTDOWN ||
// 如果是SHUTDOWN,并且任务为空
(rs == SHUTDOWN && firstTask == null)) {
// 将创建好的Worker添加到workers中
workers.add(w);
// 获取当前工作线程数
int s = workers.size();
// 如果当前工作线程数,已经大于之前记录的最大值,就替换
if (s > largestPoolSize)
largestPoolSize = s;
// 工作线程添加成功
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
// 如果工作线程添加成功,这边就是运行
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
// 工作线程启动失败!,从HashSet里移除工作线程,并尝试terminated
addWorkerFailed(w);
}
return workerStarted;
}
Comments NOTHING