文章

Java线程池进阶

Java线程池进阶

Java 线程池

线程池复用原理

new 的一个 thread,执行完毕后,线程就消亡了,也就没法复用了
线程池,任务和线程是分开的,提交一个任务时,如果当前线程少于 corePoolSize 的话,会创建一个 Worker(其实就是对 Thread 的一个包装),也就是创建一个线程;超过 corePoolSize 的任务会放到 workQueue;这个 Worker 就在那不停的运转者,从 workQueue 不停的取任务,就不用创建新的线程了。

1
2
3
4
5
6
7
8
9
10
final void runWorker(Worker w) {
    Runnable task = w.firstTask;
    while (task != null || (task = getTask()) != null) {
        try {
            task.run();
        } finally {
            task = null;
        }
    }
}

实现线程复用的逻辑:通过 while 循环不停的从 task 阻塞队列中取任务,即通过阻塞当前线程来达到复用的目的

  1. 通过取 Worker 的 firstTask 或者通过 getTask 方法从 workQueue 中获取待执行的任务。
  2. 直接调用 task 的 run 方法来执行具体的任务(而不是新建线程)。

通过取 Worker 的 firstTask 或者 getTask 方法从 workQueue 中取出了新任务,并直接调用 Runnable 的 run 方法来执行任务,每个线程都始终在一个大循环中,反复获取任务,然后执行任务,从而实现了线程的复用。

线程池用了哪些锁?为什么要用锁?

mainLock 锁

ThreadPoolExecutor 内部维护了 ReentrantLock 类型的 mainLock,在访问 workers 成员变量以及进行数据统计记账(如访问 largestPoolSize、completedTaskCount)时需要获取该重入锁。

为什么需要 mainLock 锁?

1
2
3
4
private final ReentrantLock mainLock = new ReentrantLock();
private final HashSet<Worker> workers = new HashSet<Worker>(); // 当前线程引用
private int largestPoolSize; // 用来记录线程池中,曾经出现过的最大线程数
private long completedTaskCount;

可以看到 workers 变量用的 HashSet 是线程不安全的,是不能用于多线程环境的。largestPoolSize(用来记录线程池中,曾经出现过的最大线程数)、completedTaskCount 也是没用 volatile 修饰,所以需要在锁的保护下进行访问。

为什么不直接用个线程安全容器呢而用 ReentrantLock 呢?

其实 Doug Lea 老爷子在 mainLock 变量的注释上解释了,意思就是说事实证明,相比于线程安全容器,此处更适合用 lock,主要原因之一就是串行化 interruptIdleWorkers() 方法,避免了不必要的中断风暴 (interrupt storms),特别是在 shutdown 时。

  1. 第一个原因是这样说的:

Among the reasons is that this serializes interruptIdleWorkers, which avoids unnecessary interrupt storms, especially during shutdown. Otherwise exiting threads would concurrently interrupt those that have not yet interrupted. 串行化 interruptIdleWorkers() 方法,可避免不必要的中断风暴,特别是在 shutdown 时

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
private void interruptIdleWorkers(boolean onlyOne) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers) {
            Thread t = w.thread;
            if (!t.isInterrupted() && w.tryLock()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                } finally {
                    w.unlock();
                }
            }
            if (onlyOne)
                break;
        }
    } finally {
        mainLock.unlock();
    }
}

interruptIdleWorkers:首先拿 mainLock 锁,然后尝试去做中断线程的操作

由于有 mainLock.lock 的存在,所以多个线程调用这个方法,就被 serializes 串行化了起来。
shutdown 方法调用了 interruptIdleWorkers() 方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        advanceRunState(SHUTDOWN);
        interruptIdleWorkers();
        onShutdown(); // hook for ScheduledThreadPoolExecutor
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
}

