文章

Java线程池基础

Java线程池基础

线程池

什么是线程池?为什么要使用线程池?线程池好处?

什么是线程池?

线程作为操作系统宝贵的资源,对它的使用需要进行控制管理,线程池就是采用池化思想(类似连接池、常量池、对象池等)管理线程的工具。JUC 给我们提供了 ThreadPoolExecutor 体系类来帮助我们更方便的管理线程、并行执行任务。

为什么要使用线程池?线程池的好处?

池化技术
池化技术的思想主要是为了减少每次获取资源的消耗,提高对资源的利用率。如线程池、数据库连接池和 http 连接池等等。
使用线程池的好处 (两个降低、两个提高)

  1. 降低资源消耗:降低频繁创建、销毁线程带来的额外开销,复用已创建线程
  2. **降低使用复杂度: **将任务的提交和执行进行解耦,我们只需要创建一个线程池,然后往里面提交任务就行,具体执行流程由线程池自己管理,降低使用复杂度
  3. 提高响应速度:任务到达后,直接复用已创建好的线程执行,可以不需要等待线程创建就能立即执行任务
  4. 提高线程的可管理性:能安全有效的管理线程资源,避免不加限制无限申请造成资源耗尽风险。

线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配、监控和调优。还可以提供定时、定期、单线程、并发数控制等功能。

线程池解决了什么问题?

线程池解决的核心问题就是资源管理问题。在并发环境下,系统不能够确定在任意时刻中,有多少任务需要执行,有多少资源需要投入。这种不确定性将带来以下若干问题:

  1. 频繁申请/销毁资源和调度资源,将带来额外的消耗,可能会非常巨大。
  2. 对资源无限申请缺少抑制手段,易引发系统资源耗尽(OOM)的风险。
  3. 系统无法合理管理内部的资源分布,会降低系统的稳定性。

为解决资源分配这个问题,线程池采用了池化(Pooling)思想。池化,顾名思义,是为了最大化收益并最小化风险,而将资源统一在一起管理的一种思想。

线程池 API

线程池继承体系

hg9gx
线程池设计

  • 将任务提交和任务执行进行解耦
  • 线程池在内部实际上构建了一个生产者消费者模型,将线程和任务两者解耦,并不直接关联,从而良好的缓冲任务,复用线程

线程池的运行主要分成两部分:任务管理、线程管理。任务管理部分充当生产者的角色,当任务提交后,线程池会判断该任务后续的流转:(1)直接申请线程执行该任务;(2)缓冲到队列中等待线程执行;(3)拒绝该任务。线程管理部分是消费者,它们被统一维护在线程池内,根据任务请求进行线程的分配,当线程执行完任务后则会继续获取新的任务去执行,最终当线程获取不到任务的时候,线程就会被回收。

b3owy

线程池生命周期管理

ctl 变量(高 3 位存 runState,低 29 位存 workCount)
线程池运行的状态,由线程池内部维护,由一个 ctl 变量维护:线程池的运行状态 (runState) 和线程池内有效线程的数量 (workerCount),高 3 位保存 runState,低 29 位保存 workerCount,两个变量之间互不干扰。

1
2
3
4
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int CAPACITY   = (1 << COUNT_BITS) - 1; // 00011111111111111111111111111111
private static int runStateOf(int c)     { return c & ~CAPACITY; } // 高3位为线程池的状态
private static int workerCountOf(int c)  { return c & CAPACITY; } // 低29位为线程池中的线程数量

ThreadPoolExecutor 的运行状态有 5 种,分别为:
l9btq

Eexcetors 提供的 4 种静态创建线程池(不推荐用这个类的静态方法来创建线程池)

newFixedThreadPool(int nThreads) 定长线程池 – 无界队列,容易 OOM (nThreads 代表的是最大和核心线程,最多只有 nThreads 个线程,任务数超过 nThreads,会丢到队列挂起)

1
2
3
4
5
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
          0L, TimeUnit.MILLISECONDS,
          new LinkedBlockingQueue<Runnable>());
}

