Java并发_AQS源码分析下(9)

条件队列

首先查看 Condition 接口,其实就是 await 和 signal 方法,只不过有响应中断和设置超时的不同版本

1
2
3
4
5
6
7
8
9
public interface Condition {
    void await() throws InterruptedException;
    void awaitUninterruptibly();
    long awaitNanos(long nanosTimeout) throws InterruptedException;
    boolean await(long time, TimeUnit unit) throws InterruptedException;
    boolean awaitUntil(Date deadline) throws InterruptedException;
    void signal();
    void signalAll();
}

Condition 功能主要是由定义在 AQS 内部的 ConditionObject 类实现的,并且条件队列是复用了独占和共享模式同步队列的结点,在 ConditionObject 类中定义了两个引用

1
2
3
4
public class ConditionObject implements Condition, java.io.Serializable {
  private transient Node firstWaiter;
  private transient Node lastWaiter;
}

查看将当前线程包装为 Node 结点并插入到条件队列中的 addConditionWaiter 方法

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
private Node addConditionWaiter() {
  Node t = lastWaiter;
  //如果条件队列末尾的节点状态为CANCELLED,
  if (t != null && t.waitStatus != Node.CONDITION) {
    //则调用unlinkCancelledWaiters方法进行CANCELLED结点的清理
    //具体实现就不用重点关注了,就是遍历单链表删除一些结点
    unlinkCancelledWaiters();
    //更新末尾结点
    t = lastWaiter;
  }
  //包装当前线程为Node结点
  Node node = new Node(Thread.currentThread(), Node.CONDITION);
  //lastWaiter为null表示队空,则进行头结点的初始化
  if (t == null)
    firstWaiter = node;
  else
    //将队尾结点的nextWaiter连接到新入队的node
    t.nextWaiter = node;
  //再更新lastWaiter引用
  lastWaiter = node;
  return node;
}

创建 Node 对象调用的构造方法如下

1
2
3
4
5
static final int CONDITION = -2;
Node(Thread thread, int waitStatus) {
  this.waitStatus = waitStatus;
  this.thread = thread;
}

nextWaiter 对于同步队列是用来区分独占和共享的,而条件队列是让 nextWaiter 指向下一个结点,即将双向链表变为了单链表,大概形如下图

AQS条件队列
AQS条件队列

await 方法

分析一下支持中断但不支持超时的 await 方法,其他类型的 await 方法大致同理,关键是要分析清楚当前结点的状态以及所处的位置 (是在条件队列中还是已经被转移到同步队列中去了),并且要实现响应中断,就必须在每个状态变化之前都进行检查,因为在 signal 调用之前被中断和在 signal 调用之后被中断,处理方式是不同的

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
public final void await() throws InterruptedException {
  //因为当前的await方法是响应中断的,在实际执行await的逻辑之前就要先判断是否被中断
  if (Thread.interrupted())
    //被中断直接抛异常,await的逻辑也不用执行了
    throw new InterruptedException();
  //将当前线程封装成Node结点并插入到条件队列
  Node node = addConditionWaiter();
  //将当前的state作为参数传入release方法释放当前线程占有的锁,并返回当前的state
  //对于可重入锁则会将state置0
  int savedState = fullyRelease(node);
  //用来记录如何处理中断
  int interruptMode = 0;
  //触发转移操作只有2种情况
  //1.其它线程调用了signal或者signalAll
  //2.其它线程中断了当前线程
  //所以只有转移成功或者被中断了才会退出循环,否则将一直在条件队列中阻塞
  while (!isOnSyncQueue(node)) {
    //阻塞线程
    LockSupport.park(this);
    //在条件队列中等待这段时间,需要检查是否被中断,不等于0则表示被中断了
    //因为不是被中断了就立刻响应,而是在await方法执行完要退出时才响应(见方法末尾)
    //所以需要用interruptMode来保存如何处理中断
    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
      //被中断就条出循环
      break;
  }
  //跳出循环,则说明已经转移到同步队列中了
  //调用独占模式的acquireQueued方法让线程开始竞争锁,并尝试恢复await前的state值
  //acquireQueued返回false则表示在同步队列中没有被中断过(见独占模式的分析)
  if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
    //能进入if则表示在同步队列中被中断过了
    //若之前在条件队列中记录是否被中断的interruptMode不为THROW_IE
    //则有可能是在signal方法调用之后被中断的,即值为REINTERRUPT
    //也有可能是没有被中断,而signal方法被调用了,结点被转移到同步队列,即值为0
    //这里再将interruptMode置为REINTERRUPT,即表示只要是在signal方法调用之后被中断
    //都采取重新中断的处理方式
    interruptMode = REINTERRUPT;
  //所以对线程的中断并不会影响其进入同步队列并获取到锁
  //而是将在什么情况下被中断的记录下来,放到最后处理
  //清理被CANCELLED的后继结点
  if (node.nextWaiter != null)
    unlinkCancelledWaiters();
  //如果线程被中断了
  if (interruptMode != 0)
    //则根据THROW_IE或REINTERRUPT进行抛出异常或重新中断
    reportInterruptAfterWait(interruptMode);
}