假设我们使用的是并发安全的 Set 集合,而不用 mainLock,这个时候有 5 个线程并发调用 shutdown() 方法,由于没有用 mainLock,所以没有阻塞,那么每一个线程都会运行 interruptIdleWorkers() 方法,就会出现第一个线程发起了中断,导致 worker 中断,第 2 个线程又来发起中断了,一个线程去中断正在中断中的 Worker 线程,所谓的中断风暴 (intertupt storms)。
加 mainLock 就是为了避免出现中断风暴的风险,并发时只想要有一个线程能发起中断的操作。
有了 mainLock 后,反正 set 集合也会被锁起来,那索性就不需要并发安全的 set 集合了。
所以:用 mainLock 来实现串行化,同时保证了 set 集合的并发安全

  1. 第二个原因:

It also simplifies some of the associated statistics bookkeeping of largestPoolSize etc.

加锁好维护 largestPoolSize 这个参数,completedTaskCount 参数也是一样

largestPoolSize、completedTaskCount 变量加个 volatile 关键字修饰是不是就可以不用 mainLock 了?

如果用 volatile,不加锁,就少了一个 mainLock.lock() 的操作,就有可能少了一个阻塞等待的操作,假设 addWorkers 方法还没来得及修改 largestPoolSize 的值,就有线程调用了 getLargestPoolSize() 方法,由于没阻塞,直接获取到的值只是那一瞬间的值,不一定是 addWorker() 方法执行完成后的值,加上 mainLock,程序是能感知到 largestPoolSize 有可能正在发生变化,获取到的一定是 addWorker() 方法执行完成后的值,加锁,是为了最大程序上保证这个参数的准确性,在获取这两个值时,能保证获取到的一定是修改方法执行完成后的值。如果不加锁,可能在修改方法还没执行完成时,此时来获取该值,获取到的就是修改前的值

Worker 对象 (就是个 AQS)

Worker 类存在的意义?继承 AQS 的意义?

worker 类存在的主要意义就是为了维护线程的中断状态。维护的是正在运行的线程
Class Worker mainly maintains interrupt control state for threads running tasks, along with other minor bookkeeping.
Worker 继承 AQS 主要就是为了实现了一把非重入锁,维护线程的中断状态,保证不能中断运行中的线程。

主要是在 interruptIdleWorkers 方法中,有个 tryLock() 判断,该实现是个不可重入锁,就可以避免了正在运行的线程被中断了

中断的是什么类型的线程呢?

正在等待任务的线程,ThreadPoolExecutor#getTask() 的 poll 和 take() 方法。

// interruptIdleWorkers Interrupts threads that might be waiting for tasks (as indicated by not being locked) so they can check for termination or configuration changes. Ignores SecurityExceptions (in which case some threads may remain uninterrupted).

这也说明:正在执行任务的线程是不应该被中断的
线程池怎么知道那哪任务是正在执行中的,不应该被中断呢?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
private void interruptIdleWorkers(boolean onlyOne) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers) {
            Thread t = w.thread;
            if (!t.isInterrupted() && w.tryLock()) { // 1
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                } finally {
                    w.unlock();
                }
            }
            if (onlyOne)
                break;
        }
    } finally {
        mainLock.unlock();
    }
}

第 7 行,判断逻辑有个 w.tryLock(),未被中断的且 w.tryLock() 的线程可以被中断。0 表示为持有锁,1 表示持有锁

1
public boolean tryLock()  { return tryAcquire(1); }

在 runWorker() 的时候会调用 w.lock() 上锁,获取到 task,准备执行的时候,也就是说lock state 为 1 的 worker 就是正在执行任务的线程,不可以被中断

为什么不用 ReentrantLock 而是选择了自己搞一个 worker 类

// Worker.java We implement a simple non-reentrant mutual exclusion lock rather than use ReentrantLock because we do not want worker tasks to be able to reacquire the lock when they invoke pool control methods like setCorePoolSize

想要一个不能重入的互斥锁,而 ReentrantLock 是可重入锁。

为什么需要不可重入锁,如果是可重入锁呢?

目的:不想 interrupt 正在执行任务的线程
上面提到的 setCorePoolSize() 方法:

1
2
3
4
5
6
7
8
public void setCorePoolSize(int corePoolSize) {
    // ...
    // 说明当前的 worker 的数量是多余我要重新设置的 corePoolSize,需要减少一点。通过interruptIdleWorkers()方法中断线程
    if (workerCountOf(ctl.get()) > corePoolSize)
    	interruptIdleWorkers();
	else if // ...
    // ...
}