线程数量固定且都是核心线程:核心线程数量和最大线程数量都是 nThreads;都是核心线程且不会被回收,快速响应外界请求 没有超时机制,无界(Integer.MAX_VALUE)的 LinkedBlockingQueue 队列,可能会堆积大量请求,导致 OOM 新任务使用核心线程处理,如果没有空闲的核心线程,则排队等待执行

newCachedThreadPool 可缓存线程池

1
2
3
4
5
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
          60L, TimeUnit.SECONDS,
          new SynchronousQueue<Runnable>());
}
  1. 创建的线程池最大线程数是用的 Integer.MAX_VALUE,可能会创建大量线程,导致 OOM
  2. thread idle 超过 60 秒会被杀死
  3. 采用了 SynchronousQueue 队列,没有存储空间,意味着只要有请求到来,必须从线程池中找一个线程来处理,否则将新建线程(可以处理非常大请求的任务,1000 个任务过来,那么线程池需要分配 1000 个线程来执行),OkHttp 就是用的这个
  4. 适合执⾏⼤量的耗时较少的任务,当所有线程闲置超过 60s 都会被停⽌,所以这时⼏乎不占⽤系统资源

newSingleThreadExecutor 单线程池 – 无界队列,容易 OOM

1
2
3
4
5
6
public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>()));
}
  1. 只有⼀个核⼼线程,所有任务在同⼀个线程按顺序执⾏;corePoolSize 和 maximumPoolSize 都为 1
  2. 只有一个线程,不需要处理线程同步问题
  3. 无界(Integer.MAX_VALUE)的 LinkedBlockingQueue 队列,可能会堆积大量请求,导致 OOM

newScheduledThreadPool 定长、定时和周期性线程池

1
2
3
4
5
public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE,
          DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
          new DelayedWorkQueue());
}
  1. corePoolSize 支持配置;创建的线程池最大线程数是用的 Integer.MAX_VALUE,可能会创建大量线程,导致 OOM
  2. thread idle time 为 10 秒
  3. 支持延时和周期性

Executors 提供的这几种创建线程池的不足?

  • Executors.newFixedThreadPool 和 Executors.SingleThreadPool 创建的线程池内部使用的是无界(Integer.MAX_VALUE)的 LinkedBlockingQueue 队列,可能会堆积大量请求,导致 OOM
  • Executors.newCachedThreadPool 和 Executors.scheduledThreadPool 创建的线程池最大线程数是用的 Integer.MAX_VALUE,可能会创建大量线程,导致 OOM

ThreadPoolExecutor 的重要参数?有哪些拒绝策略?

1
2
3
4
5
6
7
8
public ThreadPoolExecutor(
    int corePoolSize,
    int maximumPoolSize,
    long keepAliveTime,
    TimeUnit unit,
    BlockingQueue<Runnable> workQueue,
    ThreadFactory threadFactory,
    RejectedExecutionHandler handler)

构造方法的 7 个参数 (ThreadPoolExecutor)

  1. corePoolSize 核心线程数(不管线程是否 idle)

核心线程数,线程一直存活在线程池中,及时线程处于空闲状态;

  • 默认情况下线程池是空的,只是任务提交时才会创建线程;如果当前运行的线程数少于 corePoolSize,则会创建新线程来处理任务;
  • 如果大于或等于 corePoolSize 则不再创建;
  • 调用 preStartAllCoreThread() 线程池会提前创建新启动所有的核心线程来执行等待任务;
  • 设置了 allowCoreThreadTimeOut 的话,在 keepAliveTime 后线程也会消亡
  1. maximumPoolSize 线程池中允许存在的最大线程数

线程池允许创建的最大线程数。如果任务队列满了且线程数小于 maximumPoolSize,则线程池仍然会创建新的线程来处理任务

  1. keepAliveTime 线程空闲时的存活时间

非核心线程闲置超时的时间,超过这个时间则回收;如果设置了 allowCoreThreadTimeOut 为 true,keepAliveTime 也会应用到核心线程上

  1. unit 时间参数单位
  2. workQueue 工作队列

当核心线程处于繁忙时,则将任务添加到此工作队列中;如果工作队列也超过了容量,会去尝试创建一个非核心线程执行任务;是阻塞队列 BlockingQueue

  1. threadFactory 线程工厂类

