Semaphore源码解读

Semaphore源码解读

目录

Semaphore源码解读

前言

源码解读


前言 

Semaphore字面意思是信号量的意思,它的作用是控制访问特定资源的线程数目,底层依赖AQS的状态State,是在生产当中比较常用的一个工具类。构造方法:

public Semaphore(int permits)
       public Semaphore(int permits, boolean fair)

permits 表示许可线程的数量fair 表示公平性,如果这个设为 true 的话,下次执行的线程会是等待最久的线程 。

主要方法:

 public void acquire() throws InterruptedException
 public void release()
 tryAcquire(long timeout, TimeUnit unit)

acquire() 表示阻塞并获取许可,release() 表示释放许可,tryAcquire尝试获取,不会阻塞,超时后返回。具体的使用方法非常简单,就不细讲了,下面直接进入源码。


源码解读

和其他juc中的组件一样,Semaphore内部同样维护了一个Sync继承AbstractQueuedSynchronizer作为同步器的实现,下面看获得许可acquire():

    public void acquire() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

会调用AbstractQueuedSynchronizer.acquireSharedInterruptibly(1):

    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();  //如果线程由中断唤醒,直接抛出中断异常
        if (tryAcquireShared(arg) < 0)  //尝试获取许可,如果返回值不小于0.则说明获取成功
            doAcquireSharedInterruptibly(arg);
    }

进入tryAcquireShared(arg),这个方法有不同的实现,这里看非公平版本,tryAcquireShared(int acquires)--->Semaphore.Sync.nonfairTryAcquireShared(int acquires):

        final int nonfairTryAcquireShared(int acquires) {
            for (;;) {
                int available = getState();  //获取state资源状态
                int remaining = available - acquires; //计算剩余资源

                /**
                *如果剩余资源小于0,说明无剩余资源,直接返回。否则cas修改资源状态,只要还有剩余资源,则会无限自旋,不断获取
                */
                if (remaining < 0 || compareAndSetState(available, remaining))
                    return remaining;
            }
        }

如果上面的方法返回值小于0,则说明无剩余资源,进入AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(arg),这个方法的代码和Reentranlock中acquireQueued()大同小异(传送门:Reentantlock源码解读_w7sss的博客-CSDN博客),下面就挑不一样的讲讲:

    private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.SHARED); //入队,同Reentranlock
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);  //如果前驱节点是头节点再尝试一次获取资源
                    if (r >= 0) {
                        setHeadAndPropagate(node, r); //成功获得资源,修改节点状态为propagate
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())  //这边线程会正式阻塞,被唤醒后如果判断是由中断唤醒的会抛出中断异常,这里和Reentranlock不一样,Reentranlock的lock方法不会响应中断
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

这里主要和Reentranlock不同的是Semaphore是共享锁,区别在这个方法:AbstractQueuedSynchronizer.setHeadAndPropagate(node, r):

    private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; // Record old head for check below
        setHead(node);
        /*
         * Try to signal next queued node if:
         *   Propagation was indicated by caller,
         *     or was recorded (as h.waitStatus either before
         *     or after setHead) by a previous operation
         *     (note: this uses sign-check of waitStatus because
         *      PROPAGATE status may transition to SIGNAL.)
         * and
         *   The next node is waiting in shared mode,
         *     or we don't know, because it appears null
         *
         * The conservatism in both of these checks may cause
         * unnecessary wake-ups, but only when there are multiple
         * racing acquires/releases, so most need signals now or soon
         * anyway.
         */
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            if (s == null || s.isShared())
                doReleaseShared();
        }
    }

这个方法的主要逻辑在if中,这里

if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0)

 同时出现了一串判断,我把Doug Lea的原文注释贴上了,也说明这个if确实没那么好理解,我们暂且先看if中的逻辑,这要有助于理解这个if判断:

if (s == null || s.isShared())
                doReleaseShared();  //这里如果node节点的下一个节点是共享节点,就继续Release,即在接下来的代码有可能继续能唤醒后继节点,这在propagate > 0的时候是理所应当的

进入 doReleaseShared():

    private void doReleaseShared() {
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    unparkSuccessor(h);  //如果头节点waitStatus是SIGNAL,则唤醒后继节点,这种情况是最好理解的,相信大家自己按正常流程跟代码(不用考虑并发),就能走到这里
                }
                /**
                * 如果头节点waitStatus是0,则cas修改为PROPAGATE,这里就要考虑并发了,在一个线程
                *正在获取资源的时候,又有一个线程释放了资源,也走到这个方法,执行unparkSuccessor(),
                *head的waitStatus被改为0,说明啥?说明即将又有资源被释放了,所以在这里把waitStatus改为PROPAGATE,
                *告诉执行到setHeadAndPropagate的线程,你可以进入doReleaseShared()了,因为我马上要把坑空出来了,这时如果有新线程入队,就可以有机会马上被唤醒。某种意义上这有自旋的含义
                */
                else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue; 
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }

讲到这里,那setHeadAndPropagate中的那个if判断就好理解了:

先看第一个h.waitStatus < 0,因为在doReleaseShared()中有线程释放资源把head的waitStatus改为PROPAGATE<0,表明有剩余资源了,所以这里可以判断进入doReleaseShared()操作直接唤醒可能马上就会入队的线程。

再看第二个h.waitStatus < 0,这时第一个h.waitStatus == 0,说明执行到第一个的时候还没有线程释放资源,但是到了第二个的时候h又被赋值成当前的head了,此时一般h.waitStatus ==0,但是如果此时又发生了在doReleaseShared()中有线程释放资源把head的waitStatus改为PROPAGATE<0的情况,那么也因该判断进入doReleaseShared()操作唤醒可能马上就会入队的线程。

当然了,这里是有可能导致不必要的唤醒的,因为h.waitStatus同样可能是-1,这里这么写的目的估计也是为了让资源尽可能地被充分利用。这段代码要结合多个线程并发的情况看。个人感觉关键还是要看

在doReleaseShared()中有线程释放资源把head的waitStatus改为PROPAGATE<0的情况

是发生在 setHead(node)之前还是之后,这个对应了两处h.waitStatus < 0的判断。

笔者讲的可能也不是太清楚,如有错误请指正。

好了acquire操作讲完了,下面是release操作:

    public void release() {
        sync.releaseShared(1);
    }


    进入

    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

我想这个就不用讲了,都是之前的代码。

再次申明:如有错误,欢迎指正!

上一篇:c-异步共享内存读/写


下一篇:java-ConcurrentHashMap中基于番石榴的信号量与信号量