ThreadPoolExecutor
# ThreadPoolExecutor
线程池
# 1. 实例化
# 1.1 最终构造器
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# 1.2 解读参数
corePoolSize
核心线程数,保持常活跃.- 当设置了
allowCoreThreadTimeOut
, 核心线程也会被释放
- 当设置了
maximumPoolSize
当前线程池允许的最大线程数keepAliveTime
线程数大于核心数(corePoolSize
)时启用,多余的空闲线程在终止前等待新任务的最长时间unit
时间单位,为keepAliveTime
服务workQueue
通过execute
投递的任务将会放在这个队列中,慢慢消费(通过采用有界队列)threadFactory
线程池创建新线程的工厂类handler
当且仅当maximumPoolSize
饱和和workQueue
达到最大值时启用,定义如何处理新投递的任务
# 1.3 核心属性解读
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3; //29
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// 线程池的状态
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
// 获取状态码
private static int runStateOf(int c) { return c & ~CAPACITY; }
// 获取正在运行的线程数
private static int workerCountOf(int c) { return c & CAPACITY; }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
2
3
4
5
6
7
8
9
10
11
12
13
14
15
ctl
两种含义, 高三位表示线程池的状态, 低 29 位表示活跃的线程数量
# 2.任务投递关键方法
# 2.1 execute
往线程池中提交任务,也是整个线程池最关键的方法
# 2.1.1 源码
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
// 线程数是否小于核心数
if (workerCountOf(c) < corePoolSize) {
// 创建核心线程并运行
if (addWorker(command, true))
return; // 投递成功直接结束
c = ctl.get();
}
// 线程池运行中, 尝试添加到缓存队列
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 二次判断线程池是否运行, 若没有则移除任务,并触发拒绝策略
if (! isRunning(recheck) && remove(command))
reject(command);
// 缓存队列添加成功了, 判断工作线程是不是空的, 是就创建一个非核心线程
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 队列添加失败(一般是队列满了), 尝试创建新的非核心线程来运行
else if (!addWorker(command, false))
// 执行拒绝策略, 调用 rejectedExecution 方法
reject(command);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# 2.1.2 流程图
由流程看出: 任务优先度是 coreThread -> workQueue -> maximumPoolSize ->
RejectedExecutionHandler(过程中出现异常或者线程池非运行状态都会运行)
# 2.2 addWorker
两个参数, Runnable firstTask, boolean core
创建新的线程的入口方法
- firstTask 创建新线程是投的的任务,可为null
- core 是否是核心线程
- 判断线程池是否将要关闭了,是直接返回 false
- 判断工作线程数是否大于等于核心线程数或者总的线程数, 是直接false
- 工作线程数数加1
- 调用 threadFactory 创建线程,并将其放到 works 线程池中
- 放入成功则直接启动线程, 否则线程数减1
# 3. 拒绝策略
默认提供了4种
AbortPolicy
直接抛出异常, 默认的就是中,通过6个参数的构造函数可看到CallerRunsPolicy
让投递这个任务的线程自己去执行这个任务DiscardPolicy
直接丢弃刚刚投递进来的任务, 什么也不做DiscardOldestPolicy
从队列中放弃最早投进去的任务,将新任务加进去
# 4. 几种常见的队列
注意使用无边界的队列,容易内存溢出
- PriorityBlockingQueue优先级阻塞队列, 无边界. 队列会根据优先级排序
- LinkedBlockingQueue 链表,通过构造器限定队列大小, 默认 Integer.MAX
- SynchronousQueue 可认为队列大小为1, 放入必须有人take
- ArrayBlockingQueue 基于数组的队列, 内部维护了一个
putIndex
,达到环形数组的效果
上次更新: 2023/06/12, 10:31:06