线程池动态调整时,会调用 setCorePoolSize,当 workCount>corePoolSize 时,需要减少线程,调用 interruptIdleWorkers() 方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
private void interruptIdleWorkers(boolean onlyOne) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers) {
            Thread t = w.thread;
            if (!t.isInterrupted() && w.tryLock()) { // 1
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                } finally {
                    w.unlock();
                }
            }
            if (onlyOne)
                break;
        }
    } finally {
        mainLock.unlock();
    }
}

里面的 w.tryLock(),由于 Worker 是一个独占锁、不可重入锁,同一时刻只能有一个线程且只能一次获取锁;同时在 runWorker 时已经调用了 lock 了,表示 worker 正在执行,在这 tryLock 会失败返回 false,下面的 t.interrupt() 是不会被调用的,即正在运行的线程是不能被 interrupt 的
如果用 ReentrantLock 的话,由于 ReentrantLock 是可重入的,那么这里 w.tryLock() 就会返回 true,可能把正在执行的 worker 给 interrupt 了。

lock state 为啥初始化为 -1

Additionally, to suppress interrupts until the thread actually starts running tasks, we initialize lock state to a negative value, and clear it upon start (in runWorker). 为了在线程真正开始运行任务之前,抑制 interrupt。所以把 worker 的状态初始化为负数 (-1),会在 runWorker() 方法执行前辈 clear 掉 (即进入可被 interrupt 状态)。

如果 Worker 还未启动,就调用了 interruptIdleWorkers(),w.tryLock() 会调用到 tryAcquire(),会通过 compareAndSetState(0, 1),由于 state 默认值为 -1,会返回 false,导致 tryLock() 返回 false,下面的 t.interrupt() 逻辑就走不到了,即未启动的线程不会被 interrupt

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
private void interruptIdleWorkers(boolean onlyOne) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers) {
            Thread t = w.thread;
            if (!t.isInterrupted() && w.tryLock()) { // 1 
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                } finally {
                    w.unlock();
                }
            }
            if (onlyOne)
                break;
        }
    } finally {
        mainLock.unlock();
    }
}

在 runWorker() 中,有一个 w.unLock() 操作是干嘛的?

在线程启动前调用 w.unlock(),将 state 恢复为 0,就可以支持 interrupt 了

1
2
3
4
5
6
7
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
	// ...
}

JUC 原生线程池 vs Tomcat、Dubbo 框架线程池

JUC 标准线程池提供的执行流程,主要用在 CPU 密集型场景下。
像 Tomcat、Dubbo 这类框架,他们内部的线程池主要用来处理网络 IO 任务的,所以他们都对 JUC 线程池的执行流程进行了调整来支持 IO 密集型场景使用。
他们提供了阻塞队列 TaskQueue,该队列继承 LinkedBlockingQueue,重写了 offer() 方法来实现执行流程的调整。

1
2
3
4
5
6
7
8
9
10
11
12
13
@Override
public boolean offer(Runnable o) {
    //we can't do any checks
    if (parent==null) return super.offer(o);
    //we are maxed out on threads, simply queue the object
    if (parent.getPoolSize() == parent.getMaximumPoolSize()) return super.offer(o);
    //we have idle threads, just add it to the queue
    if (parent.getSubmittedCount()<=(parent.getPoolSize())) return super.offer(o);
    //if we have less threads than maximum force creation of a new thread
    if (parent.getPoolSize()<parent.getMaximumPoolSize()) return false;
    //if we reached here, we need to add it to the queue
    return super.offer(o);
}

可以看到他在入队之前做了几个判断,这里的 parent 就是所属的线程池对象:

1.如果 parent 为 null,直接调用父类 offer 方法入队 2.如果当前线程数等于最大线程数,则直接调用父类 offer() 方法入队 3.如果当前未执行的任务数量小于等于当前线程数,仔细思考下,是不是说明有空闲的线程呢,那么直接调用父类 offer() 入队后就马上有线程去执行它 4.如果当前线程数小于最大线程数量,则直接返回 false,然后回到 JUC 线程池的执行流程回想下,是不是就去添加新线程去执行任务了呢 5.其他情况都直接入队

