1. 锁原理 - 信号量 vs 管程
在并发编程领域,有两大核心问题:互斥与同步,互斥即同一时刻只允许一个线程访问共享资源,同步,即线程之间如何通信、协作,一般这两大问题可以通过信号量和管程来解决。
1.1. 信号量
信号量(Semaphore)是操作系统提供的一种进程间常见的通信方式,主要用来协调并发程序对共享资源的访问,操作系统可以保证对信号量操作的原子性。它是怎么实现的呢。
- 信号量由一个共享整型变量 S 和两个原子操作 PV 组成,S 只能通过 P 和 V 操作来改变
- P 操作:即请求资源,意味着 S 要减 1,如果 S < 0, 则表示没有资源了,此时线程要进入等待队列(同步队列)等待
- V 操作: 即释放资源,意味着 S 要加 1, 如果 S 小于等于 0,说明等待队列里有线程,此时就需要唤醒线程。
信号量机制的引入解决了进程同步和互斥问题,但信号量的大量同步操作分散在各个进程中不便于管理,还有可能导致系统死锁。如:生产者消费者问题中将P、V颠倒可能死锁,另外条件越多,需要的信号量就越多,需要更加谨慎地处理信号量之间的处理顺序,否则很容易造成死锁现象。
基于信号量给编程带来的隐患,于是有了提出了对开发者更加友好的并发编程模型-管程
1.2. 管程
Dijkstra 于 1971 年提出:把所有进程对某一种临界资源的同步操作都集中起来,构成一个所谓的秘书进程。凡要访问该临界资源的进程,都需先报告秘书,由秘书来实现诸进程对同一临界资源的互斥使用,这种机制就是管程。
管程是一种在信号量机制上进行改进的并发编程模型,解决了信号量在临界区的 PV 操作上配对的麻烦,把配对的 PV 操作集中在一起而形成的并发编程方法理论,极大降低了使用和理解成本。
管程由四部分组成:
- 管程内部的共享变量。
- 管程内部的条件变量。
- 管程内部并行执行的进程。
- 对于局部与管程内部的共享数据设置初始值的语句。
由此可见,管程就是一个对象监视器。任何线程想要访问该资源(共享变量),就要排队进入监控范围。进入之后,接受检查,不符合条件,则要继续等待,直到被通知,然后继续进入监视器。
需要注意的事,信号量和管程两者是等价的,信号量可以实现管程,管程也可以实现信号量,只是两者的表现形式不同而已,管程对开发者更加友好。
两者的区别如下
管程为了解决信号量在临界区的 PV 操作上的配对的麻烦,把配对的 PV 操作集中在一起,并且加入了条件变量的概念,使得在多条件下线程间的同步实现变得更加简单。
怎么理解管程中的入口等待队列,共享变量,条件变量等概念,有时候技术上的概念较难理解,我们可以借助生活中的场景来帮助我们理解,就以我们的就医场景为例来简单说明一下,正常的就医流程如下:
- 病人去挂号后,去侯诊室等待叫号
- 叫到自己时,就可以进入就诊室就诊了
- 就诊时,有两种情况,一种是医生很快就确定病人的病,并作出诊断,诊断完成后,就通知下一位病人进来就诊,一种是医生无法确定病因,需要病人去做个验血 / CT 检查才能确定病情,于是病人就先去验个血 / CT
- 病人验完血 / 做完 CT 后,重新取号,等待叫号(进入入口等待队列)
- 病人等到自己的号,病人又重新拿着验血 / CT 报告去找医生就诊
整个流程如下
那么管程是如何解决互斥和同步的呢
首先来看互斥,上文中医生即共享资源(也即共享变量),就诊室即为临界区,病人即线程,任何病人如果想要访问临界区,必须首先获取共享资源(即医生),入口一次只允许一个线程经过,在共享资源被占有的情况下,如果再有线程想占有共享资源,就需要到等待队列去等候,等到获取共享资源的线程释放资源后,等待队列中的线程就可以去竞争共享资源了,这样就解决了互斥问题,所以本质上管程是通过将共享资源及其对共享资源的操作(线程安全地获取和释放)封装起来来保证互斥性的。
再来看同步,同步是通过文中的条件变量及其等待队列实现的,同步的实现分两种情况
- 病人进入就诊室后,无需做验血 / CT 等操作,于是医生诊断完成后,就会释放共享资源(解锁)去通知(notify,notifyAll)入口等待队列的下一个病人,下一个病人听到叫号后就能看医生了。
- 如果病人进入就诊室后需要做验血 / CT 等操作,会去验血 / CT 队列(条件队列)排队, 同时释放共享变量(医生),通知入口等待队列的其他病人(线程)去获取共享变量(医生),获得许可的线程执行完临界区的逻辑后会唤醒条件变量等待队列中的线程,将它放到入口等待队列中 ,等到其获取共享变量(医生)时,即可进入入口(临界区)处理。
在 Java 里,锁大多是依赖于管程来实现的,以大家熟悉的内置锁 synchronized 为例,它的实现原理如下。
可以看到 synchronized 锁也是基于管程实现的,只不过它只有且只有一个条件变量(就是锁对象本身)而已,这也是为什么JDK 要实现 Lock 锁的原因之一,Lock 支持多个条件变量。
2. AQS
AQS全称为AbstractQueuedSynchronizer,它提供了一个FIFO队列,可以看成是一个用来实现同步锁以及其他涉及到同步功能的核心组件,常见的有:ReentrantLock、CountDownLatch等。
AQS是一个抽象类,主要是通过继承的方式来使用,它本身没有实现任何的同步接口,仅仅是定义了同步状态的获取以及释放的方法来提供自定义的同步组件。
从使用层面来说,AQS的功能分为两种:独占和共享
- 独占锁,每次只能有一个线程持有锁,比如前面给大家演示的ReentrantLock就是以独占方式实现的互斥锁
- 共享锁,允许多个线程同时获取锁,并发访问共享资源,比如ReentrantReadWriteLock
2.1. AQS的内部实现
AQS核心思想是,如果被请求的共享资源空闲,那么就将当前请求资源的线程设置为有效的工作线程,将共享资源设置为锁定状态;如果共享资源被占用,就需要一定的阻塞等待唤醒机制来保证锁分配。这个机制主要用的是CLH队列的变体实现的,将暂时获取不到锁的线程加入到队列中。
CLH:Craig、Landin and Hagersten队列,是单向链表,AQS中的队列是CLH变体的虚拟双向队列(FIFO),AQS是通过将每条请求共享资源的线程封装成一个节点来实现锁的分配。
AQS使用一个Volatile的int类型的成员变量来表示同步状态,通过内置的FIFO队列来完成资源获取的排队工作,通过CAS完成对State值的修改。
以实现独占锁为例(即当前资源只能被一个线程占有),其实现原理如下:state 初始化 0,在多线程条件下,线程要执行临界区的代码,必须首先获取 state,某个线程获取成功之后, state 加 1,其他线程再获取的话由于共享资源已被占用,所以会到 FIFO 等待队列去等待,等占有 state 的线程执行完临界区的代码释放资源( state 减 1)后,会唤醒 FIFO 中的下一个等待线程(head 中的下一个结点)去获取 state。
state 由于是多线程共享变量,所以必须定义成 volatile,以保证 state 的可见性, 同时虽然 volatile 能保证可见性,但不能保证原子性,所以 AQS 提供了对 state 的原子操作方法,保证了线程安全。
如果当前线程竞争锁失败,那么AQS会把当前线程以及等待状态信息构造成一个Node加入到同步队列中,同时再阻塞该线程。当获取锁的线程释放锁以后,会从队列中唤醒一个阻塞的节点(线程)。
AQS 中实现的 FIFO 队列(CLH 队列)其实是双向链表实现的,由 head, tail 节点表示,head 结点代表当前占用的线程,其他节点由于暂时获取不到锁所以依次排队等待锁释放
我们再看下AQS的另外几个核心属性
/**
* 双向链表的首尾结点,代表入口等待队列
*/
private transient volatile Node head;
private transient volatile Node tail;
/**
* 同步状态值(锁的数量)
* 当state=0时,表示无锁状态
* 当state>0时,表示已经有线程获得了锁,也就是state=1,但是因为ReentrantLock允许重入,所以同一个线程多次获得同步锁的时候,state会递增,比如重入5次,那么state=5。 而在释放锁的时候,同样需要释放5次直到state=0其他线程才有资格获得锁
*/
private volatile int state;
/**
* 继承自父类AbstractOwnableSynchronizer的属性,
* The current owner of exclusive mode synchronization.
*/
private transient Thread exclusiveOwnerThread;
AQS中维护了一个名为state的字段,意为同步状态,是由Volatile修饰的,用于展示当前临界资源的获锁情况。我们可以通过修改State字段表示的同步状态来实现多线程的独占模式和共享模式(加锁过程)。
除此之外,AQS内部还定义了一个静态类Node,表示CLH队列的每一个结点,该结点的作用是对每一个等待获取资源做了封装,包含了需要同步的线程本身、线程等待状态。
static final class Node {
/**SHARED、EXCLUSIVE用于设置nextWaiter,用于表示当前节点是共享的,还是互斥的,分别用于共享锁和独占锁 */
/** 表示线程以共享的模式等待锁 **/
static final Node SHARED = new Node();
/** 表示线程正在以独占的方式等待锁 **/
static final Node EXCLUSIVE = null;
/** int类型的常量CANCELLED、SIGNAL、CONDITION、PROPAGATE用于设置waitStatus,用于在ConditionObject中使用,可以实现await/signal模型。 **/
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
/**
* SIGNAL(-1) :线程的后继线程正/已被阻塞,当该线程release或cancel时要唤醒这个后继线程(unpark)
* CANCELLED(1):因为超时或中断,该线程已经被取消
* CONDITION(-2):表明该线程被处于条件队列,就是因为调用了>- Condition.await而被阻塞
* PROPAGATE(-3):这个值是在共享锁的时候会用到,唤醒了一个节点,会尝试唤醒下一个节点,如果当前节点未阻塞(阻塞前就获得了锁),那么当前节点的状态会被设置成-3, 当前线程处在SHARED情况下
* 0:0代表无状态
*/
volatile int waitStatus;
// 前驱节点
volatile Node prev;
// 后继节点
volatile Node next;
// 等待的线程
volatile Thread thread;
// 存储在condition队列中的后继节点
Node nextWaiter;
final boolean isShared() {
return nextWaiter == SHARED;
}
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() { // Used to establish initial head or SHARED marker
}
// 将线程构造成一个Node,添加到等待队列
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
// 这个方法会在Condition队列使用
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
代码里面定义了一个表示当前Node结点等待状态的字段waitStatus
,该字段的取值包含了CANCELLED(1)、SIGNAL(-1)、CONDITION(-2)、PROPAGATE(-3)、0,这五个值代表了不同的特定场景:
- CANCELLED:表示当前结点已取消调度。当timeout或被中断(响应中断的情况下),会触发变更为此状态,进入该状态后的结点将不会再变化。
- SIGNAL:表示后继结点在等待当前结点唤醒。后继结点入队时,会将前继结点的状态更新为SIGNAL(记住这个-1的值,因为后面我们讲的时候经常会提到)
- CONDITION:表示结点等待在Condition上,当其他线程调用了Condition的signal()方法后,CONDITION状态的结点将从等待队列转移到同步队列中,等待获取同步锁。(注:Condition是AQS的一个组件,后面会细说)
- PROPAGATE:共享模式下,前继结点不仅会唤醒其后继结点,同时也可能会唤醒后继的后继结点。
- 0:新结点入队时的默认状态。
也就是说,当waitStatus为负值表示结点处于有效等待状态,为正值的时候表示结点已被取消。
2.2. 释放锁以及添加线程对于队列的变化
添加节点
当出现锁竞争以及释放锁的时候,AQS同步队列中的节点会发生变化,首先看一下添加节点的场景。
这里会涉及到两个变化
- 新的线程封装成Node节点追加到同步队列中,设置prev节点以及修改当前节点的前置节点的next节点指向自己
- 通过CAS将tail重新指向新的尾部节点
释放锁移除节点
head节点表示获取锁成功的节点,当头结点在释放同步状态时,会唤醒后继节点,如果后继节点获得锁成功,会把自己设置为头结点,节点的变化过程如下
这个过程也是涉及到两个变化
- 修改head节点指向下一个获得锁的节点
- 新的获得锁的节点,将prev的指针指向null
这里有一个小的变化,就是设置head节点不需要用CAS,原因是设置head节点是由获得锁的线程来完成的,而同步锁只能由一个线程获得,所以不需要CAS保证,只需要把head节点设置为原首节点的后继节点,并且断开原head节点的next引用即可
接下来,我们通过ReentrantLock的加锁和解锁流程,来看看线程是如何加入等待队列的,以及队列中每个节点的状态值是如何变化的。
3. ReentrantLock
以ReentrantLock为例,来分析AQS在重入锁中的使用。毕竟单纯分析AQS没有太多的含义。先理解这个类图,可以方便我们理解AQS的原理
我们先初略看下ReentrantLock的核心结构
public class ReentrantLock implements Lock, java.io.Serializable {
//继承自AQS
abstract static class Sync extends AbstractQueuedSynchronizer {
//.....
}
static final class NonfairSync extends Sync {
// ..... 非公平锁
}
static final class FairSync extends Sync {
// ..... 公平锁
}
}
sync是一个静态内部类,它继承了AQS这个抽象类,前面说过AQS是一个同步工具,主要用来实现同步控制。我们在利用这个工具的时候,会继承它来实现同步控制功能。
通过进一步分析,发现Sync这个类有两个具体的实现,分别是NofairSync(非公平锁)
,FailSync(公平锁)
.
- 公平锁 表示所有线程严格按照FIFO来获取锁
- 非公平锁 表示可以存在抢占锁的功能,也就是说不管当前队列上是否存在其他线程等待,新线程都有机会抢占锁
3.1. ReentrantLock.lock()
public void lock() {
sync.lock();
}
接下来的分析仍然以非公平锁作为主要分析逻辑。
final void lock() {
// 通过cas操作来修改state状态,表示争抢锁的操作
if (compareAndSetState(0, 1))
// 设置当前获得锁状态的线程
setExclusiveOwnerThread(Thread.currentThread());
else
// 尝试去获取锁
acquire(1);
}
- 由于这里是非公平锁,所以调用lock方法时,先去通过cas去抢占锁,期望是0,表示没有线程获得锁
- 如果抢占锁成功,保存获得锁成功的当前线程
setExclusiveOwnerThread(Thread.currentThread());
- 如果 CAS 设置 state 为 1 失败(代表获取锁失败),调用AQS的acquire来走锁竞争逻辑
我们假定有两个线程A和B同时竞争锁,A进来先抢占到锁,此时的AQS模型图就类似这样:
compareAndSetState(0, 1)
- 当state=0时,表示无锁状态
- 当state>0时,表示已经有线程获得了锁,也就是state=1,但是因为ReentrantLock允许重入,所以同一个线程多次获得同步锁的时候,state会递增,比如重入5次,那么state=5。 而在释放锁的时候,同样需要释放5次直到state=0其他线程才有资格获得锁
acquire(1)
acquire是AQS中的方法,如果CAS操作未能成功,说明state已经不为0,此时继续acquire(1)操作
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
这个方法的主要逻辑是
- 通过tryAcquire尝试获取独占锁,如果成功返回true,失败返回false
- 如果tryAcquire失败,则会通过addWaiter方法将当前线程封装成Node添加到AQS队列尾部,并标记为独占模式
- acquireQueued,将Node作为参数,通过自旋去尝试获取锁。
- selfInterrupt:自我中断,就是既拿不到锁,又在等待时被中断了,线程就会进行自我中断selfInterrupt(),将中断补上。
我们一个个来看源码,并结合上面的两个线程来做场景分析。
NonfairSync.tryAcquire
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
这个方法的作用是尝试获取锁,如果成功返回true,不成功返回false。它是重写AQS类中的tryAcquire方法
nonfairTryAcquire(acquires)
final boolean nonfairTryAcquire(int acquires) {
// 获得当前执行的线程
final Thread current = Thread.currentThread();
// 获得state的值
int c = getState();
// state=0说明当前是无锁状态
if (c == 0) {
// 通过cas操作来替换state的值改为1
if (compareAndSetState(0, acquires)) {
// 保存当前获得锁的线程
setExclusiveOwnerThread(current);
return true;
}
}
// 如果是同一个线程来获得锁,则直接增加重入次数
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
此段代码可知锁的获取主要分两种情况
- state 为 0 时,代表锁已经被释放,可以去获取,于是使用 CAS 去重新获取锁资源,如果获取成功,则代表竞争锁成功,使用 setExclusiveOwnerThread(current) 记录下此时占有锁的线程,看到这里的 CAS,大家应该不难理解为啥当前实现是非公平锁了,因为队列中的线程与新线程都可以 CAS 获取锁啊,新来的线程不需要排队
- 如果 state 不为 0,代表之前已有线程占有了锁,如果此时的线程依然是之前占有锁的线程(current == getExclusiveOwnerThread() 为 true),代表此线程再一次占有了锁(可重入锁),此时更新 state,记录下锁被占有的次数(锁的重入次数),这里的 setState 方法不需要使用 CAS 更新,因为此时的锁就是当前线程占有的,其他线程没有机会进入这段代码执行。所以此时更新 state 是线程安全的。
当然,因为之前锁已经被线程A占领了,所以这时候tryAcquire
会返回false,继续下面的流程。
addWaiter(Node.EXCLUSIVE)
当tryAcquire方法获取锁失败以后,则会先调用addWaiter将当前线程封装成Node,然后添加到AQS队列
private Node addWaiter(Node mode) {
// 将当前线程封装成Node,并且mode为独占锁
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
// tail不为空的情况,说明队列中存在节点数据
if (pred != null) {
// 将当前线程的Node的prev节点指向tail
node.prev = pred;
// 通过cas讲node添加到AQS队列
if (compareAndSetTail(pred, node)) {
// cas成功,把旧的tail的next指针指向新的tail
pred.next = node;
return node;
}
}
// tail=null,将node添加到同步队列中
enq(node);
return node;
}
这段代码首先会创建一个和当前线程绑定的Node
节点,Node
为双向链表。此时等待队列中的tail
指针为空,直接调用enq(node)
方法将当前线程加入等待队列尾部,然后返回当前结点的前驱结点,
enq
enq就是通过自旋操作把当前节点加入到队列中
private Node enq(final Node node) {
// 自旋
for (;;) {
// 如果是第一次添加到队列,那么tail=null
Node t = tail;
if (t == null) { // Must initialize
// CAS的方式创建一个空的Node作为头结点
if (compareAndSetHead(new Node()))
// 此时队列中只一个头结点,所以tail也指向它
tail = head;
} else {
// 进行第二次循环时,tail不为null,进入else区域。将当前线程的Node结点的prev指向tail,然后使用CAS将tail指向Node
node.prev = t;
// t此时指向tail,所以可以CAS成功,将tail重新指向Node。此时t为更新前的tail的值,即指向空的头结点,t.next=node,就将头结点的后续结点指向Node,返回头结点
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
第一遍循环时,tail指针为空,初始化一个Node结点,并把head和tail结点都指向它,然后第二次循环进来之后,tail结点不为空了,就将当前的结点加入到tail结点后面,也就是这样:
如果此时有另一个线程C进来的话,发现锁已经被A拿走了,然后队列里已经有了线程B,那么线程C就只能乖乖排到线程B的后面去,
acquireQueued
通过tryAcquire()和addWaiter(),我们的线程还是没有拿到资源,并且还被排到了队列的尾部。此时为了拿到资源,能做的事无非两个:
1. 循环让线程再抢资源。但仔细一推敲就知道不合理,因为如果有多个线程都参与的话,你抢我也抢只会降低系统性能
2. 进入等待状态休息,直到其他线程彻底释放资源后唤醒自己,自己再拿到资源
毫无疑问,选择2更加靠谱,acquireQueued方法做的也是这样的处理:
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
// 标记是否会被中断
boolean interrupted = false;
for (;;) {
// 获取prev节点,若为null即刻抛出NullPointException
final Node p = node.predecessor();
// 只有前驱为head才有资格进行锁的抢夺
if (p == head && tryAcquire(arg)) {
// 获取锁成功后就不需要再进行同步操作了,获取锁成功的线程作为新的head节点
// 凡是head节点,head.thread与head.prev永远为null, 但是head.next不为null
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 如果获取锁失败,则根据节点的waitStatus决定是否需要挂起线程
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt()) // 则执行挂起,待下次唤醒的时候检测中断的标志
interrupted = true;
}
} finally {
// 如果抛出异常则取消锁的获取,则将此线程对应的node的waitStatus改为CANCEL,进行出队(sync queue)操作
if (failed)
// 稍后详细分析
cancelAcquire(node);
}
}
shouldParkAfterFailedAcquire
从上面的分析可以看出,只有队列的第二个节点可以有机会争用锁,如果成功获取锁,则此节点晋升为头节点。对于第三个及以后的节点,if (p == head)
条件不成立,首先进行shouldParkAfterFailedAcquire(p, node)
操作 。
shouldParkAfterFailedAcquire
方法是判断一个争用锁的线程是否应该被阻塞。它首先判断一个节点的前置节点的状态是否为Node.SIGNAL,如果是,是说明此节点已经将状态设置-“如果锁释放,则应当通知它”,所以它可以安全的阻塞了,返回true。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* 如果是SIGNAL状态,意味着当前线程需要被unpark唤醒
*/
return true;
// 如果前节点的状态大于0,即为CANCELLED状态时,则会从前节点开始逐步循环找到一个没有被“CANCELLED”节点设置为当前节点的前节点,返回false。在下次循环执行shouldParkAfterFailedAcquire时,返回true。这个操作实际是把队列中CANCELLED的节点剔除掉。
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 如果前继节点为“0”或者“共享锁”状态,则设置前继节点为SIGNAL状态。
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
acquireQueued
方法的流程是这样的:
-
CAS自旋,先判断当前传入的Node的前结点是否为head结点,是的话就尝试获取锁,获取锁成功的话就把当前结点置为head,之前的head置为null(方便GC),然后返回
-
如果前驱结点不是head或者加锁失败的话,就调用
shouldParkAfterFailedAcquire
,将前驱节点的waitStatus变为了SIGNAL=-1,最后执行parkAndChecknIterrupt
方法,调用LockSupport.park()
挂起当前线程,parkAndCheckInterrupt
在挂起线程后会判断线程是否被中断,如果被中断的话,就会重新跑acquireQueued
方法的CAS自旋操作,直到获取资源。
LockSupport.park方法会让当前线程进入waitting状态,在这种状态下,线程被唤醒的情况有两种,一是被unpark(),二是被interrupt(),所以,如果是第二种情况的话,需要返回被中断的标志,然后在
acquire
顶层方法的窗口那里自我中断补上
parkAndCheckInterrupt
如果shouldParkAfterFailedAcquire返回了true,则会执行:parkAndCheckInterrupt()
方法,它是通过LockSupport.park(this)将当前线程挂起到WATING状态,它需要等待一个中断、unpark方法来唤醒它,通过这样一种FIFO的机制的等待,来实现了Lock的操作。
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
LockSupport LockSupport类是Java6引入的一个类,提供了基本的线程同步原语。LockSupport实际上是调用了Unsafe类里的函数,归结到Unsafe里,只有两个函数:
public native void unpark(Thread jthread); public native void park(boolean isAbsolute, long time);
unpark函数为线程提供“许可(permit)”,线程调用park函数则等待“许可”。这个有点像信号量,但是这个“许可”是不能叠加的,“许可”是一次性的。 permit相当于0/1的开关,默认是0,调用一次unpark就加1变成了1.调用一次park会消费permit,又会变成0。 如果再调用一次park会阻塞,因为permit已经是0了。直到permit变成1.这时调用unpark会把permit设置为1.每个线程都有一个相关的permit,permit最多只有一个,重复调用unpark不会累积
此时,因为线程A还未释放锁,所以线程B状态都是被挂起的,
cancelAcquire
最后看一下cancelAcquire 方法,如果线程自旋中因为异常等原因获取锁最终失败,则会调用此方法。
private void cancelAcquire(Node node) {
// 将无效节点过滤
if (node == null)
return;
// 设置该节点不关联任何线程,也就是虚节点(由于线程要被取消了,所以将 thread 线程清掉)
node.thread = null;
// 通过前驱节点,跳过取消状态的node
// 将 node 的 pre 指向之前第一个非取消状态的结点(即跳过所有取消状态的结点),waitStatus > 0 表示当前结点状态为取消状态
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// 获取过滤后的前驱节点的后继节点
Node predNext = pred.next;
// 把当前node的状态设置为CANCELLED
node.waitStatus = Node.CANCELLED;
// 如果当前节点是尾节点,将从后往前的第一个非取消状态的节点设置为尾节点
// 更新失败的话,则进入else,如果更新成功,将tail的后继节点设置为null
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
// 如果当前节点取消了,就要把当前节点的前驱节点指向当前节点的后继节点,
// 但是我们之前也说了,要唤醒或阻塞结点,须在其前驱节点的状态为 SIGNAL 的条件才能操作,
// 所以在设置 pre 的 next 节点时要保证 pre 结点的状态为 SIGNAL
// 如果当前节点不是head的后继节点,1:判断当前节点前驱节点的是否为SIGNAL,2:如果不是,则把前驱节点设置为SINGAL看是否成功
// 如果1和2中有一个为true,再判断当前节点的线程是否为null
// 如果上述条件都满足,把当前节点的前驱节点的后继指针指向当前节点的后继节点
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
// 如果当前节点是head的后继节点,或者上述条件不满足,那就唤醒当前节点的后继节点
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
当前的流程:
- 获取当前节点的前驱节点,如果前驱节点的状态是CANCELLED,那就一直往前遍历,找到第一个waitStatus <= 0的节点,将找到的Pred节点和当前Node关联,将当前Node设置为CANCELLED。
-
根据当前节点的位置,考虑以下三种情况:
-
- 当前节点是尾节点。
- 当前节点是Head的后继节点。
- 当前节点不是Head的后继节点,也不是尾节点。
1、首先第一步当前节点之前有取消结点时,则逻辑如下
2、如果当前结点既非头结点的后继结点,也非尾结点,即步骤 1 所示,则最终结果如下
shouldParkAfterFailedAcquire方法会将 node 的 pre 指向之前 waitStatus 为非 CANCEL 的节点,所以当 T4 执行这段代码时,会变成如下情况
可以看到此时中间的两个 CANCEL 节点不可达了,会被 GC
3、如果当前结点为 tail 结点,则结果如下,这种情况下当前结点不可达,会被 GC
4、如果当前结点为 head 的后继结点时,如下
结果中的 CANCEL 结点同样会在 tail 结点自旋调用 shouldParkAfterFailedAcquire 后不可达,如下
通过上面的流程,我们对于CANCELLED节点状态的产生和变化已经有了大致的了解,但是为什么所有的变化都是对Next指针进行了操作,而没有对Prev指针进行操作呢?什么情况下会对Prev指针进行操作?
- 执行cancelAcquire的时候,当前节点的前置节点可能已经从队列中出去了(已经执行过Try代码块中的shouldParkAfterFailedAcquire方法了),如果此时修改Prev指针,有可能会导致Prev指向另一个已经移除队列的Node,因此这块变化Prev指针不安全。
- shouldParkAfterFailedAcquire方法中,会执行下面的代码,其实就是在处理Prev指针。shouldParkAfterFailedAcquire是获取锁失败的情况下才会执行,进入该方法后,说明共享资源已被获取,当前节点之前的节点都不会出现变化,因此这个时候变更Prev指针比较安全。
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
到这里,加锁的流程就分析完了,
3.2. ReentrantLock.unlock
加锁的过程分析完以后,再来分析一下释放锁的过程,
public void unlock() {
sync.release(1);
}
调用release方法,这个方法里面做两件事:
- 释放锁 ;
- 唤醒park的线程
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
这里的判断条件为什么是h != null && h.waitStatus != 0?
(1)h == null Head还没初始化。初始情况下,head == null,第一个节点入队,Head会被初始化一个虚拟节点。所以说,这里如果还没来得及入队,就会出现head == null 的情况。 (2)h != null && waitStatus == 0 表明后继节点对应的线程仍在运行中,不需要唤醒。 (3)h != null && waitStatus < 0 表明后继节点可能被阻塞了,需要唤醒。
tryRelease
这个动作可以认为就是一个设置锁状态的操作,而且是将状态减掉传入的参数值(参数是1),如果结果状态为0,就将排它锁的Owner设置为null,以使得其它的线程有机会进行执行。
在排它锁中,加锁的时候状态会增加1(当然可以自己修改这个值),在解锁的时候减掉1,同一个锁,在可以重入后,可能会被叠加为2、3、4这些值,只有unlock()的次数与lock()的次数对应才会将Owner线程设置为空,而且也只有这种情况下才会返回true。
protected final boolean tryRelease(int releases) {
// 这里是将锁的数量减
int c = getState() - releases;
// 如果释放的线程和获取锁的线程不是同一个,抛出非法监视器状态异常
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
// 由于重入的关系,不是每次释放锁c都等于0,直到最后一次释放锁时,才会把当前线程释放
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
此时AQS中的数据就会变成这样:
完全释放资源后,当前线程要做的就是唤醒CLH队列中第一个在等待资源的线程,也就是head结点后面的线程,此时调用的方法是unparkSuccessor()
,
unparkSuccessor
在方法unparkSuccessor(Node)中,就意味着真正要释放锁了,它传入的是head节点(head节点是占用锁的节点),当前线程被释放之后,需要唤醒下一个节点的线程
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
// 将head结点的状态置为0
compareAndSetWaitStatus(node, ws, 0);
/*
* 判断后继节点是否为空或者是否是取消状态
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
// 然后从队列尾部向前遍历找到最前面的一个waitStatus小于0的节点, 至于为什么从尾部开始向前遍历,因为在doAcquireInterruptibly.cancelAcquire方法的处理过程中只设置了next的变化,没有设置prev的变化,在最后有这样一行代码:node.next = node,如果这时执行了unparkSuccessor方法,并且向后遍历的话,就成了死循环了,所以这时只有prev是稳定的
// PS:没懂
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// 内部首先会发生的动作是获取head节点的next节点,如果获取到的节点不为空,则直接通过:“LockSupport.unpark()”方法来释放对应的被挂起的线程,这样一来将会有一个节点唤醒后继续进入循环进一步尝试tryAcquire()方法来获取锁
if (s != null)
LockSupport.unpark(s.thread);
}
方法的逻辑很简单,就是先将head的结点状态置为0,避免下面找结点的时候再找到head,然后找到队列中最前面的有效结点,然后唤醒,我们假设这个时候线程A已经释放锁,那么此时队列中排最前边竞争锁的线程B就会被唤醒。然后被唤醒的线程B就会尝试用CAS获取锁,回到acquireQueued
方法的逻辑,
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
当线程B获取锁之后,会把当前结点赋值给head,然后原先的前驱结点 (也就是原来的head结点) 去掉引用链,方便回收,这样一来,线程B获取锁的整个过程就完成了,此时AQS的数据就会变成这样:
为什么要从后往前找第一个非Cancelled的节点呢?原因如下,通过addWaiter方法节点入队并不是原子操作,也就是说,node.prev = pred; compareAndSetTail(pred, node)
这两个地方可以看作Tail入队的原子操作,但是此时pred.next = node;
还没执行,如果这个时候执行了unparkSuccessor
方法,就没办法从前往后找了,所以需要从后往前找。还有一点原因,在产生CANCELLED状态节点的时候,先断开的是Next指针,Prev指针并未断开,因此也是必须要从后往前遍历才能够遍历完全部的Node。
综上所述,如果是从前往后找,由于极端情况下入队的非原子操作和CANCELLED节点产生过程中断开Next指针的操作,可能会导致无法遍历所有的节点。所以,唤醒对应的线程后,对应的线程就会继续往下执行。继续执行acquireQueued方法以后,中断如何处理?
4. 公平锁实现原理
公平锁在加锁的时候,会先判断AQS
等待队列中是存在节点,如果存在节点则会直接入队等待,具体代码如下.
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
这里会先判断state
值,如果不为0且获取锁的线程不是当前线程,直接返回false代表获取锁失败,被加入等待队列。如果是当前线程则可重入获取锁。
如果state=0
则代表此时没有线程持有锁,执行hasQueuedPredecessors()
判断AQS
等待队列中是否有元素存在,如果存在其他等待线程,那么自己也会加入到等待队列尾部,做到真正的先来后到,有序加锁。
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
先判断head
是否等于tail
,如果队列中只有一个Node
节点,那么head
会等于tail
,接着判断head
的后置节点,这里肯定会是null
,如果此Node
节点对应的线程和当前的线程是同一个线程,那么则会返回false
,代表没有等待节点或者等待节点就是当前线程创建的Node
节点。此时当前线程会尝试获取锁。
如果head
和tail
不相等,说明队列中有等待线程创建的节点,此时直接返回true
,如果只有一个节点,而此节点的线程和当前线程不一致,也会返回true
5. Condition实现原理
Condition`是在`java 1.5`中才出现的,它用来替代传统的`Object`的`wait()`、`notify()`实现线程间的协作,相比使用`Object`的`wait()`、`notify()`,使用`Condition`中的`await()`、`signal()`这种方式实现线程间协作更加安全和高效。因此通常来说比较推荐使用`Condition
其中AbstractQueueSynchronizer
中实现了Condition
中的方法,主要对外提供awaite(Object.wait())
和signal(Object.notify())
调用。
public class ConditionDemo {
public static void main(String[] args) {
ReentrantLock lock = new ReentrantLock();
Condition condition = lock.newCondition();
Thread tA = new Thread(() -> {
lock.lock();
try {
System.out.println("线程A加锁成功");
System.out.println("线程A执行await被挂起");
condition.await();
System.out.println("线程A被唤醒成功");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
System.out.println("线程A释放锁成功");
}
});
Thread tB = new Thread(() -> {
lock.lock();
try {
System.out.println("线程B加锁成功");
condition.signal();
System.out.println("线程B唤醒线程A");
} finally {
lock.unlock();
System.out.println("线程B释放锁成功");
}
});
tA.start();
tB.start();
}
}
执行main函数后结果输出为:
线程A加锁成功
线程A执行await被挂起
线程B加锁成功
线程B唤醒线程A
线程B释放锁成功
线程A被唤醒成功
线程A释放锁成功
代码执行的结果很容易理解,线程A先获取锁,然后调用await()
方法挂起当前线程并释放锁,线程B这时候拿到锁,然后调用signal
唤醒线程A。
毫无疑问,这两个方法让线程的状态发生了变化,我们仔细来研究一下
翻看AQS的源码,我们会发现Condition中定义了两个属性firstWaiter
和lastWaiter
,前面说了,AQS中包含了一个FIFO的CLH等待队列,每个Conditon对象就包含这样一个等待队列,而这两个属性分别表示的是等待队列中的首尾结点
public class ConditionObject implements Condition, java.io.Serializable {
/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;
}
注意:Condition当中的等待队列和AQS主体的同步等待队列是分开的,两个队列虽然结构体相同,但是作用域是分开的
await
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 将当前线程加入到等待队列中
Node node = addConditionWaiter();
// 完全释放占有的资源,并返回资源数
int savedState = fullyRelease(node);
int interruptMode = 0;
// 循环判断当前结点是不是在Condition的队列中,是的话挂起
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
当一个线程调用Condition.await()方法,将会以当前线程构造结点,这个结点的waitStatus
赋值为Node.CONDITION,也就是-2,并将结点从尾部加入等待队列,然后尾部结点就会指向这个新增的结点,
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
我们依然用上面的demo来演示,此时,线程A获取锁并调用Condition.await()方法后,AQS内部的数据结构会变成这样:
在Condition队列中插入对应的结点后,线程A会释放所持有的资源,走到while循环那层逻辑,
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
isOnSyncQueue
方法的会判断当前的线程节点是不是在同步队列中,这个时候此结点还在Condition队列中,所以该方法返回false,这样的话循环会一直持续下去,线程被挂起,等待被唤醒,此时,线程A的流程暂时停止了。
当线程A调用await()
方法挂起的时候,线程B获取到了线程A释放的资源,然后执行signal()
方法:
signal
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
先判断当前线程是否为获取锁的线程,如果不是则直接抛出异常。接着调用doSignal()
方法来唤醒线程。
private void doSignal(Node first) {
// 循环,从队列一直往后找不为空的首结点
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
final boolean transferForSignal(Node node) {
/*
* CAS循环,将结点的waitStatus改为0
*/
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
/*
* 此方法会把当前结点加入到等待队列中,并返回前驱结点
*/
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
从doSignal
的代码中可以看出,这时候程序寻找的是Condition等待队列中首结点firstWaiter的结点,此时该结点指向的是线程A的结点,所以之后的流程作用的都是线程A的结点。
这里分析下transferForSignal
方法,先通过CAS自旋将结点waitStatus改为0,然后就把结点放入到同步队列 (此队列不是Condition的等待队列) 中,然后再用CAS将同步队列中该结点的前驱结点waitStatus改为Node.SIGNAL,也就是-1,
回到await()
方法,当线程A的结点被加入同步队列中时,isOnSyncQueue()
会返回true,跳出循环,
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
接着执行acquireQueued()
方法,尝试重新获取锁,如果获取锁失败继续会被挂起,直到另外线程释放锁才被唤醒。
所以,当线程B释放完锁后,线程A被唤醒,继续尝试获取锁,至此流程结束。
对于这整个通信过程,我们可以画一张流程图展示下:
6. JUC中的应用场景
除了上边ReentrantLock的可重入性的应用,AQS作为并发编程的框架,为很多其他同步工具提供了良好的解决方案。下面列出了JUC中的几种同步工具,大体介绍一下AQS的应用场景:
同步工具 | 同步工具与AQS的关联 |
---|---|
ReentrantLock | 使用AQS保存锁重复持有的次数。当一个线程获取锁时,ReentrantLock记录当前获得锁的线程标识,用于检测是否重复获取,以及错误线程试图解锁操作时异常情况的处理。 |
ReentrantReadWriteLock | 使用AQS同步状态中的16位保存写锁持有的次数,剩下的16位用于保存读锁的持有次数。 |
Semaphore | 使用AQS同步状态来保存信号量的当前计数。tryRelease会增加计数,acquireShared会减少计数。 |
CountDownLatch | 使用AQS同步状态来表示计数。计数为0时,所有的Acquire操作(CountDownLatch的await方法)才可以通过。 |
ThreadPoolExecutor | Worker利用AQS同步状态实现对独占线程变量的设置(tryAcquire和tryRelease)。 |
6.1. ReentrantReadWriteLock
static final int SHARED_SHIFT = 16;
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
从代码中获取读写状态可以看出其是把state(int32位)字段分成高16位与低16位,其中高16位表示读锁个数,低16位表示写锁个数,如下图所示
6.2. 自定义同步工具
了解AQS基本原理以后,按照上面所说的AQS知识点,自己实现一个同步工具
public class LeeLock {
private static class Sync extends AbstractQueuedSynchronizer {
@Override
protected boolean tryAcquire (int arg) {
return compareAndSetState(0, 1);
}
@Override
protected boolean tryRelease (int arg) {
setState(0);
return true;
}
@Override
protected boolean isHeldExclusively () {
return getState() == 1;
}
}
private Sync sync = new Sync();
public void lock () {
sync.acquire(1);
}
public void unlock () {
sync.release(1);
}
}
public class LeeMain {
static int count = 0;
static LeeLock leeLock = new LeeLock();
public static void main (String[] args) throws InterruptedException {
Runnable runnable = new Runnable() {
@Override
public void run () {
try {
leeLock.lock();
for (int i = 0; i < 10000; i++) {
count++;
}
} catch (Exception e) {
e.printStackTrace();
} finally {
leeLock.unlock();
}
}
};
Thread thread1 = new Thread(runnable);
Thread thread2 = new Thread(runnable);
thread1.start();
thread2.start();
thread1.join();
thread2.join();
System.out.println(count);
}
}
上述代码每次运行结果都会是20000。通过简单的几行代码就能实现同步功能,这就是AQS的强大之处。
7. 参考资料
https://segmentfault.com/a/1190000017372067
https://mp.weixin.qq.com/s/sA01gxC4EbgypCsQt5pVog
https://mp.weixin.qq.com/s/y_e3ciU-hiqlb5vseuOFyw
https://mp.weixin.qq.com/s/tMI6qV_ItuTqlKZiUnAlmg