private void reportInterruptAfterWait(int interruptMode)
  throws InterruptedException {
  if (interruptMode == THROW_IE)
    throw new InterruptedException();
  else if (interruptMode == REINTERRUPT)
    selfInterrupt();
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
//判断结点是否被转移到同步队列中去了
final boolean isOnSyncQueue(Node node) {
  //结点状态为CONDITION就一定是在条件队列
  //prev为null也一定是在条件队列,因为同步队列prev为null的只可能是head(见独占模式的分析)
  if (node.waitStatus == Node.CONDITION || node.prev == null)
    return false;
  //条件队列是通过nextWaiter来维护单链表的,所以next应该为null
  if (node.next != null) 
    return true;
  //有可能node.prev不为null,但还没有被插入到同步队列,因为入队时的CAS操作可能会失败
  //所以需要从后向前遍历一次,确保他已经在同步队列上了
  return findNodeFromTail(node);
}

private boolean findNodeFromTail(Node node) {
  Node t = tail;
  for (;;) {
    if (t == node)
      return true;
    if (t == null)
      return false;
    t = t.prev;
  }
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
//退出等待时重新中断
private static final int REINTERRUPT =  1;
//退出等待时抛出InterruptedException
private static final int THROW_IE    = -1;

//checkInterruptWhileWaiting在while循环中
private int checkInterruptWhileWaiting(Node node) {
  //线程未被中断返回0
  //在signal方法调用之前被中断,返回THROW_IE表示后续要抛出InterruptedException
  //在signal方法调用之后被中断,返回REINTERRUPT表示要重新中断
  //这时结点都还在条件队列中,还没有进入同步队列
  return Thread.interrupted() ?
    (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
  0;
}

final boolean transferAfterCancelledWait(Node node) {
  //CAS设置成功说明还在条件队列中,即是在signal方法调用之前被中断的
  if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
    //调用独占模式入队的方法将结点转移到同步队列
    enq(node);
    return true;
  }
  //CAS失败则表示signal方法被调用了,waitStatus被改变不再是CONDITION
  //这时就需要自旋等待结点进入同步队列
  while (!isOnSyncQueue(node))
    Thread.yield();
  return false;
}

signal 方法

1
2
3
4
5
6
7
8
9
public final void signal() {
  //必须持有锁才能调用
  if (!isHeldExclusively())
    throw new IllegalMonitorStateException();
  Node first = firstWaiter;
  //如果条件队列不为空
  if (first != null)
    doSignal(first);
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
private void doSignal(Node first) {
  do {
    //因为first很快就要被转移到同步队列去,所以先将其后继结点设置为firstWaiter
    //如果first的后继为null则表示队列只有这一个结点,所以直接将lastWaiter置为null
    if ( (firstWaiter = first.nextWaiter) == null)
      lastWaiter = null;
    //先断开与条件队列的连接
    first.nextWaiter = null;
    //再将其转移到同步队列,如果转移不成功且还有结点,则继续转移后面的结点
    //这里转移失败只可能是因为结点被CANCELLED,所以可以直接处理后面的结点
  } while (!transferForSignal(first) &&
           (first = firstWaiter) != null);
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
final boolean transferForSignal(Node node) {
  //将状态从CONDITION变为0,如果失败则说明节点被CANCELLED,就不会再被转移到同步队列
  if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
    return false;
  //状态改变之后就插入同步队列,返回的是其前驱结点
  Node p = enq(node);
  int ws = p.waitStatus;  
  //如果前驱结点被CANCELLED,则不能唤醒node对应的线程了(原因见对独占模式的分析)
  if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
    //需要在这里手动唤醒
    LockSupport.unpark(node.thread);
  return true;
}

至于 signalAll 则与 signal 同理了,只不过是一次性处理所有的结点

1
2
3
4
5
6
7
8
public final void signalAll() {
  if (!isHeldExclusively())
    throw new IllegalMonitorStateException();
  Node first = firstWaiter;
  if (first != null)
    //仅仅是这里的doXxx方法有区别
    doSignalAll(first); 
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
private void doSignalAll(Node first) {
  //因为所有结点都将被转移到同步队列,所以直接将条件队列清空
  lastWaiter = firstWaiter = null;
  do {
    //单链表删除结点的常规方法,first引用从前往后遍历
    Node next = first.nextWaiter;
    first.nextWaiter = null;
    //只不过这里的“删除”是调用的transferForSignal方法
    transferForSignal(first);
    first = next;
  } while (first != null);
}

以上内容是玉山整理的笔记,如有错误还请指出