可以看出当当前线程数大于核心线程数时,JUC 原生线程池首先是把任务放到队列里等待执行,而不是先创建线程执行。
如果 Tomcat 接收的请求数量大于核心线程数,请求就会被放到队列中,等待核心线程处理,这样会降低请求的总体响应速度。
所以 Tomcat 并没有使用 JUC 原生线程池,利用 TaskQueue 的 offer() 方法巧妙的修改了 JUC 线程池的执行流程,改写后 Tomcat 线程池执行流程如下:

  1. 判断如果当前线程数小于核心线程池,则新建一个线程来处理提交的任务
  2. 如果当前当前线程池数大于核心线程池,小于最大线程数,则创建新的线程执行提交的任务
  3. 如果当前线程数等于最大线程数,则将任务放入任务队列等待执行
  4. 如果队列已满,则执行拒绝策略

线程池最佳实践

1、使用 ThreadPoolExecutor 声明线程池,避免使用 Executors 提供的静态方法

线程池必须手动通过 ThreadPoolExecutor 的构造函数来声明,避免使用 Executors 类的静态方法声明线程池,因为可能会有 OOM 的风险。

  • FixedThreadPool 和 SingleThreadExecutor : 允许请求的队列长度为 Integer.MAX_VALUE,可能堆积大量的请求,从而导致 OOM。
  • CachedThreadPool 和 ScheduledThreadPool : 允许创建的线程数量为 Integer.MAX_VALUE ,可能会创建大量线程,从而导致 OOM。

使用有界队列,控制线程创建数量。

2、监测线程池运行状态

ThreadPoolExecutor 提供了获取线程池当前的线程数和活跃线程数、已经执行完成的任务数、正在排队中的任务数等等。

每隔一秒打印出线程池的线程数、活跃线程数、完成的任务数、以及队列中的任务数:

1
2
3
4
5
6
7
8
9
10
11
public static void printThreadPoolStatus(ThreadPoolExecutor threadPool) {
    ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(1, createThreadFactory("print-images/thread-pool-status", false));
    scheduledExecutorService.scheduleAtFixedRate(() -> {
        log.info("=========================");
        log.info("ThreadPool Size: [{}]", threadPool.getPoolSize());
        log.info("Active Threads: {}", threadPool.getActiveCount());
        log.info("Number of Tasks : {}", threadPool.getCompletedTaskCount());
        log.info("Number of Tasks in Queue: {}", threadPool.getQueue().size());
        log.info("=========================");
    }, 0, 1, TimeUnit.SECONDS);
}

3、给线程池命名

初始化线程池的时候需要显示命名(设置线程池名称前缀),有利于定位问题。
默认情况下创建的线程名字类似 pool-1-thread-n 这样的,没有业务含义,不利于我们定位问题。

  1. 利用 guava 的 ThreadFactoryBuilder
1
2
3
4
ThreadFactory threadFactory = new ThreadFactoryBuilder()
    .setNameFormat(threadNamePrefix + "-%d")
    .setDaemon(true).build();
ExecutorService threadPool = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory)
  1. 自己实现 ThreadFactory
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public final class NamingThreadFactory implements ThreadFactory {
    private final AtomicInteger threadNum = new AtomicInteger();
    private final ThreadFactory delegate;
    private final String name;
    public NamingThreadFactory(ThreadFactory delegate, String name) {
        this.delegate = delegate;
        this.name = name; // TODO consider uniquifying this
    }
    @Override 
    public Thread newThread(Runnable r) {
        Thread t = delegate.newThread(r);
        t.setName(name + " [#" + threadNum.incrementAndGet() + "]");
        return t;
    }
}

4、如何合理地配置线程池?

需要大致了解任务是 CPU 密集型还是 IO 密集型
CPU 速度快,IO 速度慢,
简单并且适用面比较广的公式:

  1. CPU 密集型任务 (N+1): corePoolSize = CPU 核数 + 1 (加 1 的目的是保留一个备份线程)