可给每个创建出来的线程设置名字

  1. handler RejectedExecutionHandler 工作队列饱和策略

当前任务队列 workQueue 和线程池 corePoolSize 都满了时所采取的应对策略

s1x65

线程池被创建后里面有线程吗?如果没有的话,你知道有什么方法对线程池进行预热吗?

线程池被创建后如果没有任务过来,里面是不会有线程的。
预热启动全部:prestartAllCoreThreads()

1
2
3
4
5
6
public int prestartAllCoreThreads() {
    int n = 0;
    while (addWorker(null, true))
        ++n;
    return n;
}

预热启动一个:prestartCoreThread()

1
2
3
4
public boolean prestartCoreThread() {
    return workerCountOf(ctl.get()) < corePoolSize &&
    	addWorker(null, true);
}

核心线程数会被回收吗?需要什么设置?

核心线程数默认是不会被回收的,如果需要回收核心线程数,需要调用下面的方法:
allowCoreThreadTimeOut() 该值默认为 false。

如何保证核心线程不死的?

  • 死循环空转?no
  • 阻塞队列,空元素时阻塞

线程池的线程,都是封装在 Worker 中,Worker 是一个实现了 AQS 的 Runnable,Worker 的 runWorker() 方法就是线程运行的逻辑,在里面有个 while 循环,不停的获取 task

1
2
3
4
5
6
7
8
final void runWorker(Worker w) {
    Runnable task = w.firstTask;
	// ... 
    while (task != null || (task = getTask()) != null) {
        // ..
    }
	// ...
}

现在看看 getTask(),返回的是一个 Runnable:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
 private Runnable getTask() {
     boolean timedOut = false; // Did the last poll() time out?
     for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // Are workers subject to culling?
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }
        try {
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
 }

只要一直有 task,那么该线程就会一直工作下去,直到 task 返回 null,那么该线程就会消亡了,而 getTask 要返回 null,有几种种情况:

  1. 工作线程超过了 maximumPollSize
  2. 线程池 stop 了
  3. 线程池 shutdown 了且 workQueue 空了
  4. worker 等待一个 task 超时了 (keepAliveTime)

当 workQueue 为空后,

  • 如何设置了 allowCoreThreadTimeOut 属性或者 workCount 超过了 corePoolSize,那么取队头,最多等待 keepAliveTime 时间后抛出 InterruptedException
  • 未设置 allowCoreThreadTimeOut 属性且 workCount<=corePoolSize,那就会一直阻塞在 workQueue.take() 方法,直到 workQueue 有 task 进来,核心线程就会一直阻塞着

有哪些 workQueue 工作队列?

为什么需要阻塞队列?
阻塞队列用于实现 生产者-消费者 模型,任务的添加是生产者,任务的调度是消费者;如果是非阻塞队列,需要额外的同步策略和线程间唤醒策略:任务队列为空时,消费者线程取元素时会被阻塞,当有新的任务添加到任务队列中时需要唤醒消费者线程处理任务。

olnnl
ArrayBlockingQueue

由数组实现的有界阻塞队列,该队列按照 FIFO 对元素进行排序。维护两个整型变量,标识队列头尾在数组中的位置,在生产者和消费者获取元素共用一个锁对象,意味着两者无法真正的并行进行,性能较低

LinkedBlockingQueue

由链表组成的有界阻塞队列,如果不指定大小,默认使用 Integer.MAX_VALUE 作为队列大小,该队列按照 FIFO 对元素进行排序;对生产者和消费者分别维护了独立的锁来控制数据同步,意味着该队列有着更高的并发性能

  • LinkedBlockingQueue 的吞吐量比 ArrayBlockingQueue 的吞吐量要高。前者是基于链表实现的,后者是基于数组实现的,正常情况下,不应该是数组的性能要高于链表吗?

