Java并发_AQS源码分析上(7)

队列结点

首先查看内部类 Node 的定义

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
static final class Node {
  //共享模式结点等待的标记,即共享模式的标记就是一个空的Node对象
  static final Node SHARED = new Node();
  //独占模式结点等待的标记
  static final Node EXCLUSIVE = null;

  volatile int waitStatus;
  //waitStatus值,表示线程已取消
  static final int CANCELLED =  1;
  //waitStatus值,表示后继结点的线程需要unparking
  static final int SIGNAL    = -1;
  //waitStatus值,表示线程正在等待条件
  static final int CONDITION = -2;
  //waitStatus值,表示下一个acquireShared应该无条件地继续传播,后面再详细介绍
  static final int PROPAGATE = -3;

  //前驱和后继
  volatile Node prev;
  volatile Node next;

  //结点对应的线程,在构造时进行初始化,使用后置null
  volatile Thread thread;

  //两个作用,一是用来标识共享队列,二是条件队列的后继
  Node nextWaiter;

  final boolean isShared() {
    return nextWaiter == SHARED;
  }

  final Node predecessor() throws NullPointerException {
    Node p = prev;
    if (p == null)
      throw new NullPointerException();
    else
      return p;
  }

  //用来创建初始的头部结点,或者共享模式的标记
  Node() {
  }

  //addWaiter方法会调用
  Node(Thread thread, Node mode) {
    this.nextWaiter = mode;
    this.thread = thread;
  }

  //Condition会用到
  Node(Thread thread, int waitStatus) { 
    this.waitStatus = waitStatus;
    this.thread = thread;
  }
}

特别关注一下 waitStatus 的 5 种状态,在源码中将会有很多状态的变化

类型 对应值 说明
0 线程进队后的默认状态
SIGNAL -1 若当前结点的前驱状态为SIGNAL,则当前结点可以排在其后面
PROPAGATE -3 共享模式,用于将唤醒后继的操作往后传递
CANCELLED 1 当前结点由于超时或中断被取消,进入该状态后结点将不再变化
CONDITION -2 条件模式,当前结点在条件队列,若达成条件则转移到同步队列

还定义了首尾两个引用 head 和 tail 来维护队列,通过 volatile 保证可见性

1
2
3
4
//只会被setHead修改,只要头结点存在,则其waitStatus就不会被设置为CANCELLED
private transient volatile Node head;
//只有在新插入结点时才会改变
private transient volatile Node tail;

所以独占模式和共享模式的同步队列大概形如下图

AQS同步队列
AQS同步队列

head 在逻辑上表示的是当前获取到同步器的线程,但对应的结点是一个虚结点,不存储线程的信息

1
2
3
4
5
private void setHead(Node node) {
  head = node;
  node.thread = null;
  node.prev = null;
}

主要方法

