Java并发_AQS源码分析中(8)

共享模式

共享模式下同步器则可以同时让多个线程占有,从获取同步器的入口方法 acquireShared 方法开始

1
2
3
4
5
6
7
8
9
//参数arg和独占模式同理
public final void acquireShared(int arg) {
  //tryAcquireShared失败返回负数
  //成功但没有剩余资源则返回0,即达到设置的允许同时占有同步器的最大线程数了
  //成功且还有剩余资源则返回正数,即后面的线程还可以继续获取
  if (tryAcquireShared(arg) < 0)
    //小于0失败进入队列等待
    doAcquireShared(arg);
}

doAcquireShared 方法和 acquireQueued 方法类似,并且复用了 addWaiter 方法

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
private void doAcquireShared(int arg) {
  //复用了独占模式创建队列头结点的addWaiter方法
  final Node node = addWaiter(Node.SHARED);
  //标记是否成功获取到资源
  boolean failed = true;
  try {
    //标记等待过程中是否被中断过
    boolean interrupted = false;
    //不断循环
    for (;;) {
      final Node p = node.predecessor();
      //若当前结点的前驱结点是head
      if (p == head) {
        //则尝试获取资源,返回值小于0则表示获取失败
        //返回值等于0则表示还可以被其他线程占用,大于0则表示占有同步器的线程数达到最大
        int r = tryAcquireShared(arg);
        //若获取成功
        if (r >= 0) {
          //内部调用了独占模式介绍的setHead方法来设置头结点,并且添加了唤醒传播的逻辑
          setHeadAndPropagate(node, r);
          //将之前的头结点与队列断开
          p.next = null; 
          if (interrupted)
            //若在等待过程中被中断过,则获取到资源后执行自我中断
            selfInterrupt();
          failed = false;
          return;
        }
      }
      //检查结点状态,原理同独占模式的acquireQueued方法
      if (shouldParkAfterFailedAcquire(p, node) &&
          parkAndCheckInterrupt())
        interrupted = true;
    }
  } finally {
    if (failed)
      cancelAcquire(node);
  }
}

共享模式与独占模式最大的不同在于设置头结点的 setHeadAndPropagate 方法,就是通过该方法实现了当前面的线程获取到同步器,就立刻唤醒其后面的线程

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
//参数propagate为tryAcquireShared方法的返回值可能大于0也可能等于0
private void setHeadAndPropagate(Node node, int propagate) {
  Node h = head;
  //复用独占模式的设置头结点的方法
  //setHead方法执行后指针head将被移动到node
  setHead(node);
  //这个地方被修改了很多次,后面会进行讨论
  if (propagate > 0 || h == null || h.waitStatus < 0 ||
      (h = head) == null || h.waitStatus < 0) {
    Node s = node.next;
    //若下一个结点为空或是共享模式的结点
    if (s == null || s.isShared())
      //则唤醒后继结点
      doReleaseShared();
  }
}

releaseShared 方法是共享模式下线程释放同步器的入口,doReleaseShared 方法会在 releaseShared 方法和 setHeadAndPropagate 方法两个地方调用

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
public final boolean releaseShared(int arg) {
  if (tryReleaseShared(arg)) {
    //当前线程执行完调用releaseShared尝试释放同步器
    //在释放完成之前要唤醒其后的线程,即doReleaseShared方法在这里调用一次
    //如果后继结点成功获取到同步器,则会调用setHeadAndPropagate设置新的头结点
    //并且setHeadAndPropagate方法中也调用了doReleaseShared方法,实现唤醒的传播
    doReleaseShared();
    return true;
  }
  return false;
}

对于独占模式,能调用 release 方法释放同步器,进而调用 unparkSuccessor 方法唤醒后继结点的是当前占有同步器的线程,该线程曾经在队列的第二个结点,被唤醒后线程出队,自身结点被作为新的头结点,所以可以理解为是头结点唤醒的后继结点

