# 1.Semaphore介绍
- Semaphore翻译过来就是信号量的意思,用来限制能同时访问共享资源的线程上限。
- 简单举个例子:一个停车场里面只有三个停车位。这个时候同时来了俩辆车占了这个停车位。过了一会儿又来了一辆车。它发现还有一个停车位所以新来的这个车可以正常停进去。又过了一会儿又来了一辆车。但这个时候停车场已经满了。所以它只能在外面等待。等待有空车位了再进去。如果期间有一辆车走了。那么最后来的这个车就可以进去了
- 使用Semaphore限流,在访问高峰期时,让请求线程阻塞,高峰期过去再释放许可,当然它只适合限制单机线程数,并且仅是限制线程数。而不是限制线程资源树。
# 2.Semaphore简单示例
@Slf4j(topic = "c")
public class TestSemaphore {
public static void main(String[] args) {
Semaphore semaphore=new Semaphore(3);
for (int i=0;i<10;i++){
int j=i;
new Thread(()->{
try {
//获取许可
semaphore.acquire();
//模拟程序执行
log.debug("runngin.........");
Thread.sleep(2000);
log.debug("end...............");
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
semaphore.release();
}
},"Thread-"+j).start();
}
}
}
通过程序运行结果我们可以看到最多同时只能有三个线程同时运行。
# 3.Semaphore加锁解锁流程
假设信号量为3。有五个线程同时访问。
假设其中 Thread-1,Thread-2,Thread-4 cas 竞争成功,而 Thread-0 和 Thread-3 竞争失败,进入 AQS 队列park 阻塞
这时 Thread-4 释放了 permits,状态如下
接下来 Thread-0 竞争成功,permits 再次设置为 0,设置自己为 head 节点,断开原来的 head 节点,unpark 接下来的 Thread-3 节点,但由于 permits 是 0,因此 Thread-3 在尝试不成功后再次进入 park 状态
# 3.1 源码角度加锁流程
初始化通过源码可以知道调用的是AQS非公平锁。state的值也就是我们传入的信号量
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
acquire加锁
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
先调用tryAcquireShared方法
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
//源码其实也就是调用的非公平锁的nonfairTryAcquireShared
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
//先获取state的值。如果state的值小于0代表当前没有可用的信号量了。tryAcquireShared直接返回-1.进入doAcquireSharedInterruptibly(arg)-其实就是需要排队阻塞了;
//如果state的值不小于0.代表还有可用的信号量。通过cas的手段修改state的值。直接返回剩余的state的值。
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
//建立节点准备进入同步队列(建立的是一个共享节点)--如果这块有不懂得建议大家看一下我的AQS的讲解。
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
//如果当前节点前一个节点。如果前一个节点是头节点。尝试获取锁。如果如果锁成功了。将当前头节点设置成自己。原头节点置为空。被回收掉
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
//如果上面没有获取锁成功。进入shouldParkAfterFailedAcquire。这个方法其实就是将当前节点的前一个节点设置为-1。就是待唤醒状态。如果已经是-1了就返回true。执行parkAndCheckInterrupt方法。挂起。下一次执行时就从这执行。
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
release解锁-还是调用的nonfair的releaseShared方法
public final boolean releaseShared(int arg) {
//先尝试计数器+1,简单理解就是释放一个锁。如果成功进入doReleaseShared
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
protected final boolean tryReleaseShared(int releases) {
for (;;) {
//获取state的值。next就是我们获取的值加上释放的(一般是+1).如果next的值小于current的值。那肯定就错误了呀。如果正常的话就cas修改state的值
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
//释放锁流程其实很简单。就去找同步队列里面的第一个节点。看一下它的值。如果waitstate的值=Node.SIGNAL就去修改头节点的waitstate=0.为了释放用。然后去调用unparkSuccessor方法。这个方法主要含义就是通过unpack方法去唤醒下一个节点。然后从刚刚阻塞的地方去运行。
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}