操作同步状态 state 的方法,定义为 final 是为了防止被子类重写,定义为 protected 是为了让子类可以直接调用,而外部其他类则不可以

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
private volatile int state;
protected final int getState() {
  return state;
}
protected final void setState(int newState) {
  state = newState;
}
protected final boolean compareAndSetState(int expect, int update) {
  return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

子类实现具体的同步器可以重写下面的方法,没有定义为 abstract 是因为,如果子类只选择了一种模式,则只用重写对应模式的方法即可,而不用全部都重写,按照语义的基本定义为,如果成功则返回 true,失败则返回 false

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
//独占模式
protected boolean tryAcquire(int arg) {
  throw new UnsupportedOperationException();
}
protected boolean tryRelease(int arg) {
  throw new UnsupportedOperationException();
}
//判断当前线程是否占有着独占同步器
protected boolean isHeldExclusively() {
  throw new UnsupportedOperationException();
}
//共享模式
//成功但没有剩余资源则返回0,有剩余资源则返回正数,失败则返回负数
protected int tryAcquireShared(int arg) {
  throw new UnsupportedOperationException();
}
protected boolean tryReleaseShared(int arg) {
  throw new UnsupportedOperationException();
}

AQS 实现了很多方法,可以从语义上分为获取和释放,从模式上分为独占和共享,以及是否支持中断、是否支持超时,例如:

独占模式的获取和释放对应的是 acquire 和 release,对应支持中断的 acquireInterruptibly

共享模式的获取和释放对应的是 acquireShared 和 releaseShared,对应支持中断的 acquireSharedInterruptibly

下面开始依次分析独占模式、共享模式,以及条件锁,只查看不支持中断的获取和释放的方法

独占模式

从获取同步器的入口方法 acquire 方法开始,查看如何安排获取同步器失败的线程进入队列等待

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
//具体的同步器实现可以通过arg参数传递想要的同步状态
public final void acquire(int arg) {
  //首先调用tryAcquire方法尝试获取同步器
  if (!tryAcquire(arg) &&
      //若获取失败,则调用addWaiter方法将当前线程插入到等待队列的最后
      //acquireQueued方法则是让队列中的线程等待,并让排队到前面的线程尝试获取同步器
      acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
    //线程在等待过程中不对中断做出响应,而是在获取到同步器之后再进行自我中断
    //selfInterrupt方法调用的是Thread的interrupt方法
    selfInterrupt();
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
//返回被插入到队列最后的线程结点
private Node addWaiter(Node mode) {
  //根据指定的独占模式Node.EXCLUSIVE创建线程结点
  Node node = new Node(Thread.currentThread(), mode);
  Node pred = tail;
  //若队列不为空
  if (pred != null) {
    //直接将新建结点的前驱引用指向队列的最后一个结点
    node.prev = pred;
    //然后再通过CAS操作将tail引用指向新建的结点node,只尝试一次
    if (compareAndSetTail(pred, node)) {
      //队列中最后一个结点的后继指向新建结点
      pred.next = node;
      return node;
    }
  }
  //enq方法是在队列为空,或者插入新建结点失败时调用
  enq(node);
  return node;
}

addWaiter
addWaiter

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
//enq负责初始化head和插入在addWaiter方法中插入失败的结点
private Node enq(final Node node) {
  //CAS自旋
  for (;;) {
    Node t = tail;
    //若队列为空
    if (t == null) {
      //CAS操作初始化head将其指向一个新建结点
      if (compareAndSetHead(new Node()))
        tail = head;
    //若队列已经初始化了
    } else {
      //若线程A和B同时进入当前初始化方法,可能线程A已经完成了head的初始化
      //此时对于线程B就变成在队列最后插入结点的情况,即addWaiter方法的情况
      //所以插入方式同addWaiter方法,只不过这里有CAS自旋会一直尝试直到插入成功
      node.prev = t;
      if (compareAndSetTail(t, node)) {
        t.next = node;
        return t;
      }
    }
  }
}

acquireQueued 方法负责在获取同步器失败的线程被包装为结点插入到等待队列末尾之后,让队列中的线程进入等待状态,直到正在执行的线程释放了其占有的同步器,再让队首的结点尝试获取

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
//入参node是addWaiter返回值,即插入成功的结点
final boolean acquireQueued(final Node node, int arg) {
  //标记是否获取到同步器
  boolean failed = true;
  try {
    //标记等待过程中是否被中断过
    boolean interrupted = false;
    //CAS自旋
    for (;;) {
      final Node p = node.predecessor();
      //若前驱是head,则当前结点排到队列的第二个,即队列中第一个保存线程数据的结点
      //head持有的同步器随时都可能被释放,让当前结点开始不断地尝试获取同步器(CAS自旋)
      if (p == head && tryAcquire(arg)) {
        //获取到同步器后将head指向当前结点
        //当前结点的thread被置null,表示当前线程获取到同步器,出队结束等待
        setHead(node);
        //即head.next = null
        p.next = null;
        //所有引用都置null了,所以之前旧的head结点将被GC回收
        //此时head指向了当前结点,则当前结点变为新的不保存线程数据的头结点
        failed = false;
        //获取资源成功,返回的中断标记没有被修改还是false,即表示当前结点是被正常唤醒的
        return interrupted;
      }
      //若当前结点不是排到第二个,则继续等待
      //shouldParkAfterFailedAcquire方法是检查队列中结点的状态
      //因为有可能当前结点虽然不是第二个,但可能其前面的结点都放弃等待了
      if (shouldParkAfterFailedAcquire(p, node) &&
          parkAndCheckInterrupt())
        //若在等待过程中被中断,则将interrupted标记为true
        interrupted = true;
    }
  } finally {
    if (failed)
      cancelAcquire(node);
  }
}

shouldParkAfterFailedAcquire 方法检查队列中结点的状态,保证当前结点前面的结点状态是 SIGNAL,如果不是则会将当前结点往前移,让在等待中途失效的结点被 GC 回收

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
  //获取当前结点的前驱结点的状态
  int ws = pred.waitStatus; 
  //若前驱结点的状态是SIGNAL,则在其释放同步器之后会唤醒其后继结点
  if (ws == Node.SIGNAL)
    //所以当前结点可以排在其后面
    return true;
  //若前驱结点失效了
  if (ws > 0) {
    //则向前遍历,直到遇到最近的有效的结点,排在其的后面
    //取消的结点会被GC回收
    do {
      node.prev = pred = pred.prev;
    } while (pred.waitStatus > 0);
    pred.next = node;
  //若前驱结点有效
  } else {
    //则将前驱结点的状态置为SIGNAL
    compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
  }
  return false;
}

由于 shouldParkAfterFailedAcquire 方法是在 acquireQueued 方法的 CAS 自旋中,所以若一个线程在被包装为结点并插入到队列末尾之后,会不断地在队列中寻找最近的状态为 SIGNAL 的结点,找到后就排在其后面,若其前驱结点有效但状态不为 SIGNAL ,则将其状态置为 SIGNAL 并排在其后面,然后通过 parkAndCheckInterrupt 方法让当前线程进入等待状态直到被唤醒

1
2
3
4
5
6
private final boolean parkAndCheckInterrupt() {
  //让线程进入WAITING状态
  LockSupport.park(this);
  //判断是否被中断并清除当前的中断状态
  return Thread.interrupted();
}

所以在独占模式下,同步队列中有效的结点 (除了刚入队的) 的状态都将被置为 SIGNAL (包括头结点),线程依次出队并被唤醒,线程出队对应的结点将被作为新的头结点,线程数据将被置 null


再从释放同步器的入口方法 release 方法开始,查看是如何安排队列中的线程出队并唤醒后继结点的

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
//与acquire方法同理,具体的同步器实现可以通过arg参数传递想要的同步状态
public final boolean release(int arg) {
  //调用tryRelease方法尝试释放同步器
  if (tryRelease(arg)) {
    Node h = head;
    if (h != null && h.waitStatus != 0)
      //唤醒头结点的后继结点
      unparkSuccessor(h);
    //释放同步器成功并返回true,让tryAcquire方法可以成功获取到同步器
    return true;
  }
  return false;
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
//这个方法在共享模式下被复用了,现在只讨论独占模式,此时传入的node是头结点
private void unparkSuccessor(Node node) {
  //获取当前头结点的状态
  int ws = node.waitStatus;
  //若当前头结点有效则将其状态置0,原本应该为SIGNAL 
  if (ws < 0)
    //将头节点的状态置为0 
    compareAndSetWaitStatus(node, ws, 0);
  //头结点的后继,即队列中排在第二的结点
  Node s = node.next;
  //若第二个结点为null或已失效
  if (s == null || s.waitStatus > 0) {
    s = null;
    //则从后向前遍历队列,直到找到离第二个结点最近的状态为SIGNAL的结点
    //从后向前遍历是考虑到之前addWaiter方法是尾插的结点
    for (Node t = tail; t != null && t != node; t = t.prev)
      if (t.waitStatus <= 0)
        s = t;
  }
  if (s != null)
    //唤醒结点对应的线程
    LockSupport.unpark(s.thread);
}

从后向前遍历是考虑到之前 addWaiter 方法是尾插的结点,插入结点时是先执行的 node.prev = pred,即让新入队的结点先连接上当前的尾结点,尾结点的 next 还没有指向新来的结点,此时指向的还是 null,所以结点的 next 指向 null 这个条件并不能说明是队列的最后一个结点


独占队列线程的进队出队流程大致如下图

AQS独占进队出队
AQS独占进队出队


以上内容是玉山整理的笔记,如有错误还请指出