对于共享模式,同样也是由头结点唤醒后面的结点,但可以同时有多个线程占有同步器,即这些线程对应的结点曾经都被当作过头结点,或者现在正被当作头结点,若第一个出队的线程调用 releaseShared 方法释放同步器,可能在其执行的那段时间头结点已经被改变了,所以查看共享模式的唤醒方法重点就是理解头结点的变化

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
private void doReleaseShared() {
  for (;;) {
    //每次都获取最新的头结点
    Node h = head;
    if (h != null && h != tail) {
      int ws = h.waitStatus;
      //因为复用了独占模式的同步队列,所以结点状态为SIGNAL
      if (ws == Node.SIGNAL) {
        //可能会有多个线程同时调用doReleaseShared方法,所以一定要有这个CAS操作
        if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
          continue;       
        //唤醒头结点的后继结点并将头节点的状态置为0
        //也是复用的之前独占模式介绍的unparkSuccessor方法
        unparkSuccessor(h);
      }
      //如果头结点的状态为0,则需要将其设置为PROPAGATE,来保证唤醒的传播
      else if (ws == 0 &&
               !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
        continue;         
    }
    //若当前头结点被改变了就再执行一次for循环
    //即当前结点完成唤醒后继结点的任务将要退出时,发现被唤醒的后继结点已经被当作头结点了
    //就再执行一次唤醒头结点的后继结点的操作,循环开头重新获取最新的头结点
    if (h == head)                
      break;
  }
}

PROPAGATE

假设队列为 head -> A -> B -> C,结点 A 在获取到同步器之后将被置为 head,即队列变为 head(A) -> B -> C,在更新 A 为 head 的同时会调用 doReleaseShared 方法将 B 也唤醒,将这个操作记为 A-doReleaseShared

B 被唤醒后如果也获取到同步器,则将 B 更新为头结点,此时队列为 A -> head(B) -> C,若这时 A-doReleaseShared 还没有结束,则会发现头结点变了,所以要重新执行 for 循环并进入 else if,因为 A 结点的状态在上一次 for 循环时被置为了 0,这次循环会将 A 结点的状态置为 PROPAGATE ,即表示 head 结点已经向后传递了,所以需要 continue 重新获取新的头结点,再执行唤醒头结点后继结点的操作

在 A-doReleaseShared 执行第二次循环时,B 也会调用 doReleaseShared 方法唤醒 C,C 被唤醒后如果也获取到同步器,则头结点将被更新为 head(C) 并再次调用 doReleaseShared ……

即让之前的结点一起来帮忙唤醒当前最新的头结点的后继结点,提高唤醒效率

但如果头结点还没有被更新为 B 而 A-doReleaseShared 已经执行完退出了,则只能等 B 变为头结点后调用 doReleaseShared 方法来唤醒结点 C


直到我偶然看到 活在夢裡的博客 ,才知道其实在早期的 AQS 实现中是没有 PROPAGATE 这个状态的,在 Doug Lea 的 JSR 166 repository 中可以看到在 Revision1.74 版本才引入了 PROPAGATE ,目的是为了修复 bug 6801020 ,可以在 DougLea 的主页 查看到各个版本 AQS 的代码

下图是 1.73 到 1.74 setHeadAndPropagate 方法的 diff

1.73Diff1.74
1.73Diff1.74

bug 6801020 是在调用 release 时,waitStatus 可能为 0,因为在调用 setHeadAndPropagate 之前,上一个线程可能正在 doAcquireShared 中尝试获取同步器,给出的 bug 复现代码如下

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class TestSemaphore {
  //0个信号量
  private static Semaphore sem = new Semaphore(0);
  private static class Thread1 extends Thread {
    @Override
    public void run() {
      sem.acquireUninterruptibly();
    }
  }
  private static class Thread2 extends Thread {
    @Override
    public void run() {
      sem.release();
    }
  }
  public static void main(String[] args) throws InterruptedException {
    for (int i = 0; i < 10000000; i++) {
      //获取信号量,由于信号量为0个,所以阻塞
      Thread t1 = new Thread1(); Thread t2 = new Thread1();
      //释放信号量
      Thread t3 = new Thread2(); Thread t4 = new Thread2();
      t1.start(); t2.start(); t3.start(); t4.start();
      //等所有的线程都执行完
      t1.join(); t2.join(); t3.join(); t4.join();
      System.out.println(i);
    }
  }
}

t3 调用 release 方法,即调用 releaseShared 方法,进而调用 unparkSuccessor 方法,t1 被唤醒,并且此时 head 的 waitStatus 被置为 0 了

t1 被唤醒后调用 tryAcquireShared 返回 -1 (因为信号量设置为 0)

然后 t4 调用 releaseShared 方法,但此时 head 的 waitStatus 为 0,所以不会调用unparkSuccessor 方法

t1 获取信号量失败,调用 doAcquireShared 方法进入队列等待,进而调用 setHeadAndPropagate 方法,因为不满足 propagate > 0,所以不会调用 unparkSuccessor 方法唤醒后继节点

导致最后 t2 都永远不会被唤醒, t1 在 doAcquireShared 方法中死循环


1.74 引入 PROPAGATE 之后,并且 setHeadAndPropagate 也不是直接调用 unparkSuccessor 方法,而是调用新增加了 doReleaseShared 方法

现在再来分析上面的代码,当 t4 调用 releaseShared 方法看到 head 的 waitStatus 为 0 时,将置为 PROPAGATE 的 -3,之后 t1 再调用 setHeadAndPropagate 就满足条件 h.waitStatus < 0 进入 if 调用 doReleaseShared 方法将 t2 唤醒了

但是目前 Java 8 AQS 的 setHeadAndPropagate 方法与 1.74 的中的实现好像又有一些不同 ……


以上内容是玉山整理的笔记,如有错误还请指出