DXP BLOG
首页
  • JDk
  • Spring系列
  • 微服务
  • Json
  • Netty
  • Bug
  • Mysql
  • Postgresql
  • 达梦
  • activemq
  • rabbitmq
  • rocketmq
  • redis
  • Vue
  • React
  • Angular
  • Javascript
  • Typescript
linux
  • 协议
  • 加解密
  • 分类
  • 标签
  • 归档
Gitee (opens new window)

董新平

一个普普通通的代码程序猿
首页
  • JDk
  • Spring系列
  • 微服务
  • Json
  • Netty
  • Bug
  • Mysql
  • Postgresql
  • 达梦
  • activemq
  • rabbitmq
  • rocketmq
  • redis
  • Vue
  • React
  • Angular
  • Javascript
  • Typescript
linux
  • 协议
  • 加解密
  • 分类
  • 标签
  • 归档
Gitee (opens new window)
  • JDK

    • ThreadPoolExecutor
      • 1. 实例化
        • 1.1 最终构造器
        • 1.2 解读参数
        • 1.3 核心属性解读
      • 2.任务投递关键方法
        • 2.1 execute
        • 2.1.1 源码
        • 2.1.2 流程图
        • 2.2 addWorker
      • 3. 拒绝策略
      • 4. 几种常见的队列
    • Object类12中方法及作用
    • CountDownLatch
    • 读写锁
    • Atomic
  • Spring系列

  • Json

  • JAVA
  • JDK
dongxinping
2022-08-20
目录

ThreadPoolExecutor

# ThreadPoolExecutor

线程池

# 1. 实例化

# 1.1 最终构造器

 public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24

# 1.2 解读参数

  • corePoolSize 核心线程数,保持常活跃.
    • 当设置了allowCoreThreadTimeOut, 核心线程也会被释放
  • maximumPoolSize 当前线程池允许的最大线程数
  • keepAliveTime 线程数大于核心数(corePoolSize)时启用,多余的空闲线程在终止前等待新任务的最长时间
  • unit 时间单位,为 keepAliveTime 服务
  • workQueue 通过 execute 投递的任务将会放在这个队列中,慢慢消费(通过采用有界队列)
  • threadFactory 线程池创建新线程的工厂类
  • handler 当且仅当 maximumPoolSize 饱和和 workQueue 达到最大值时启用,定义如何处理新投递的任务

# 1.3 核心属性解读

		private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private static final int COUNT_BITS = Integer.SIZE - 3;  //29
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // 线程池的状态
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;

    // 获取状态码
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
   // 获取正在运行的线程数
    private static int workerCountOf(int c)  { return c & CAPACITY; }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
  • ctl 两种含义, 高三位表示线程池的状态, 低 29 位表示活跃的线程数量

# 2.任务投递关键方法

# 2.1 execute

往线程池中提交任务,也是整个线程池最关键的方法

# 2.1.1 源码

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        
        int c = ctl.get();
  // 线程数是否小于核心数
        if (workerCountOf(c) < corePoolSize) {
          // 创建核心线程并运行
            if (addWorker(command, true))
                return;  // 投递成功直接结束
            c = ctl.get();
        }
  // 线程池运行中, 尝试添加到缓存队列
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
          // 二次判断线程池是否运行, 若没有则移除任务,并触发拒绝策略
            if (! isRunning(recheck) && remove(command))
                reject(command);
          // 缓存队列添加成功了, 判断工作线程是不是空的, 是就创建一个非核心线程
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
  // 队列添加失败(一般是队列满了), 尝试创建新的非核心线程来运行
        else if (!addWorker(command, false))
          // 执行拒绝策略, 调用 rejectedExecution 方法
            reject(command);
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27

# 2.1.2 流程图

由流程看出: 任务优先度是 coreThread -> workQueue -> maximumPoolSize ->

RejectedExecutionHandler(过程中出现异常或者线程池非运行状态都会运行)

# 2.2 addWorker

两个参数, Runnable firstTask, boolean core

创建新的线程的入口方法

  • firstTask 创建新线程是投的的任务,可为null
  • core 是否是核心线程
  1. 判断线程池是否将要关闭了,是直接返回 false
  2. 判断工作线程数是否大于等于核心线程数或者总的线程数, 是直接false
  3. 工作线程数数加1
  4. 调用 threadFactory 创建线程,并将其放到 works 线程池中
  5. 放入成功则直接启动线程, 否则线程数减1

# 3. 拒绝策略

默认提供了4种

  • AbortPolicy 直接抛出异常, 默认的就是中,通过6个参数的构造函数可看到
  • CallerRunsPolicy 让投递这个任务的线程自己去执行这个任务
  • DiscardPolicy 直接丢弃刚刚投递进来的任务, 什么也不做
  • DiscardOldestPolicy 从队列中放弃最早投进去的任务,将新任务加进去

# 4. 几种常见的队列

注意使用无边界的队列,容易内存溢出

  • PriorityBlockingQueue优先级阻塞队列, 无边界. 队列会根据优先级排序
  • LinkedBlockingQueue 链表,通过构造器限定队列大小, 默认 Integer.MAX
  • SynchronousQueue 可认为队列大小为1, 放入必须有人take
  • ArrayBlockingQueue 基于数组的队列, 内部维护了一个putIndex,达到环形数组的效果
#源码阅读
上次更新: 2023/06/12, 10:31:06
Object类12中方法及作用

Object类12中方法及作用→

最近更新
01
Redis数据类型
01-20
02
Atomic
12-27
03
编译安装Redis
12-27
更多文章>
dongxinping | Copyright © 2022-2024 Dongxinping | MIT License
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式