CPU 密集型有大量的计算任务,CPU 占有率高,可以将线程数设置为 N(CPU 核心数)+1,比 CPU 核心数多出来的一个线程是为了防止核心线程阻塞或意外中断的候补。一旦任务暂停,CPU 就会处于空闲状态,而在这种情况下多出来的一个线程就可以充分利用 CPU 的空闲时间。

  1. I/O 密集型任务 (2N): corePoolSize = CPU 核数 * 2

IO 密集型通常指文件 I/O、网络 IO 等。这种任务应用起来,系统会用大部分的时间来处理 I/O 交互,而线程在处理 I/O 的时间段内不会占用 CPU 来处理,这时就可以将 CPU 交出给其它线程使用。因此在 I/O 密集型任务的应用中,我们可以多配置一些线程,具体的计算方法是 2N。 线程数的选取与 IO 耗时和 CPU 耗时的比例有关,最佳线程数=CPU 核心数 *[1+IO 耗时/CPU 耗时],之所以设置比例是为了使 IO 设备和 CPU 的利用率达到最大

如何判断是 CPU 密集任务还是 IO 密集任务?
CPU 密集型简单理解就是利用 CPU 计算能力的任务比如你在内存中对大量数据进行排序。
但凡涉及到网络读取,文件读取这类都是 IO 密集型,这类任务的特点是 CPU 计算耗费时间相比于等待 IO 操作完成的时间来说很少,大部分时间都花在了等待 IO 操作完成上。
动态配置线程池?怎么确定线程池大小呢?
其实没有固定答案,需要通过压测不断的动态调整线程池参数,观察 CPU 利用率、系统负载、GC、内存、RT、吞吐量 等各种综合指标数据,来找到一个相对比较合理的值。
所以不要再问设置多少线程合适了,这个问题没有标准答案,需要结合业务场景,设置一系列数据指标,排除可能的干扰因素,注意链路依赖(比如连接池限制、三方接口限流),然后通过不断动态调整线程数,测试找到一个相对合适的值。

自定义线程池

https://github.com/Blankj/AndroidUtilCode/blob/master/utilcode/lib/src/main/java/com/blankj/utilcode/util/ThreadUtils.java
以华为主要的手机,会出现线程过多的 OOM(p_thread created)

Android 中的线程池

AsyncTask(API28)

AsyncTask 线程池相关参数:

  1. corePoolSize:核心线程数 [2,4],CPU 核数个数减去 1,至少留一个 CPU 避免阻塞其他后台工作
  2. maximumPoolSize:最大线程数,设备最大 CPU 核数 *2+1
  3. keepAliveTime:默认是 30s,线程可以 idle 30s
  4. workQueue:队列 LinkedBlockingQueue,最多可以存 128 个 Runnable,超过要阻塞
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors();

// We want at least 2 threads and at most 4 threads in the core pool,

// preferring to have 1 less than the CPU count to avoid saturating

// the CPU with background work

private static final int CORE_POOL_SIZE = Math.max(2, Math.min(CPU_COUNT - 1, 4));

private static final int MAXIMUM_POOL_SIZE = CPU_COUNT * 2 + 1;

private static final int KEEP_ALIVE_SECONDS = 30;

private static final BlockingQueue<Runnable> sPoolWorkQueue =

        new LinkedBlockingQueue<Runnable>(128);

/**
 * An {@link Executor} that can be used to execute tasks in parallel.
 */
public static final Executor THREAD_POOL_EXECUTOR;

static {
    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
            CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,
            sPoolWorkQueue, sThreadFactory);
    threadPoolExecutor.allowCoreThreadTimeOut(true);
    THREAD_POOL_EXECUTOR = threadPoolExecutor;
}

EventBus3

EventBus 默认的线程池是 Executors.newCachedThreadPool(),直接用的 Executors 的静态 CacheThreadPool,没有线程数大小限制

