# 1.通过源码看一下AQS的主要的核心概念
* Provides a framework for implementing blocking locks and related
* synchronizers (semaphores, events, etc) that rely on
* first-in-first-out (FIFO) wait queues. This class is designed to
* be a useful basis for most kinds of synchronizers that rely on a
* single atomic {@code int} value to represent state. Subclasses
* must define the protected methods that change this state, and which
* define what that state means in terms of this object being acquired
* or released. Given these, the other methods in this class carry
* out all queuing and blocking mechanics. Subclasses can maintain
* other state fields, but only the atomically updated {@code int}
* value manipulated using methods {@link #getState}, {@link
* #setState} and {@link #compareAndSetState} is tracked with respect
* to synchronization.
AQS提供了一个框架来实现阻塞的锁以及相关的同步器,内部是基于一个FIFO队列。这个类是绝大多数同步器的基础,所有的同步器的内部都依赖于一个原子的volatile类型的state状态,子类必须定义一些受保护的方法,来去更改state状态,但AQS其实是不管这个state值在不同的子类中代表的意义是啥,你可以代表锁,也可以代表计数器,但是子类只能通过AQS提供的setState,getState,compareAndSetState这个三个方法来更改state的值,因为这个state是AQS用来跟踪同步器的状态的。比如锁是上锁还是没有锁上。AQS的其他方法来管理关于这个队列的进出和线程的阻塞。
* <p>This class supports either or both a default <em>exclusive</em>
* mode and a <em>shared</em> mode. When acquired in exclusive mode,
* attempted acquires by other threads cannot succeed. Shared mode
* acquires by multiple threads may (but need not) succeed. This class
* does not "understand" these differences except in the
* mechanical sense that when a shared mode acquire succeeds, the next
* waiting thread (if one exists) must also determine whether it can
* acquire as well. Threads waiting in the different modes share the
* same FIFO queue. Usually, implementation subclasses support only
* one of these modes, but both can come into play for example in a
* {@link ReadWriteLock}. Subclasses that support only exclusive or
* only shared modes need not define the methods supporting the unused mode.
AQS这个类支持独占模式和共享模式,默认情况下是独占模式,所谓的独占模式当某一个线程抢占锁以后其他的线程就获取不到这个对象的锁,共享模式就是当某一个线程抢占锁以后其他的线程来抢占时可能会成功,不同模式下等待的线程共享同一个FIFO队列。通常子类只支持其中的一种模式就可以,如果你只支持其中一种模式你就不用去实现另外一种模式锁需要的方法。你也可以俩种方法都支持。
* <p>This class defines a nested {@link ConditionObject} class that
* can be used as a {@link Condition} implementation by subclasses
* supporting exclusive mode for which method {@link
* #isHeldExclusively} reports whether synchronization is exclusively
* held with respect to the current thread, method {@link #release}
* invoked with the current {@link #getState} value fully releases
* this object, and {@link #acquire}, given this saved state value,
* eventually restores this object to its previous acquired state. No
* {@code AbstractQueuedSynchronizer} method otherwise creates such a
* condition, so if this constraint cannot be met, do not use it. The
* behavior of {@link ConditionObject} depends of course on the
* semantics of its synchronizer implementation.
AQS里面也定义了一个ConditionObject这个类,意思就是AQS也可以支持条件。
* <p>This class provides inspection, instrumentation, and monitoring
* methods for the internal queue, as well as similar methods for
* condition objects. These can be exported as desired into classes
* using an {@code AbstractQueuedSynchronizer} for their
* synchronization mechanics.
AQS也提供了一些方法给FIFO这个队列。例如检测、检查、和监控队列。
* <p>To use this class as the basis of a synchronizer, redefine the
* following methods, as applicable, by inspecting and/or modifying
* the synchronization state using {@link #getState}, {@link
* #setState} and/or {@link #compareAndSetState}:
*
* <ul>
* <li> {@link #tryAcquire}
* <li> {@link #tryRelease}
* <li> {@link #tryAcquireShared}
* <li> {@link #tryReleaseShared}
* <li> {@link #isHeldExclusively} //判断当前的线程是否已经获取到了同步状态。
* </ul>
*
* Each of these methods by default throws {@link
* UnsupportedOperationException}. Implementations of these methods
* must be internally thread-safe, and should in general be short and
* not block. Defining these methods is the <em>only</em> supported
* means of using this class. All other methods are declared
* {@code final} because they cannot be independently varied.
*
如果你要是使用这个类的话你就需要重写这几个方法。其它的方法不能被独自 的实现。其它的方法用来管理入队出队和阻塞的机制它自己会处理,我们要做的就是重写这几个方法。如果自定义的同步器支持共享和独占模式那么这几个类都需要重写,如果你只支持共享模式,你只需要重写tryAcquire和tryRelease。如果你支持独占模式你需要重写tryAcquireShared和tryAcquireShared方法
* <p>You may also find the inherited methods from {@link
* AbstractOwnableSynchronizer} useful to keep track of the thread
* owning an exclusive synchronizer. You are encouraged to use them
* -- this enables monitoring and diagnostic tools to assist users in
* determining which threads hold locks.
*
AQS推荐你使用AbstractOwnableSynchronizer这个类。这个类里面提供了setExclusiveOwnerThread来设置当前持有锁的线程和getExclusiveOwnerThread获取当前持有锁的线程。
* <p id="barging">Because checks in acquire are invoked before
* enqueuing, a newly acquiring thread may <em>barge</em> ahead of
* others that are blocked and queued. However, you can, if desired,
* define {@code tryAcquire} and/or {@code tryAcquireShared} to
* disable barging by internally invoking one or more of the inspection
* methods, thereby providing a <em>fair</em> FIFO acquisition order.
* In particular, most fair synchronizers can define {@code tryAcquire}
* to return {@code false} if {@link #hasQueuedPredecessors} (a method
* specifically designed to be used by fair synchronizers) returns
* {@code true}. Other variations are possible.
这段的主要意思就是针对于公平锁和非公平锁。在非公平锁的情况有的时候可能有正在FIFO队伍中排队的线程,这个时候如果上一个线程释放锁同时又有一个线程进来了,可能是新进来的的线程获取到锁,而正在排列队伍中的线程获取不到锁。但如果你想实现公平锁也就是绝对公平的话,你可以调用hasQueuedPredecessors()这个方法。这个方法主要作用就是去看一下FIFO队列中有没有正在排队的队列。如果有的话tryAcquire直接返回false。然后去队列中唤醒线程。
# 2.AQS基础原理总结
通过上面的源码部分我们已经对AQS有了大致的印象。总结一下吧。
- 概述:全称是AbstractQueuedSynchrnizer,是阻塞式锁和相关的同步器工具的框架。其实AQS就是一个抽象类。
- 特点:
- 用state属性来表示资源的状态(分独占模式和共享模式),子类需要定义如何维护这个状态,控制获取锁和释放锁
- getState-获取state状态
- setState-设置state状态
- compareAndSetState-cas机制设置state状态
- 独占模式是只有一个线程能够访问资源,而共享模式可以多个线程访问资源。不同模式可以通过实现AQS中不同的方法
- 用state属性来表示资源的状态(分独占模式和共享模式),子类需要定义如何维护这个状态,控制获取锁和释放锁
- 子类主要实现这样一些方法
- tryAcquire()
- tryRelease()
- tryAcquireShared()
- tryReleaseShared()
- isHeldExclusively() //判断当前的线程是否已经获取到了同步状态。
- AOS中提供了俩个方法
- setExclusiveOwnerThread 来设置当前持有锁的线程
- getExclusiveOwnerThread 来获取当前持有锁的线程
- 简单总结一下:其实AQS就是一个框架,它并不是平时我们写代码的时候我们使用的具体的类。但是我们平时使用的类有很多都是由它来实现的。ReentrantReadWriteLock。
# 3.自定义实现一个可重入的独占锁
public class MyLock {
private Sync sync;
class Sync extends AbstractQueuedSynchronizer{
@Override
protected boolean tryAcquire(int arg) {
//获取当前线程
final Thread thread = Thread.currentThread();
/**
* state含义
* 0 表示没有线程持有锁
* 1 表示有线程持有锁
*/
//获取当前state的值
int c=getState();
/**
* 如果c=0 代表当前没有线程持有锁 尝试加锁
* 如果不等于0,判断持有锁的线程是否和当前线程是同一个线程,如果是同一个线程则增加state的值
* 除此以为 则代表持有锁的线程和当前线程不是同一个线程,加锁失败。返回false;
*/
if (c==0){
// 通过cas 尝试 修改state状态
if (compareAndSetState(0,arg)){
//如果修改成功,设置当前对象持有锁的线程
setExclusiveOwnerThread(thread);
return true;
}
}else if (thread==getExclusiveOwnerThread()){
int nextc=c+arg;
setState(nextc);
return true;
}
return false;
}
@Override
protected boolean tryRelease(int arg) {
//获取当前state的值 因为是可重入锁。所以减去arg
int c=getState()-arg;
//获取当前线程不等于持有锁的线程-就是不是同一个锁就抛出异常;
if (Thread.currentThread()!=getExclusiveOwnerThread()){
throw new IllegalMonitorStateException();
}
boolean free =false;
//如果c==0的话则代表释放锁成功。并且设置当前对象持有锁的对象为null;
if (c==0){
free=true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
public void showLockThread(){
if (getExclusiveOwnerThread()!=null){
System.out.println("CurrentThread:"+getExclusiveOwnerThread().getName());
}
}
}
public MyLock() {
sync=new Sync();
}
public void lock(){
sync.acquire(1);
}
public void unlock(){
sync.release(1);
}
public void showLockThread(){
sync.showLockThread();
}
public static void main(String[] args) throws InterruptedException {
MyLock myLock=new MyLock();
Thread t1 = new Thread(() -> {
System.out.println("start t1-------");
try {
myLock.lock();
System.out.println("t1 get Lock------");
Thread.sleep(5000);
System.out.println("t1 return------");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
myLock.unlock();
}
}, "t1");
Thread t2 = new Thread(() -> {
System.out.println("start t2-------");
try {
myLock.lock();
System.out.println("t2 get Lock------");
Thread.sleep(5000);
System.out.println("t2 return------");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
myLock.unlock();
}
}, "t2");
t1.start();
t2.start();
t1.join();
t2.join();
System.out.println("执行结束");
}
}
这里我们自定义一个独占模式的可重入锁,大家可以手动敲一下,来更方便理解AQS是怎么用的。
# 4.AQS加锁解锁流程
加锁:首先当我们创建一个同步器的时候(比如new Sync)它会首先会初始化当前锁的状态(state 默认初始化为0)和初始化当前持有锁的线程(默认为null)
这时候有三个线程同时来争取锁。我们假设t1线程拿到锁了 会将state=1并且当前持有锁的线程设置为t1.这个时候如果t2竞争锁时发现state!=0了。t2会初始化一个FIFO队列(双向列表),并且会建立一个头节点head(pre=null,state=0),并且在初始化一个节点(代表自己)node-t1(并且将自己的pre指向头节点,头节点的next指向自己,state=0),并且tail指向自己。这个时候t3发现锁也被人占了。队列也已经被创建了。这个时候它就会直接加入队列。中间有一个过程就是申请进入同步队列操作。当每一个节点进入队列以后如果发现自己的前一个节点是head节点的话他就会尝试几次获取锁。如果都获取失败了,才是真正加入这个队列。开启挂起状态,在真正开启挂起状态之前每个节点会将它的前一个节点的state设置为-1(-1的意思就是它的下一个节点需要等待被唤醒)
解锁:
- 首先检查当前线程是否和持有锁的线程是同一个线程。
- 设置当前持有锁的线程为null并且修改锁的state字段(state代表锁的状态)。
- 唤醒队列中头节点的下一个节点的线程。
- 1.检查自己的前驱节点是不是头节点如果是头节点的话就开始竞争锁。这块有个问题就是这个节点在竞争锁的时候恰巧同时又新进来了一个线程。可能锁会被这个新线程竞争到(这个是在非公平锁的情况下才会出现。如果是公平锁,新进来的线程是没有资格竞争锁的,只能去队列里面排队。如果队列里面的t2节点成功拿到了锁。这个节点会讲当先持有锁的信息设置成自己(state=1和持有锁的线程=t2),并且自己就变成了head节点。原来的head节点就被删除了
- 上述这种情况是独占模式下的加锁解锁流程。如果是共享模式的话最主要的区别就是在当第一个线程获取锁成功以后只修改state字段,不设置持有锁的线程。因为共享锁的锁是可以被多个线程同时拿到,这个还有看同步器的具体实现类是否允许,例如ReentrantReadWriteLock是允许的 而countdownlatcha是不允许的。共享模式下节点为shared。当其唤醒头节点唤醒一下节点以后也会唤醒下一个节点来竞争锁(就是将队列中的每个节点依次唤醒一下)。这也是和独占模式的区别。独占模式只会唤醒头节点的下一个节点。而共享模式会依次唤醒。
# 5.AQS的Condition详解
简单写一个condition例子看下它的执行结果
public class TestCondition {
public static void main(String[] args) throws InterruptedException {
ReentrantLock reentrantLock=new ReentrantLock();//可重入,AQS独占模式
Condition condition = reentrantLock.newCondition();
Thread t1 = new Thread(() -> {
reentrantLock.lock();
try {
System.out.println("t1 get lock and await....");
condition.await();//让出锁,等待被唤醒
System.out.println("t1 continue......");
Thread.sleep(2000);
System.out.println("t1 return.....");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
reentrantLock.unlock();
}
}, "t1");
Thread t2 = new Thread(() -> {
reentrantLock.lock();
try {
System.out.println("t2 get lock and await....");
Thread.sleep(2000);
condition.signal();//不阻塞
System.out.println("t2 return.....");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
reentrantLock.unlock();
}
}, "t2");
t1.start();
Thread.sleep(1000);
t2.start();
t1.join();
t2.join();
System.out.println(" 结束运行");
}
}
可以看到t1线程先获取锁,t2线程后获取锁所以t2进入阻塞状态,然后t1因为调用了await方法而暂停了并且释放了锁,这时候t2获取到了锁继续执行并且还调用了singal方法。t2结束运行以后,t1继续执行。
# 过程详解
首先t1调用了lock方法,加锁模式和我们上面讲的一样。然后t1调用了condition.await()方法。这个时候就进入了await操作。
- 首先创建条件节点(并且将state值=-2),必要的时候初始化条件队列或者把自己创建的条件节点加入到条件队列当中去(初不初始化条件队列主要就是看当前有没有别人已经初始化好了,如果有就正常加入,如果没有就自己初始化。
- 释放当前线程的锁,也是通过AQS的release方法(同时也因为这个时候t2在同步队列中,也去唤醒t2线程继续执行)。
- 将条件队列中的节点转移到同步队列中去,把刚刚加入到同步队列中的条件节点的前驱的waitState=-1;挂起当前线程
4. 然后t2继续执行,t2执行了singal方法,并且执行结束。进入到singal方法执行步骤。singal方法其实就是执行AQS的release方法,先释放锁,通过head节点找到后一个节点,然后唤醒节点的线程t1。
5. t1被唤醒后其实和上面我们讲的一样了,它会判断它的前一个节点是不是头节点,如果是的话尝试获取锁(不一定成功,因为在非公平锁情况下可能会出现新来的线程直接获取到锁的情况,如果被其他人获取了,则继续在队列中等待。)
# 6.AQS源码详解-以ReentrantLock为例-独占模式
ReentrantLock的lock
ReentrantLock是可重入独占模式的。ReentrantLock是主要有三个子类。Sync,NonfairSync,FairSync。作为ReentrantLock的帮助类,来帮助ReentrantLock实现锁的操作。FairSync和NonfairSync分别是公平锁的和非公平锁的实现,都是Sync的子类。
- 当我们创建一个ReentrantLock的时候其实创建的就是一个非公平的锁
2.当调用lock方法时其实就是调用的nonfairSync的lock方法
/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
- 首先它会通过cas的(compareAndSetState)方法去修改state状态的值,首先去看state等不等于0,如果是0的话,则将state状态修改为1,然后将持有锁的线程设置为当前线程(setExclusiveOwnerThread(Thread.currentThread());)。从这里我们可以看着在ReentrantLock中state字段代表的是是否上锁,0-没有锁,1-有锁。
- 如果修改失败则执行AQS的acquire(1)方法
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
先调用NonfairSync的tryAcquire()方法
final boolean nonfairTryAcquire(int acquires) {
//获取当前线程
final Thread current = Thread.currentThread();
//获取state的值
int c = getState();
//如果当前state当前的值=0则代表当前没有锁,这个时候尝试修改state的值,修改成功后上锁,返回ture
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}//判断当前线程和持有锁的线程是否是同一个线程如果是的话state的值加1--可重入锁
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
//如果返回false 就代表上锁失败
return false;
}
这个上锁失败,tryAcquire返回false。取反等于true,继续执行acquireQueued(addWaiter(Node.EXCLUSIVE), arg),addWaiter创建节点入队列。acquireQueued申请入队列。
/**
* Creates and enqueues node for current thread and given mode.
*
* @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
* @return the new node
*/
private Node addWaiter(Node mode) {
给当前线程创建一个节点,mode代表模式Node.EXCLUSIVE-独占模式
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
//执行入队列操作
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//这个方法也是执行入队列的操作,如果当前有队列可能会直接执行上面的方法入队,如果当前没有队列这个方法就会初始化一个队列并且完成入队操作。成功之后会返回头节点
enq(node);
return node;
}
addWaiter()执行完会返回当前节点
/**
* Acquires in exclusive uninterruptible mode for thread already in
* queue. Used by condition wait methods as well as acquire.
*
* @param node the node
* @param arg the acquire argument
* @return {@code true} if interrupted while waiting
*/
//将当前节点执行入队列操作。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
//取当前节点的前驱节点,如果前驱节点是头节点的话执行tryAcquire().也就是我们前面提到的如果入队列的时候这个节点的前一个节点是头节点的话就会尝试一次获取锁,如果获取锁成功了。就直接拿到锁了。
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
//如果获取锁成功了,就直接设置当前节点为头节点。
setHead(node);
//将原头节点的next指针设置为null,就相当于不要原先的头节点了。删除原头节点,原头节点会直接GC,被回收掉了。
p.next = null; // help GC
failed = false;
return interrupted;
}
//如果获取锁失败了,或者当前节点的前一个节点不是头节点,就继续往下执行。下面说了如果shouldParkAfterFailedAcquire返回false的话,if里面的肯定不会执行,for (;;)还是一个死循环,会继续重新执行,那么当前线程就又有了一次机会去获取锁。如果还是失败又执行到这里进入shouldParkAfterFailedAcquire方法。这个时候肯定返回true了。因为在上一次执行这个方法的时候就将头节点的state的值设置成为-1了。然后执行parkAndCheckInterrupt 挂起当前线程。当一会儿执行AQS的release()操作的时候。就会从这继续执行。
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
//获取到当前节点的前一个节点的state状态
int ws = pred.waitStatus;
//如果前一个节点的state状态==Node.SIGNAL(-1)则代表这个线程需要等待被唤醒。 并且直接返回true。然后去调用parkAndCheckInterrupt这个方法。
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
// 如果头节点不是-1(默认是为0的)就执行eles方法,将头节点的状态设置成Node.SIGNAL(-1)然后返回false;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
/**
* Convenience method to park and then check if interrupted
*
* @return {@code true} if interrupted
*/
//这个方法会直接把当前这个线程挂起。
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
# 6.2 ReentrantLock的unlock()方法
public void unlock() {
sync.release(1);
}
其实也是调用的非公平锁nonfair的release方法。
public final boolean release(int arg) {
//首先还是调用父类sync的tryRelease()
if (tryRelease(arg)) {
//如果tryRelease返回true的话。则进入if里面 拿到头节点,头节点不等于null 代表同步队列是存在的。如果头节点的state状态不等于0 就是1 就代表它需要调用unparkSuccessor去唤醒下一个线程。传入的是头节点,因为我们每次唤醒的都是头节点的下一个节点。
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
//拿到当前锁的state然后-1
int c = getState() - releases;
//判断当前线程持有锁的线程是不是同一个线程,如果不是同一个线程就直接抛出异常了。
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
//假设是同一个线程并且c==0的话,就直接讲当前持有锁的线程设置为null,并且设置state的值=0;然后返回free=true;代表正确释放锁了。
//为什么会有free这个字段,因为Reentrantlock是可重入锁。如果加锁了俩次 但只解锁了一次,则则代表当前线程还是持有这个锁的。就不会执行下面的if。直接返回false。代表释放锁失败了。
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
//获取当前节点也就是头节点的状态,如果头节点的状态小于0,则是-1
int ws = node.waitStatus;
if (ws < 0)
//就将头节点的状态改为0
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
//拿到头节点的下一个节点 如果不等于null的话,就直接执行uppark方法给它唤醒了,唤醒了以后继续从它当时被阻塞的地方继续执行。上文在acquire方法中有提到。
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
非公平锁:从上面的解释我们看到我们只是唤醒线程了,被唤醒的线程不是直接去获得锁还是需要去执行tryAcquire()方法。这就有可能出现一种情况,就是当同步队列里面的线程被唤醒的同时,又有一个新的线程进来了。这个新的线程可能在被唤醒线程执行tryAcquire方法之前先执行tryAcquire方法并且获得到锁。这其实也就是非公平锁的实现原理
# 6.3 看一下ReentrantLock是如何实现公平锁的
/**
* Fair version of tryAcquire. Don't grant access unless
* recursive call or no waiters or is first.
*/
protected final boolean tryAcquire(int acquires) {
//获取当前线程
final Thread current = Thread.currentThread();
// 获取state状态等于0的话 就代表当前没有线程持有锁。执行hasQueuedPredecessors,如果这个方法返回true的话就代表当前就不会执行if里面的代码块。同样也就是说新来的线程是获取不到锁的,需要去排队。
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
//这个方法的左右就是如果当前同步队列中有线程,返回true。没有的话返回false
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
公平锁和非公平锁最主要的区别就是在公平锁获取锁的时候有一个hasQueuedPredecessors的判断。如果这个队列中有队列的话就返回true。也就是它不会让同时间新来的线程获取锁。只会从队列里面取线程来获取锁。
# 7.AQS源码分析-以CountDownLatch为例-共享模式
上面我们介绍了独占模式下ReentrantLock的一些原理。接下来我们来分析下AQS在共享模式下的原理。在CountDownLatch中state字段是一个计数的概念。
# 7.1 await()方法。
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
它的本质还是调用sync的acquireSharedInterruptibly方法
/**
* Acquires in shared mode, aborting if interrupted. Implemented
* by first checking interrupt status, then invoking at least once
* {@link #tryAcquireShared}, returning on success. Otherwise the
* thread is queued, possibly repeatedly blocking and unblocking,
* invoking {@link #tryAcquireShared} until success or the thread
* is interrupted.
* @param arg the acquire argument.
* This value is conveyed to {@link #tryAcquireShared} but is
* otherwise uninterpreted and can represent anything
* you like.
* @throws InterruptedException if the current thread is interrupted
*/
//这个方法是可中断的。然后进入tryAcquireShared。这个是countDownLatch自己实现的方法。返回-1的话就代表state的值不等于0。就代表当前还可以给state的值减一个(简单理解就是还可以继续上锁)--然后进入到doAcquireSharedInterruptibly方法
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
//看一下当前state的值是不是等于0,不等于0返回-1.
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
//它这快其实也是创建节点然后入队的操作。但是它创建的节点和独占模式是不一样的。它创建的节点是Node.SHARED 是可共享的。
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
# 7.2 countDown()执行流程
/**
* Decrements the count of the latch, releasing all waiting threads if
* the count reaches zero.
*
* <p>If the current count is greater than zero then it is decremented.
* If the new count is zero then all waiting threads are re-enabled for
* thread scheduling purposes.
*
* <p>If the current count equals zero then nothing happens.
*/
public void countDown() {
sync.releaseShared(1);
}
他其实也是调用了sync的releaseShared方法
public final boolean releaseShared(int arg) {
//还是先去执行tryReleaseShared方法。
如果返回false的话就代表第一次countDown失败。可能还需要在来一次。就比如原先state的值是2.你countDown一次以后 state就变成1了。还需要在来一次。如果返回true就代表state=0了。就执行doReleaseShared方法
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
//首先还是获取state的值。如果不等于0,就减1.并且通过CAS的方式。更新state的值。再去比较state的值。如果=0 返回true代表释放成功。返回false代表释放失败。还需要在释放。
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
private void doReleaseShared() {
for (;;) {
//获取当前的头节点。如果头节点不等于空并且它的state的值=-1(ws == Node.SIGNAL)。就通过cas的操作将头节点的state的值设置为0.然后去执行unparkSuccessor。unparkSuccessor这个方法其实和我们之前讲的独占模式下的一样。
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
//将头节点的state的值设置为0.
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
//获取头节点的下一个节点。并且去唤醒。调用unpark方法。然后被唤醒的节点会在上次阻塞的地方继续执行。并且再次尝试获取锁--在共享模式下会执行一个叫setHeadAndPropagate的方法。流程和独占模式差不多。
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
private void setHeadAndPropagate(Node node, int propagate) {
//获取当前的头节点。
Node h = head; // Record old head for check below
//setHead就是将头节点设置为空。---也就是上文说的为了垃圾回收。
setHead(node);
/*
* Try to signal next queued node if:
* Propagation was indicated by caller,
* or was recorded (as h.waitStatus either before
* or after setHead) by a previous operation
* (note: this uses sign-check of waitStatus because
* PROPAGATE status may transition to SIGNAL.)
* and
* The next node is waiting in shared mode,
* or we don't know, because it appears null
*
* The conservatism in both of these checks may cause
* unnecessary wake-ups, but only when there are multiple
* racing acquires/releases, so most need signals now or soon
* anyway.
*/
//这个地方就是和独占模式不一样的是。它会获取同步队列中头节点的下一个节点。只要下一节点不为空的话就调用doReleaseShared方法。其实也是会去唤醒下一个节点的线程。直到共享队列为空。(和独占模式不同的是独占模式只会唤醒头节点的下一个节点。而共享模式只要接受到了release信号的话。它的共享队列中的所有节点都会接收到这个信号。告知其可以去竞争锁资源。)
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}