看了一下这两个阻塞队列的源码才发现,这是因为 LinkedBlockingQueue 的读和写操作使用了两个锁,takeLock 和 putLock,读写操作不会造成资源的争抢。而 ArrayBlockingQueue 的读和写使用的是同一把锁,读写操作存在锁的竞争。因此 LinkedBlockingQueue 的吞吐量高于 ArrayBlockingQueue

  • 可以基于 LinkedBlockingQueue 实现了内存安全阻塞队列 MemorySafeLinkedBlockingQueue,当系统内存达到设置的剩余阈值时,就不在往队列里添加任务了,避免发生 OOM

SynchronousQueue

不存储元素的阻塞队列,无容量,可以设置公平或非公平模式,插入操作必须等待获取操作移除元素,反之亦然。线程等待队列。同步队列,按序排队,先来先服务,并且不保留任务。为了避免当线程数达到 maximumPoolSize 造成的错误,所以 maximumPoolSize 通常设置无限大 Integer.MAX_VALUE。

特点:

  • 没有容量,不会存储元素
  • 支持公平锁和非公平锁
  • 线程每次 put 操作需要有对应的线程来 take,否则阻塞;反之亦然
  • 支持高并发
  • 一般设置 maxPoolSize 为最大值,避免超过 max 了执行了拒绝策略

原理:

  • JDK6 的 SynchronousQueue 采用了一种性能更好的无锁算法——扩展的 “Dual stack and Dual queue” 算法,性能比 Java5 有很大提升。竞争机制支持公平和非公平:非公平模式使用的是后进先出栈 (LIFO Stack),公平模式使用的是先进先出队列 (FIFO Queue),性能上两者是相当的;一般情况下,FIFO 可以支持更大的吞吐量,但 FIFO 可以更大程度的保存线程的本地化
  • 入列和出列都基于 Spin 和 CAS 方法
  • 核心接口 Transfer,生产者的 put 和消费者的 take 都使用这个接口,根据第一个参数来区分是入栈 (队列) 还是 (队列) 出栈:第一个参数不为 null 为生产者,为 null 为消费者

谁在用:

  • Executors.newCachedThreadPool() 用的是这个队列,可以处理非常大请求的任务,1000 个任务过来,那么线程池需要分配 1000 个线程来执行。
  • OkHttp 的线程池也是用的这个,每次请求来都分配一个线程来处理请求,这样可以及时处理掉请求避免请求阻塞了

PriorityBlockingQueue

支持优先级排序的无界阻塞队列,默认情况下根据自然序排序,也可以指定 Comparator

DelayQueue

支持延时获取元素的无界阻塞队列,创建元素时可以指定多久之后才能从队列中获取元素,常用于缓存系统或定时任务调度系统

LinkedTransferQueue

一个由链表结构组成的无界阻塞队列,与 LinkedBlockingQueue 相比多了 transfer 和 tryTranfer 方法,该方法在有消费者等待接收元素时会立即将元素传递给消费者。

LinkedBlockingDeque

一个由链表结构组成的双端阻塞队列,可以从队列的两端插入和删除元素。

怎么理解无界队列和有界队列?

有界队列会执行正常的线程池执行任务的流程;而无界队列除非系统资源耗尽,否则无界队列不存在任务入队失败的情况,如果后续任务很多,无界队列会耗尽系统内存

线程池的工作机制

任务调度是线程池的主要入口,所有任务的调度都是由 execute 方法完成的,是线程池的核心运行机制。
这部分完成的工作是:检查现在线程池的运行状态、运行线程数、运行策略,决定接下来执行的流程,是直接申请线程执行,或是缓冲到队列中执行,亦或是直接拒绝该任务。其执行过程如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}
  1. 判断线程池的状态,如果不是 RUNNING 状态,直接执行拒绝策略
  2. 如果线程池中 workCount< corePoolSize,则新建一个核心线程来执行任务(需全局锁)
  3. 如果 workCount>=corePoolSize,且 workQueue 没满,则加入到 workQueue 任务队列(workQueue 需要有界的阻塞队列,避免可能出现的 OOM)中排队等待执行
  4. 如果 corePoolSize<workCount<=maximumPoolSize,且 workQueue 任务队列已满,则新建一个非核心线程来执行任务
  5. 如果 workCount>maximumPoolSize,且 workQueue 已满,则执行 handler 拒绝策略,默认是 AbortPolicy 抛出异常