1
2
3
public class EventBusBuilder {
    private final static ExecutorService DEFAULT_EXECUTOR_SERVICE = Executors.newCachedThreadPool();
}

通过自定义线程池:

1
EventBus.builder().executorService(xxx);

Fresco

设置 Fresco 线程池

配置 Fresco 线程池,实现一个 ExecutorSupplier 即可。

1
2
3
4
5
6
7
8
public static ImagePipelineConfig getImagePipelineConfig(Context context) {
    ImagePipelineConfig imagePipelineConfig = OkHttpImagePipelineConfigFactory
            .newBuilder(context, getOkHttpClient())// 替换网络实现为okhttp3
            .setExecutorSupplier(ExecutorSupplier) // 配置实现自己的线程池
            .setPoolFactory()
            .build();
    return imagePipelineConfig;
}

Fresco 线程池默认实现

Fresco 默认的 ExecutorSupplier,DefaultExecutorSupplier

Fresco v1.8.1 默认的 DefaultExecutorSupplier 有 4 个线程池

  • io

用于 io 操作,定长 2 个线程数的 FixThreadPool 线程池

  • decode

用于编解码、转换,定长为最大是 CPU 核数的 FixThreadPool 线程池

  • background

后台任务,定长为最大是 CPU 核数的 FixThreadPool 线程池

  • lightweightbackground

轻量的后台任务,定长为 1 的 FixThreadPool 线程池

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
// 默认
// Below this comment can't be built in alphabetical order, because of dependencies
int numCpuBoundThreads = mPoolFactory.getFlexByteArrayPoolMaxNumThreads();
mExecutorSupplier =
    builder.mExecutorSupplier == null ?
        new DefaultExecutorSupplier(numCpuBoundThreads) : builder.mExecutorSupplier;

默认在`PoolParams#maxNumThreads`默认线程池中大小
class DefaultFlexByteArrayPoolParams {
    // the maximum number of threads permitted to touch this pool
    public static final int DEFAULT_MAX_NUM_THREADS = Runtime.getRuntime().availableProcessors();
}

public class DefaultExecutorSupplier implements ExecutorSupplier {
  // Allows for simultaneous reads and writes.
  private static final int NUM_IO_BOUND_THREADS = 2;
  private static final int NUM_LIGHTWEIGHT_BACKGROUND_THREADS = 1;

  private final Executor mIoBoundExecutor;
  private final Executor mDecodeExecutor;
  private final Executor mBackgroundExecutor;
  private final Executor mLightWeightBackgroundExecutor;

  public DefaultExecutorSupplier(int numCpuBoundThreads) {
    mIoBoundExecutor =
        Executors.newFixedThreadPool(
            NUM_IO_BOUND_THREADS,
            new PriorityThreadFactory(
                Process.THREAD_PRIORITY_BACKGROUND, "FrescoIoBoundExecutor", true));
    mDecodeExecutor =
        Executors.newFixedThreadPool(
            numCpuBoundThreads,
            new PriorityThreadFactory(
                Process.THREAD_PRIORITY_BACKGROUND, "FrescoDecodeExecutor", true));
    mBackgroundExecutor =
        Executors.newFixedThreadPool(
            numCpuBoundThreads,
            new PriorityThreadFactory(
                Process.THREAD_PRIORITY_BACKGROUND, "FrescoBackgroundExecutor", true));
    mLightWeightBackgroundExecutor =
        Executors.newFixedThreadPool(
            NUM_LIGHTWEIGHT_BACKGROUND_THREADS,
            new PriorityThreadFactory(
                Process.THREAD_PRIORITY_BACKGROUND, "FrescoLightWeightBackgroundExecutor", true));
  }
  // ...
}

OkHttp3

配置 OkHttp3 的线程池

1
new OkHttpClient.Builder().dispatcher(Dispatcher dispatcher);

Dispatcher

无界,60s 线程 idle 时间,

1
2
3
4
5
6
7
8
9
10
11
public final class Dispatcher {
  private int maxRequests = 64;
  private int maxRequestsPerHost = 5;
  public synchronized ExecutorService executorService() {
    if (executorService == null) {
      executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
          new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
    }
    return executorService;
  }
}

