Java AQS
AQS
什么是 AQS?
AQS(AbstractQueuedSynchronizer),这可谓是 Doug Lea 老爷子的大作之一。AQS 即是抽象队列同步器,是用来构建 Lock 锁和同步组件的基础框架,很多我们熟知的锁和同步组件都是基于 AQS 构建,它使用了一个 int 成员变量表示同步状态,通过内置的 FIFO 队列来完成资源获取线程的排队工作,比如 ReentrantLock、ReentrantReadWriteLock、CountDownLatch、Semaphore。并发包的大师(Doug Lea)期望它能够成为实现大部分同步需求的基础。
锁的分类
公平锁和非公平锁
公平锁:
公平锁是指多个线程按照申请锁的顺序来获取锁,线程直接进入队列中排队,队列中的第一个线程才能获得锁。公平锁的优点是等待锁的线程不会饿死。缺点是 整体吞吐效率相对非公平锁要低,等待队列中除第一个线程以外的所有线程都会阻塞,CPU 唤醒阻塞线程的开销比非公平锁大。
非公平锁:
非公平锁是多个线程加锁时直接尝试获取锁,获取不到才会到等待队列的队尾等待。但如果此时锁刚好可用,那么这个线程可以无需阻塞直接获取到锁,所以非公平锁有可能出现后申请锁的线程先获取锁的场景。非公平锁的优点是可以减少唤起线程的开销,整体的吞吐效率高,因为线程有几率不阻塞直接获得锁,CPU 不必唤醒所有线程。缺点是处于 等待队列中的线程可能会饿死,或者等很久才会获得锁。
ReentrantLock 代码示例:
可重入锁和不可重入锁
可不可以重入看对 state 的操作,可重入锁 state 会一直累加,不可重入锁只有 1
可重入锁:
可重入锁又名递归锁,是指在同一个线程在外层方法获取锁的时候,再进入该线程的内层方法会自动获取锁(前提锁对象得是同一个对象或者 class),不会因为之前已经获取过还没释放而阻塞。Java 中 ReentrantLock 和 synchronized 都是可重入锁,可重入锁的一个优点是可一定程度避免死锁。
不可重入锁:
同一个线程对同一把锁在释放之前不能获取多次,否则会出现死锁的情况。
示例 ReentrantLock 和 NonReentrantLock
独占锁 (排他锁) 和共享锁
排他锁:
独享锁也叫排他锁,是指该锁一次只能被一个线程所持有。如果线程 T 对数据 A 加上排它锁后,则其他线程不能再对 A 加任何类型的锁。获得排它锁的线程即能读数据又能修改数据。JDK 中的 synchronized 和 JUC 中 Lock 的实现类就是互斥锁。
共享锁:
共享锁是指该锁可被多个线程所持有。如果线程 T 对数据 A 加上共享锁后,则其他线程只能对 A 再加共享锁,不能加排它锁。获得共享锁的线程只能读数据,不能修改数据。
ReentrantReadWriteLock 是共享锁
AQS 的原理?
AQS 核心思想是,如果被请求的共享资源空闲,那么就将当前请求的线程设置为有效的工作线程,将共享资源设置为锁定状态;如果共享资源被占用,就需要一定的阻塞等待唤醒机制来保证锁分配;这个机制主要用的是 CLH 队列的变体实现的,将暂时获取不到锁的线程加入到队列中。
主要原理图:
state:
AQS 使用一个 volatile 的 int 类型的成员变量 (state) 来表示同步状态,通过内置的 FIFO 队列 (CLH) 来完成资源获取的排队工作,通过 CAS 完成对 state 值的修改
AQS 有两种模式
分别是独占模式和共享模式。自定义同步器实现的相关方法也只是为了通过修改 state 字段值来实现多线程的独占模式和共享模式。
一般来说,自定义同步器要么是独占模式,要么是共享模式,它们只需要实现 tryAcquire()-tryRelease()
、tryAcquireShared()-tryReleaseShared()
中的一种即可。
AQS 应用:基于 AQS 的实现?
ReentrantLock 独占锁 (排他锁),可重入锁
ReentrantLock 是基于 AQS 实现的独占锁、可重入锁
ReentrantLock 使用
从 Java 5 开始,引入了一个高级的处理并发的 java.util.concurrent 包,它提供了大量更高级的并发功能,能大大简化多线程程序的编写。
我们知道 Java 语言直接提供了 synchronized 关键字用于加锁,但这种锁一是 很重
,二 是获取时必须一直等待,没有额外的尝试机制
。
- ReentrantLock 是可重入、排他锁,它和 synchronized 一样,一个线程可以多次获取同一个锁
- synchronized 是 Java 语言层面提供的语法,所以我们不需要考虑异常,而 ReentrantLock 是 Java 代码实现的锁,我们就必须先获取锁,然后在 finally 中正确释放锁
- 使用 ReentrantLock.tryLock() 尝试获取锁,线程在 tryLock() 失败的时候不会导致死锁
使用 synchronized 示例:
1
2
3
4
5
6
7
8
9
public class Counter {
private int count;
public void add(int n) {
synchronized(this) {
count += n;
}
}
}
如果用 ReentrantLock 替代,可以把代码改造为:
1
2
3
4
5
6
7
8
9
10
11
12
13
public class Counter {
private final Lock lock = new ReentrantLock();
private int count;
public void add(int n) {
lock.lock();
try {
count += n;
} finally {
lock.unlock();
}
}
}
尝试获取锁:
1
2
3
4
5
6
7
if (lock.tryLock(1, TimeUnit.SECONDS)) {
try {
...
} finally {
lock.unlock();
}
}
上述代码在尝试获取锁的时候,最多等待 1 秒。如果 1 秒后仍未获取到锁,tryLock() 返回 false,程序就可以做一些额外处理,而不是无限等待下去。
示例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class TestReentrantLock {
final Lock lock = new ReentrantLock(false);
class Worker extends Thread {
public void run() {
lock.lock();
System.out.println(Thread.currentThread().getName() + "-" + Utils.format());
try {
Utils.second(1);
} finally {
lock.unlock();
}
}
}
public void test() {
// 启动10个子线程
for (int i = 0; i < 100; i++) {
Worker w = new Worker();
//w.setDaemon(true);
w.start();
}
// 主线程每隔1秒换行
for (int i = 0; i < 10; i++) {
Utils.second(1);
//System.out.println();
}
}
public static void main(String[] args) {
TestReentrantLock testMyLock = new TestReentrantLock();
testMyLock.test();
}
}
ReentrantLock 原理
以非公平锁的来看 ReentrantLock 实现原理。
使用 ReentrantLock 前,需要 new 一个实例出来:
1
ReentrantLock lock = new ReentrantLock(false);
其中构造参数 fair:true 代表是公平锁,false 代表的是非公平锁,默认是非公平锁,来看看源码:
1
2
3
4
5
6
public ReentrantLock() {
sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
现在来看下 NonfairSync
和 FairSync
,两个都继承 Sync
,先看看 Sync。
Sync 是继承自 AbstractQueuedSynchronizer
,
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
// ReentrantLock
abstract static class Sync extends AbstractQueuedSynchronizer {
abstract void lock(); // 获取锁,子类重写
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread(); // 获取当前线程实例
int c = getState(); // 获取state
if (c == 0) { // c等于0,表示没有线程获取到锁
if (compareAndSetState(0, acquires)) { // 尝试获取锁,并设置state为acquires
setExclusiveOwnerThread(current); // 设置当前线程
return true;
}
}
else if (current == getExclusiveOwnerThread()) { // 已经有线程获取到锁了,那么检查下是不是当前线程,如果是,进来
int nextc = c + acquires; // 再次state状态+acquires
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc); // 更新state为新的状态
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
protected final boolean isHeldExclusively() {
return getExclusiveOwnerThread() == Thread.currentThread();
}
final ConditionObject newCondition() {
return new ConditionObject();
}
final Thread getOwner() {
return getState() == 0 ? null : getExclusiveOwnerThread();
}
final int getHoldCount() {
return isHeldExclusively() ? getState() : 0;
}
final boolean isLocked() {
return getState() != 0;
}
}
lock 获取锁
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
// ReentrantLock#Sync
abstract static class Sync extends AbstractQueuedSynchronizer {
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
// 非公平锁ReentrantLock#NonfairSync.java
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
final void lock() {
if (compareAndSetState(0, 1)) // 1
setExclusiveOwnerThread(Thread.currentThread()); // 2
else
acquire(1); // 3
}
protected final boolean tryAcquire(int acquires) { // 4
return nonfairTryAcquire(acquires);
}
}
// 公平锁ReentrantLock#NonfairSync.java
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
final void lock() {
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
// AbstractQueuedSynchronizer.java
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
- 我们通过 ReentrantLock 调用 lock() 来尝试获取锁,非公平锁是通过 CAS 将 state 从 0 改成 1,如果成功就进入到 2;
- 调用 setExclusiveOwnerThread 设置锁为独占模式,保留当前线程
- 非公平锁中如果 state 从 0 改为 1 失败后,就会调用 acquire(),acquire() 又会调用 tryAcquire() 方法;公平锁是直接调用
acquire(1)
- 非公平锁 NonfairSync 的 tryAcquire 调用的是 nonfairTryAcquire();公平锁是调用
FairSync#tryAcquire()
,两者的一个区别就是公平锁多了!hasQueuedPredecessors()
判断逻辑
线程加入等待队列
加入队列时机
当执行 Acquire(1) 时,会通过 tryAcquire 获取锁。在这种情况下,如果获取锁失败,就会调用 addWaiter 加入到等待队列中去。
如何加入队列
获取锁失败 (即 tryAcquire()
返回了 false) 后,会执行 addWaiter(Node.EXCLUSIVE)
加入等待队列,具体实现方法如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// java.util.concurrent.locks.AbstractQueuedSynchronizer
private Node addWaiter(Node mode) {
Node node = new Node(mode);
for (;;) {
Node oldTail = tail;
if (oldTail != null) {
U.putObject(node, Node.PREV, oldTail);
if (compareAndSetTail(oldTail, node)) {
oldTail.next = node;
return node;
}
} else {
initializeSyncQueue();
}
}
}
private final boolean compareAndSetTail(Node expect, Node update) {
return U.compareAndSwapObject(this, TAIL, expect, update);
}
新来的线程会排到队列后面
等待队列中线程出队列时机
addWaiter 方法,这个方法其实就是把对应的线程以 Node 的数据结构形式加入到双端队列里,返回的是一个包含该线程的 Node。而这个 Node 会作为参数,进入到 acquireQueued 方法中。acquireQueued 方法可以对排队中的线程进行 “ 获锁 “ 操作。
一个线程获取锁失败了,被放入等待队列,acquireQueued 会把放入队列中的线程不断去获取锁,直到获取成功或者不再需要获取(中断)。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// // java.util.concurrent.locks.AbstractQueuedSynchronizer
final boolean acquireQueued(final Node node, int arg) {
try {
// 标记等待过程中是否中断过
boolean interrupted = false;
for (;;) { // 开始自旋,要么获取锁,要么中断
// 获取当前节点的前驱节点
final Node p = node.predecessor();
// 如果p是头结点,说明当前节点在真实数据队列的首部,就尝试获取锁(别忘了头结点是虚节点)
if (p == head && tryAcquire(arg)) {
// 获取锁成功,头指针移动到当前node
setHead(node);
p.next = null; // help GC
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} catch (Throwable t) {
cancelAcquire(node);
throw t;
}
}
- shouldParkAfterFailedAcquire
- parkAndCheckInterrupt 主要用于挂起当前线程,阻塞调用栈,返回当前线程的中断状态。
释放锁
1
2
3
4
// java.util.concurrent.locks.ReentrantLock
public void unlock() {
sync.release(1);
}
ReentrantLock 公平锁和非公平锁核心源码
公平锁
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
final void lock() {
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
非公平锁
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
ReentrantReadWriteLock 共享锁
synchronized 和 ReentrantLock 都是独占锁,这些锁在同一时刻只允许一个线程进行访问,而读写锁在同一时刻可以允许多个读线程访问,但在写线程访问时,所有的读线程和其他写线程均被阻塞。
ReentrantReadWriteLock 维护了一对锁,一个读锁和一个写锁,通过分离读锁和写锁,使得并发性相比一般的独占锁有了很大提升。
一棒情况下,读写锁的性能都会比独占锁好,因为大多数场景读是对于写的,在读多余写的情况下,读写锁能够提供比独占锁更好的并发性和吞吐量。
CountDownLatch 让一个线程等待多个线程同时运行完成后
CountDownLatch 介绍
CountDownLatch 是一个同步工具类,用来协调多个线程之间的同步,或者说起到线程之间的通信(而不是用作互斥的作用)。
CountDownLatch 能够使一个线程在等待另外一些线程完成各自工作之后,再继续执行。使用一个计数器进行实现。计数器初始值为线程的数量。当每一个线程完成自己任务后,计数器的值就会减一。当计数器的值为 0 时,表示所有的线程都已经完成一些任务,然后在 CountDownLatch 上等待的线程就可以恢复执行接下来的任务。
CountDownLatch 在多线程并发编程中充当⼀个计时器的功能,它维护了⼀个 count 的变量,并且其操作都是原⼦操作,该类主要通过 countDown() 和 await() 两个⽅法来实现功能的:⾸先通过建⽴ CountDownLatch 对象,传⼊的参数即为 count 初始值。如果⼀个线程调⽤了 await() ⽅法,那么这个线程便进⼊阻塞状态并同时进⼊阻塞队列。如果⼀个线程调⽤了 countDown() ⽅法,则会使 count-1,当 count 的值为 0 时,这时候阻塞队列中调⽤ await() ⽅法的线程便会逐个被唤醒并出队,从⽽进⼊后续的操作。
CountDownLatch 使用
方法
countDown
- public void countDown()
递减锁存器的计数,如果计数到达零,则释放所有等待的线程。如果当前计数大于零,则将计数减少
wait
1
2
3
4
public boolean await(long timeout,TimeUnit unit) throws InterruptedException
参数:timeout-要等待的最长时间 unit-timeout 参数的时间单位
返回值:如果计数到达零,则返回true;如果在计数到达零之前超过了等待时间,则返回false
抛出:InterruptedException-如果当前线程在等待时被中断
使当前线程在锁存器倒计数至零之前一直等待,除非线程被中断或超出了指定的等待时间。如果当前计数为零,则此方法立刻返回 true 值。如果当前计数大于零,则出于线程调度目的,将禁用当前线程,且在发生以下三种情况之一前,该线程将一直出于休眠状态:
1. 由于调用 countDown() 方法,计数到达零,则该方法返回 true 值。
2. 如果当前线程,在进入此方法时已经设置了该线程的中断状态;或者在等待时被中断,则抛出 InterruptedException,并且清除当前线程的已中断状态。
3. 如果超出了指定的等待时间,则返回值为 false。如果该时间小于等于零,则该方法根本不会等待
使用场景
- 某一线程在开始运行前等待 n 个线程执行完毕。将 CountDownLatch 的计数器初始化为 new CountDownLatch(n),每当一个任务线程执行完毕,就将计数器减 1 countdownLatch.countDown(),当计数器的值变为 0 时,在 CountDownLatch 上 await() 的线程就会被唤醒。一个典型应用场景就是启动一个服务时,主线程需要等待多个组件加载完毕,之后再继续执行。
- 实现多个线程开始执行任务的最大并行性。注意是并行性,不是并发,强调的是多个线程在某一时刻同时开始执行。类似于赛跑,将多个线程放到起点,等待发令枪响,然后同时开跑。做法是初始化一个共享的 CountDownLatch(1),将其计算器初始化为 1,多个线程在开始执行任务前首先 countdownlatch.await(),当主线程调用 countDown() 时,计数器变为 0,多个线程同时被唤醒。
案例
- 主线程等待子线程执行完成在执行
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
/**
* 主线程等待子线程执行完成再执行
*/
public class CountdownLatchTest1 {
public static void main(String[] args) {
final CountDownLatch latch = new CountDownLatch(3);
ExecutorService service = Executors.newFixedThreadPool(3);
for (int i = 0; i < 3; i++) {
Runnable runnable = new Runnable() {
@Override
public void run() {
try {
System.out.println("--->>>子线程" + Thread.currentThread().getName() + "开始执行");
Thread.sleep((long) (Math.random() * 10000));
System.out.println("--->>>子线程" + Thread.currentThread().getName() + "执行完成");
latch.countDown();//当前线程调用此方法,则计数减一
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
service.execute(runnable);
}
try {
System.out.println("主线程" + Thread.currentThread().getName() + "等待子线程执行完成...");
latch.await();//阻塞当前线程,直到计数器的值为0
System.out.println("主线程" + Thread.currentThread().getName() + "开始执行...");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
输出:
1
2
3
4
5
6
7
8
--->>>子线程pool-1-thread-2开始执行
主线程main等待子线程执行完成...
--->>>子线程pool-1-thread-1开始执行
--->>>子线程pool-1-thread-3开始执行
--->>>子线程pool-1-thread-2执行完成
--->>>子线程pool-1-thread-1执行完成
--->>>子线程pool-1-thread-3执行完成
主线程main开始执行...
- 百米赛跑,4 名运动员选手到达场地等待裁判口令,裁判一声口令,选手听到后同时起跑,当所有选手到达终点,裁判进行汇总排名
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
/**
* 百米赛跑,4名运动员选手到达场地等待裁判口令,裁判一声口令,选手听到后同时起跑,当所有选手到达终点,裁判进行汇总排名 <br/>
*/
public class CountdownLatchTest2 {
public static void main(String[] args) {
ExecutorService service = Executors.newCachedThreadPool();
final CountDownLatch cdOrder = new CountDownLatch(1);
final CountDownLatch cdAnswer = new CountDownLatch(4);
for (int i = 0; i < 4; i++) {
Runnable runnable = new Runnable() {
@Override
public void run() {
try {
System.out.println("-->>选手" + Thread.currentThread().getName() + "正在等待裁判发布口令");
cdOrder.await();
System.out.println("-->>选手" + Thread.currentThread().getName() + "已接受裁判口令");
Thread.sleep((long) (Math.random() * 10000));
System.out.println("-->>选手" + Thread.currentThread().getName() + "到达终点");
cdAnswer.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
service.execute(runnable);
}
try {
Thread.sleep((long) (Math.random() * 10000));
System.out.println("裁判" + Thread.currentThread().getName() + "即将发布口令");
cdOrder.countDown();
System.out.println("裁判" + Thread.currentThread().getName() + "已发送口令,正在等待所有选手到达终点");
cdAnswer.await();
System.out.println("所有选手都到达终点");
System.out.println("裁判" + Thread.currentThread().getName() + "汇总成绩排名");
} catch (InterruptedException e) {
e.printStackTrace();
}
service.shutdown();
}
}
输出:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
-->>选手pool-1-thread-3正在等待裁判发布口令
-->>选手pool-1-thread-1正在等待裁判发布口令
-->>选手pool-1-thread-4正在等待裁判发布口令
-->>选手pool-1-thread-2正在等待裁判发布口令
裁判main即将发布口令
裁判main已发送口令,正在等待所有选手到达终点
-->>选手pool-1-thread-3已接受裁判口令
-->>选手pool-1-thread-1已接受裁判口令
-->>选手pool-1-thread-2已接受裁判口令
-->>选手pool-1-thread-4已接受裁判口令
-->>选手pool-1-thread-2到达终点
-->>选手pool-1-thread-4到达终点
-->>选手pool-1-thread-3到达终点
-->>选手pool-1-thread-1到达终点
所有选手都到达终点
裁判main汇总成绩排名
CancelableCountDownLatch – from ARouter
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class CancelableCountDownLatch extends CountDownLatch {
/**
* Constructs a {@code CountDownLatch} initialized with the given count.
*
* @param count the number of times {@link #countDown} must be invoked
* before threads can pass through {@link #await}
* @throws IllegalArgumentException if {@code count} is negative
*/
public CancelableCountDownLatch(int count) {
super(count);
}
public void cancel() {
while (getCount() > 0) {
countDown();
}
}
}
Condition
什么是 Condition?
Condition 是在 java 1.5 中才出现的,它用来替代传统的 Object 的 wait()、notify() 实现线程间的协作,相比使用 Object 的 wait()、notify(),使用 Condition 的 await()、signal() 这种方式实现线程间协作更加安全和高效。因此通常来说比较推荐使用 Condition,阻塞队列实际上是使用了 Condition 来模拟线程间协作。
- Condition 是个接口,基本的方法就是 await() 和 signal() 方法;
- Condition 依赖于 Lock 接口,生成一个 Condition 的基本代码是 lock.newCondition()
- 调用 Condition 的 await() 和 signal() 方法,都必须在 lock 保护之内,就是说必须在 lock.lock() 和 lock.unlock 之间才可以使用 Condition 中的 await() 对应 Object 的 wait()(同 Object Monitor Methods 的 wait() 和 notify()、notifyAll() 方法必须在 synchronized{}代码块中调用一样);
- Condition 中的 signal() 对应 Object 的 notify();
- Condition 中的 signalAll() 对应 Object 的 notifyAll()。
和 Object Monitor Methods 对比
Condition 使用
常用方法
await() 造成当前线程在接到信号或被中断之前一直处于等待状态
await(long time, TimeUnit unit) 造成当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态
awaitNanos(long nanosTimeout) 造成当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。返回值表示剩余时间,如果在 nanosTimesout 之前唤醒,那么返回值 = nanosTimeout - 消耗时间,如果返回值 <= 0 ,则可以认定它已经超时了。
awaitUninterruptibly() 造成当前线程在接到信号之前一直处于等待状态。【注意:该方法对中断不敏感】。
awaitUntil(Date deadline) 造成当前线程在接到信号、被中断或到达指定最后期限之前一直处于等待状态。如果没有到指定时间就被通知,则返回 true,否则表示到了指定时间,返回返回 false。
signal() 同 notify();唤醒一个等待线程。该线程从等待方法返回前必须获得与 Condition 相关的锁
signalAll() 同 notifyAll();唤醒所有等待线程。能够从等待方法返回的线程必须获得与 Condition 相关的锁
案例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
public class ConditionUseCase {
public Lock lock = new ReentrantLock();
public Condition condition = lock.newCondition();
public static void main(String[] args) {
ConditionUseCase useCase = new ConditionUseCase();
ExecutorService executorService = Executors.newFixedThreadPool (2);
executorService.execute(new Runnable() {
@Override
public void run() {
useCase.conditionWait();
}
});
executorService.execute(new Runnable() {
@Override
public void run() {
useCase.conditionSignal();
}
});
}
public void conditionWait() {
lock.lock();
try {
System.out.println(Thread.currentThread().getName() + "拿到锁了");
System.out.println(Thread.currentThread().getName() + "等待信号");
condition.await();
System.out.println(Thread.currentThread().getName() + "拿到信号");
} catch (Exception e){
} finally {
lock.unlock();
}
}
public void conditionSignal() {
lock.lock();
try {
Thread.sleep(5000);
System.out.println(Thread.currentThread().getName() + "拿到锁了");
condition.signal();
System.out.println(Thread.currentThread().getName() + "发出信号");
} catch (Exception e){
} finally {
lock.unlock();
}
}
}
Condition 原理分析
等待队列
Condition 是 AQS 的内部类。等待队列是一个 FIFO 的队列,在队列中的每个节点都包含了一个线程引用,该线程就是在 Condition 对象上等待的线程,如果一个线程调用了 Condition.await() 方法,那么该线程将会释放锁、构造成节点加入等待队列并进入等待状态
一个 Condition 包含一个等待队列,Condition 拥有首节点(firstWaiter)和尾节点(lastWaiter)。当前线程调用 Condition.await() 方法,将会以当前线程构造节点,并将节点从尾部加入等待队列,等待队列的基本结构如下图所示:
如图所示,Condition 拥有首尾节点的引用,而新增节点只需要将原有的尾节点 nextWaiter 指向它,并且更新尾节点即可。上述节点引用更新的过程并没有使用 CAS 保证,原因在于调用 await() 方法的线程必定是获取了锁的线程,也就是说该过程是由锁来保证线程安全的。在 Object 的监视器模型上,一个对象拥有一个同步队列和等待队列,而并发包中的 Lock(更确切地说是同步器)拥有一个同步队列和多个等待队列,其对应关系如下图所示:
等待
调用 Condition 的 await() 方法(或者以 await 开头的方法),会使当前线程进入等待队列并释放锁,同时线程状态变为等待状态。当从 await() 方法返回时,当前线程一定获取了 Condition 相关联的锁。如果从队列(同步队列和等待队列)的角度看 await() 方法,当调用 await() 方法时,相当于同步队列的首节点(获取了锁的节点)移动到 Condition 的等待队列中
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 当前线程加入等待队列
Node node = addConditionWaiter();
// 释放同步状态,也就是释放锁
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
调用该方法的线程成功获取了锁的线程,也就是同步队列中的首节点,该方法会将当前线程构造成节点并加入等待队列中,然后释放同步状态,唤醒同步队列中的后继节点,然后当前线程会进入等待状态。当等待队列中的节点被唤醒,则唤醒节点的线程开始尝试获取同步状态。如果不是通过其他线程调用 Condition.signal() 方法唤醒,而是对等待线程进行中断,则会抛出 InterruptedException
通知
调用 Condition 的 signal() 方法,将会唤醒在等待队列中等待时间最长的节点(首节点),在唤醒节点之前,会将节点移到同步队列中:
1
2
3
4
5
6
7
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
调用该方法的前置条件是当前线程必须获取了锁,可以看到 signal() 方法进行了 isHeldExclusively() 检查,也就是当前线程必须是获取了锁的线程。接着获取等待队列的首节点,将其移动到同步队列并使用 LockSupport 唤醒节点中的线程
节点从等待队列移动到同步队列的过程如下图所示:
通过调用同步器的 enq(Node node) 方法,等待队列中的头节点线程安全地移动到同步队列。当节点移动到同步队列后,当前线程再使用 LockSupport 唤醒该节点的线程。被唤醒后的线程,将从 await() 方法中的 while 循环中退出(isOnSyncQueue(Node node) 方法返回 true,节点已经在同步队列中),进而调用同步器的 acquireQueued() 方法加入到获取同步状态的竞争中。成功获取同步状态(或者说锁)之后,被唤醒的线程将从先前调用的 await() 方法返回,此时该线程已经成功地获取了锁。
Condition 的 signalAll() 方法,相当于对等待队列中的每个节点均执行一次 signal() 方法(注意是这个 Condition 对应的等待队列),效果就是将等待队列中所有节点全部移动到同步队列中,并唤醒每个节点的线程。
ThreadPoolExecutor#Worker
是一把不可重入的锁
如何用 AQS 实现可重入锁?
ReentrantLock 的可重入性是 AQS 很好的应用之一,分公平锁和非公平锁
公平锁
1
2
3
4
5
6
7
8
9
10
11
12
13
14
// java.util.concurrent.locks.ReentrantLock.FairSync#tryAcquire
if (c == 0) {
if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
非公平锁
1
2
3
4
5
6
7
8
9
10
11
12
13
14
// java.util.concurrent.locks.ReentrantLock.Sync#nonfairTryAcquire
if (c == 0) {
if (compareAndSetState(0, acquires)){
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
通过同步状态 state 来控制整体可重入的情况,state 是 volatile 修饰的,保证了可见性和有序性。
- state 初始化的时候为 0,表示没有任何线程持有锁
- 当有线程持有锁时,state=state+1,通过一个线程多次获得锁会多加次 1,这就是可重入的概念
- 解锁也是对这个字段 -1,一直减到 0,此线程对锁释放
如何用 AQS 实现不可重入锁?
JDK 已有:线程池的 Worker 就是一个 AQS,实现了一个不可重入锁,state 只能为 1 和 0,1 表示获得锁,0 表示未获得锁
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
protected boolean isHeldExclusively() {
return getState() != 0;
}
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
}
自定义同步器实现不可重入锁:state 只能为 0 或 1
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Override
protected boolean tryAcquire(int arg) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
/*释放锁*/
@Override
protected boolean tryRelease(int arg) {
if (getState() == 0) {
throw new IllegalMonitorStateException();
}
setExclusiveOwnerThread(null);
setState(0);
return true;
}
AQS 面试题
某个线程获取锁失败的后续流程是什么呢?
存在某种排队等候机制,线程继续等待,仍然保留获取锁的可能,获取锁流程仍在继续。
某种排队等候机制,是 CLH 变体的 FIFO 双端队列。在 AQS 中是 acquire() 方法,失败后加入到 CLH 队列中去,并自旋获取锁
如果处于排队等候机制中的线程一直无法获取锁,需要一直等待么?还是有别的策略来解决这一问题?
线程所在节点的状态会变成取消状态,取消状态的节点会从队列中释放