workCount 计算取后 29 位,int workerCountOf(int c) { return c & CAPACITY; }

0tty0

线程池面试题

面试官:线程池灵魂8连问,你挡的住吗?

阿里巴巴 Java 开发手册为啥禁止使用 Executors 框架创建线程池?

阿里巴巴 Java 开发规范,该规范里明确说明不允许使用 Executors 创建线程池,而是通过 ThreadPoolExecutor 显示指定参数去创建线程池。
Executors 创建的线程池有发生 OOM 的风险。

  • Executors.newFixedThreadPool 和 Executors.SingleThreadPool 创建的线程池内部使用的是无界(Integer.MAX_VALUE)的 LinkedBlockingQueue 队列,可能会堆积大量请求,导致 OOM
  • Executors.newCachedThreadPool 和 Executors.scheduledThreadPool 创建的线程池最大线程数是用的 Integer.MAX_VALUE,可能会创建大量线程,导致 OOM

线程池面试题的回答模板

  1. 讲讲线程池是什么?线程池解决的问题,带来了什么好处?

线程池就是 JUC 提供给开发者管理线程的工具。 线程池解决的核心问题就是资源管理问题,避免资源的浪费。 使用线程池的好处 (两个降低、两个提高)

  1. 降低资源消耗:降低频繁创建、销毁线程带来的额外开销,复用已创建线程
  2. **降低使用复杂度: **将任务的提交和执行进行解耦,我们只需要创建一个线程池,然后往里面提交任务就行,具体执行流程由线程池自己管理,降低使用复杂度
  3. 提高响应速度:任务到达后,直接复用已创建好的线程执行,可以不需要等待线程创建就能立即执行任务
  4. 提高线程的可管理性:能安全有效的管理线程资源,避免不加限制无限申请造成资源耗尽风险。
  1. 顺带讲讲池化技术:数据库连接池、对象池

AndroidX 提供了 Pools 池化工具类帮助开发者更方便使用对象池

  1. JUC 中线程池的 API 设计

任务和线程分离

  1. ThreadPoolExecutor 核心参数:核心线程数(corePoolSize)、最大线程数(maximumPoolSize),空闲线程超时时间(keepAliveTime)、时间单位(unit)、阻塞队列(workQueue)、拒绝策略(handler)、线程工厂(ThreadFactory)这 7 个参数代表的含义
  2. 描述下 JUC 线程池的执行流程,即 execute() 执行流程:
  1. workCount<corePoolSize,新建一个线程来处理提交的任务
  2. workCount>=corePoolSize,workQueue 未满,入队
  3. workCount>=corePoolSize,workQueue 满了,workCount<=maxmiumPoolSize,新建线程执行
  4. workCount>maxmiumPoolSize,执行 rejectHandler,默认 AbortPolicy
  1. 在回答完包含哪些参数及 execute 方法的执行流程后。然后可以说下这个执行流程是 JUC 标准线程池提供的执行流程,主要用在 CPU 密集型场景下。像 Tomcat、Dubbo 这类框架,他们内部的线程池主要用来处理网络 IO 任务的,所以他们都对 JUC 线程池的执行流程进行了调整来支持 IO 密集型场景使用。改写后 Tomcat 线程池执行流程如下:
  1. workCount<corePoolSize,新建一个线程来处理提交的任务
  2. corePoolSize<=workCount<=maxmiumPoolSize,则创建新的线程执行提交的任务
  3. workCount>maxmiumPoolSize:
    1. workQueue 未满,则将任务放入任务队列 workQueue 等待执行
    2. workQueue 已满,执行拒绝策略
  1. 再说说 Worker 线程模型,继承了 AQS 实现了锁机制;线程启动后执行 runWorker() 方法,runWorker() 方法调用 getTask() 方法从阻塞队列中获取任务,获取到任务后先执行 beforeExecute() 钩子函数,再执行任务,然后再执行 afterExecute() 钩子函数;若超过获取不到任务会调用 processWorkerExit() 方法执行 Worker 线程的清理工作
  2. 线程池中的锁:mainLock,Worker 这个 AQS

如何正确关闭线程池?shutdown 和 shutdownNow 的区别?