Glide4

默认 Glide4 分为 sourceExecutor 和 diskCacheExecutor;diskCacheExecutor 默认为 1 个线程,sourceExecutor 默认为 CPU 的核数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
public final class GlideBuilder {
    private GlideExecutor sourceExecutor;
    private GlideExecutor diskCacheExecutor;
}

public class GlideExecutor {
    /**
    * The default thread count for executors used to load/decode/transform data found in Glide's
    * cache.
    */
    public static final int DEFAULT_DISK_CACHE_EXECUTOR_THREADS = 1;
    
    public static int calculateBestThreadCount() {
        if (bestThreadCount == 0) {
          // We override the current ThreadPolicy to allow disk reads.
          // This shouldn't actually do disk-IO and accesses a device file.
          // See: https://github.com/bumptech/glide/issues/1170
          ThreadPolicy originalPolicy = StrictMode.allowThreadDiskReads();
          File[] cpus = null;
          try {
            File cpuInfo = new File(CPU_LOCATION);
            final Pattern cpuNamePattern = Pattern.compile(CPU_NAME_REGEX);
            cpus = cpuInfo.listFiles(new FilenameFilter() {
              @Override
              public boolean accept(File file, String s) {
                return cpuNamePattern.matcher(s).matches();
              }
            });
          } catch (Throwable t) {
            if (Log.isLoggable(TAG, Log.ERROR)) {
              Log.e(TAG, "Failed to calculate accurate cpu count", t);
            }
          } finally {
            StrictMode.setThreadPolicy(originalPolicy);
          }
    
          int cpuCount = cpus != null ? cpus.length : 0;
          int availableProcessors = Math.max(1, Runtime.getRuntime().availableProcessors());
          bestThreadCount =
              Math.min(MAXIMUM_AUTOMATIC_THREAD_COUNT, Math.max(availableProcessors, cpuCount));
        }
        return bestThreadCount;
      }
}

RxJava2

RxJava2 对应的线程池管理类是 Schedulers

io 是无界的线程池,1 个核心线程

Bolts

  1. 默认如果是 Java 平台,是个无界的线程池;
  2. 如果是 Android 平台,核心线程数是 CPU+1 个数,最大个数是 CPU_COUNT * 2 + 1
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
final class BoltsExecutors {
    private final ExecutorService background;
    private final ScheduledExecutorService scheduled;
    private final Executor immediate;
    
    private BoltsExecutors() {
    background = !isAndroidRuntime()
        ? java.util.concurrent.Executors.newCachedThreadPool()
        : AndroidExecutors.newCachedThreadPool();
    scheduled = Executors.newSingleThreadScheduledExecutor();
    immediate = new ImmediateExecutor();
    }
}
/**
* Nexus 5: Quad-Core
* Moto X: Dual-Core
*
* AsyncTask:
*   CORE_POOL_SIZE = CPU_COUNT + 1
*   MAX_POOL_SIZE = CPU_COUNT * 2 + 1
*
* https://github.com/android/platform_frameworks_base/commit/719c44e03b97e850a46136ba336d729f5fbd1f47
*/
private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors();
/* package */ static final int CORE_POOL_SIZE = CPU_COUNT + 1;
/* package */ static final int MAX_POOL_SIZE = CPU_COUNT * 2 + 1;
/* package */ static final long KEEP_ALIVE_TIME = 1L;
public static ExecutorService newCachedThreadPool() {
ThreadPoolExecutor executor =  new ThreadPoolExecutor(
    CORE_POOL_SIZE,
    MAX_POOL_SIZE,
    KEEP_ALIVE_TIME, TimeUnit.SECONDS,
    new LinkedBlockingQueue<Runnable>());

allowCoreThreadTimeout(executor, true);

return executor;
}

自定义

在 Task#call 最后一个参数传递 Executor

1
2
3
public static <TResult> Task<TResult> call(final Callable<TResult> callable, Executor executor) {
return call(callable, executor, null);
}

线程优化

线程池收敛,hook

本文由作者按照 CC BY 4.0 进行授权