# 1.Semaphore介绍

  • Semaphore翻译过来就是信号量的意思,用来限制能同时访问共享资源的线程上限。
  • 简单举个例子:一个停车场里面只有三个停车位。这个时候同时来了俩辆车占了这个停车位。过了一会儿又来了一辆车。它发现还有一个停车位所以新来的这个车可以正常停进去。又过了一会儿又来了一辆车。但这个时候停车场已经满了。所以它只能在外面等待。等待有空车位了再进去。如果期间有一辆车走了。那么最后来的这个车就可以进去了
  • 使用Semaphore限流,在访问高峰期时,让请求线程阻塞,高峰期过去再释放许可,当然它只适合限制单机线程数,并且仅是限制线程数。而不是限制线程资源树。

# 2.Semaphore简单示例

@Slf4j(topic = "c")
public class TestSemaphore {
    public static void main(String[] args) {
        Semaphore semaphore=new Semaphore(3);
        for (int i=0;i<10;i++){
            int j=i;
            new Thread(()->{
                try {
                    //获取许可
                    semaphore.acquire();
                    //模拟程序执行
                    log.debug("runngin.........");
                    Thread.sleep(2000);
                    log.debug("end...............");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }finally {
                    semaphore.release();
                }
            },"Thread-"+j).start();
        }
    }
}

image.png 通过程序运行结果我们可以看到最多同时只能有三个线程同时运行。

# 3.Semaphore加锁解锁流程

假设信号量为3。有五个线程同时访问。

image.png 假设其中 Thread-1,Thread-2,Thread-4 cas 竞争成功,而 Thread-0 和 Thread-3 竞争失败,进入 AQS 队列park 阻塞 image.png 这时 Thread-4 释放了 permits,状态如下

image.png 接下来 Thread-0 竞争成功,permits 再次设置为 0,设置自己为 head 节点,断开原来的 head 节点,unpark 接下来的 Thread-3 节点,但由于 permits 是 0,因此 Thread-3 在尝试不成功后再次进入 park 状态 image.png

# 3.1 源码角度加锁流程

初始化通过源码可以知道调用的是AQS非公平锁。state的值也就是我们传入的信号量

public Semaphore(int permits) {
    sync = new NonfairSync(permits);
}

acquire加锁

public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
        先调用tryAcquireShared方法
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}
//源码其实也就是调用的非公平锁的nonfairTryAcquireShared
protected int tryAcquireShared(int acquires) {
    return nonfairTryAcquireShared(acquires);
}
final int nonfairTryAcquireShared(int acquires) {
    for (;;) {
    //先获取state的值。如果state的值小于0代表当前没有可用的信号量了。tryAcquireShared直接返回-1.进入doAcquireSharedInterruptibly(arg)-其实就是需要排队阻塞了;
    //如果state的值不小于0.代表还有可用的信号量。通过cas的手段修改state的值。直接返回剩余的state的值。
        int available = getState();
        int remaining = available - acquires;
        if (remaining < 0 ||
            compareAndSetState(available, remaining))
            return remaining;
    }
}
private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    //建立节点准备进入同步队列(建立的是一个共享节点)--如果这块有不懂得建议大家看一下我的AQS的讲解。
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
        //如果当前节点前一个节点。如果前一个节点是头节点。尝试获取锁。如果如果锁成功了。将当前头节点设置成自己。原头节点置为空。被回收掉
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            //如果上面没有获取锁成功。进入shouldParkAfterFailedAcquire。这个方法其实就是将当前节点的前一个节点设置为-1。就是待唤醒状态。如果已经是-1了就返回true。执行parkAndCheckInterrupt方法。挂起。下一次执行时就从这执行。
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

release解锁-还是调用的nonfair的releaseShared方法

public final boolean releaseShared(int arg) {
//先尝试计数器+1,简单理解就是释放一个锁。如果成功进入doReleaseShared
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}
protected final boolean tryReleaseShared(int releases) {
    for (;;) {
    //获取state的值。next就是我们获取的值加上释放的(一般是+1).如果next的值小于current的值。那肯定就错误了呀。如果正常的话就cas修改state的值
        int current = getState();
        int next = current + releases;
        if (next < current) // overflow
            throw new Error("Maximum permit count exceeded");
        if (compareAndSetState(current, next))
            return true;
    }
}
private void doReleaseShared() {
    /*
     * Ensure that a release propagates, even if there are other
     * in-progress acquires/releases.  This proceeds in the usual
     * way of trying to unparkSuccessor of head if it needs
     * signal. But if it does not, status is set to PROPAGATE to
     * ensure that upon release, propagation continues.
     * Additionally, we must loop in case a new node is added
     * while we are doing this. Also, unlike other uses of
     * unparkSuccessor, we need to know if CAS to reset status
     * fails, if so rechecking.
     */
     //释放锁流程其实很简单。就去找同步队列里面的第一个节点。看一下它的值。如果waitstate的值=Node.SIGNAL就去修改头节点的waitstate=0.为了释放用。然后去调用unparkSuccessor方法。这个方法主要含义就是通过unpack方法去唤醒下一个节点。然后从刚刚阻塞的地方去运行。
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed
            break;
    }
}
private void unparkSuccessor(Node node) {
    /*
     * If status is negative (i.e., possibly needing signal) try
     * to clear in anticipation of signalling.  It is OK if this
     * fails or if status is changed by waiting thread.
     */
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    /*
     * Thread to unpark is held in successor, which is normally
     * just the next node.  But if cancelled or apparently null,
     * traverse backwards from tail to find the actual
     * non-cancelled successor.
     */
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}