shutdown()

当线程池调用该方法时,线程池的状态则立刻变成 SHUTDOWN 状态。此时,则不能再往线程池中添加任何任务,否则将会抛出 RejectedExecutionException 异常。但是,此时**线程池不会立刻退出,直到添加到线程池中的任务都已经处理完成,才会退出。 **

shutdownNow()

执行该方法,线程池的状态立刻变成 STOP 状态,并试图停止所有正在执行的线程,不再处理还在池队列中等待的任务,当然,它会返回那些未执行的任务,返回一个。 它试图终止线程的方法是通过调用 Thread.interrupt() 方法来实现的,但是大家知道,这种方法的作用有限,如果线程中没有 sleep 、wait、Condition、定时锁等应用, interrupt() 方法是无法中断当前的线程的。所以,shutdownNow() 并不代表线程池就一定立即就能退出,它可能必须要等待所有正在执行的任务都执行完成了才能退出。

你在使用线程池的过程中遇到过哪些坑或者需要注意的地方?

OOM 问题

刚开始使用线程都是通过 Executors 创建的,这种方式创建的线程池会有发生 OOM 的风险。

  • Executors.newFixedThreadPool 和 Executors.SingleThreadPool 创建的线程池内部使用的是无界(Integer.MAX_VALUE)的 LinkedBlockingQueue 队列,可能会堆积大量请求,导致 OOM
  • Executors.newCachedThreadPool 和 Executors.scheduledThreadPool 创建的线程池最大线程数是用的 Integer.MAX_VALUE,可能会创建大量线程,导致 OOM

解决:手动自己设置线程池参数

任务执行异常了,但异常丢失了的问题

submit 提交任务,FutureTask 的 run 方法会 try catch,但未处理,导致异常会被吞没;execute 异常会被抛出
解决:

  1. 在任务代码中增加 try、catch 异常处理
  2. 如果使用的 Future 方式,则可通过 Future 对象的 get 方法接收抛出的异常
  3. 为工作线程设置 setUncaughtExceptionHandler,在 uncaughtException 方法中处理异常
  4. 可以重写 afterExecute(Runnable r, Throwable t) 方法,拿到异常 t

共享线程池问题

整个服务共享一个全局线程池,导致任务相互影响,耗时长的任务占满资源,短耗时任务得不到执行。同时父子线程间会导致死锁的发生,进而导致 OOM

ThreadLocal 在线程池场景下会失效,可以考虑用阿里开源的 Ttl 来解决

okhttp 是一个无界的队列,会不会出现积累大量的请求导致 oom

okhttp 线程池配置:

1
2
3
4
5
6
7
8
9
@get:Synchronized
@get:JvmName("executorService") val executorService: ExecutorService
get() {
  if (executorServiceOrNull == null) {
    executorServiceOrNull = ThreadPoolExecutor(0, Int.MAX_VALUE, 60, TimeUnit.SECONDS,
        SynchronousQueue(), threadFactory("$okHttpName Dispatcher", false))
  }
  return executorServiceOrNull!!
}

异步请求最大数的配置:

1
2
3
4
class Dispatcher constructor() {
	var maxRequests = 64
	var maxRequestsPerHost = 5
}
  1. 每个 host 最多请求 5 个
  2. 所有请求最多 64 个

线程池的非核心线程工作完成后会复用吗?

会。当一个线程启动了,会执行 runWorker,从 getTask 拿任务,getTask 是个死循环,不停的从 workQueue 拿任务执行,所以说只要 workQueue 有任务,不管是不是 core 线程,都会复用从 workQueue 拿任务执行。
等 workQueue 空了,非 core 的线程会 poll(keepAliveTime),即阻塞 keepAliveTime 如果还没有任务就死了;而 core 线程会一直 take 直到有新的 task 进来。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
final void runWorker(Worker w) {
    while (task != null || (task = getTask()) != null) {
        task.run();
    }
}
private Runnable getTask() {
    for (;;) {
        int c = ctl.get();
        int wc = workerCountOf(c);
        // Are workers subject to culling?
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }
        try {
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}
本文由作者按照 CC BY 4.0 进行授权