Java并发_学习DougLea的AQS论文(6)

DougLea 的 AQS 论文

java.util.concurrent 包 (JUC 包) 下有很多例如 ReentrantLock 这样的同步器,这些同步器基本都是基于 AbstractQueuedSynchronizer 类 (AQS) 来构建的

作者 DougLea 也专门写了一篇论文来介绍 AQS,论文的题目是 The java.util.concurrent Synchronizer Framework,地址

论文的摘要就介绍了 AQS 实现的是同步器需要的一些通用的功能,包括对同步状态的原子管理、线程的阻塞和排队,以及线程的解除阻塞

论文有背景介绍、需求、设计与实现、用法、性能五个部分,有关性能的部分因为涉及到 JVM 的实现,所以等以后再学习吧 ……

对于部分难懂句子的理解,我会将对应的论文原文在括号中粘贴出来,如果有理解错误,还请指出

背景介绍

通过 JCP 的 JSR 166 提案,在 J2SE-1.5 版本引入了 java.util.concurrent 包 (JUC 包),其中包含了一系列跟并发相关的同步器 (提案只定义了对应的抽象数据类型 ADT ),这些同步器需要维护下面的几个功能

  • 同步器内部同步状态的表示,例如一个锁要表示其状态是获取还是释放 (locked or unlocked),以及同步状态的检查和更新的操作

  • 至少有一个方法,在同步状态已经被其他线程获取的情况下 (if the state requires),可以将当前调用的线程阻塞

  • 以及其他线程如果释放了这个同步状态,这个线程可以从阻塞中恢复 (resuming when some other thread changes the synchronization state to permit it)

具体的同步器实现类包括:各种形式的互斥锁、读写锁、信号量、屏障、Future、事件指示器 (event indicators)、交接/迁移?队列 (handoff queues)

几乎任何的同步器都可以用来实现其他形式的同步器 (As is well-known 然后引用了一篇文章,咱也不知道咱也不敢问),但是这样实现会带来复杂性、开销、不灵活,充其量只是个二流项目 (to be at best a second-rate engineering),所以最后采用的方案是先造一个实现了通用功能的 AQS 类,具体的同步器再在其基础上进行具体实现

需求的功能

同步器至少要包含两种方法 (引用了一篇文章,咱不问),一个是 acquire 操作阻塞调用的线程,除非/直到 (unless/until) 同步状态允许其继续执行,另一个是 release 操作,通过某种方式改变同步状态并允许一个或多个被阻塞的线程释放

JUC 没有为同步器定义一个统一的 API,所以 acquire 和 release 操作在不同的类中可能定义的方法名会有不同,例如 Lock.lock, Semaphore.acquire, CountDownLatch.await 等,但每个同步器都要支持下面的三个操作

  • 阻塞和非阻塞地尝试获取同步器 (synchronization attempts),例如 tryLock 方法

  • 可选的超时设置,所以调用的线程可以放弃等待

  • 通过中断实现的取消任务的操作 (Cancellability via interruption),分为两个版本,一个是可取消的 acquire,另一个是不可取消的 acquire (separated into one version of acquire that is cancellable, and one that isn’t)

同步器可能会根据其是否只维护独占的同步状态而有所不同,对于独占的同步器,同时只能有一个线程可以执行 (only one thread at a time may continue past a possible blocking point),对于共享的同步器,则可以同时有多个线程执行

一般的同步器只用维护独占状态就可以了,但例如计数信号量 (counting semaphores),在数量允许的情况下,可以允许多个线程都获取到 (may be acquired by as many threads as the count permits),所以独占和共享这两种模式 AQS 都要进行实现

JUC 还定义了 Condition 接口,支持监视器类型的 (monitor-style) await 和 signal 操作,这些操作与独占模式的 Lock 类有关,并且 Condition 接口的实现本来就与 Lock 类有关,所以 AQS 还要实现有关条件锁的功能

设计与实现

同步器背后的基本思想非常简单 (The basic ideas behind a synchronizer are quite straightforward,然而我并不这样觉得) ,acquire 的操作如下

1
2
3
4
5
while (synchronization state does not allow acquire) {
  enqueue current thread if not already queued;
  possibly block current thread;
}
dequeue current thread if it was queued;

release 的操作如下

1
2
3
update synchronization state;
if (state may permit a blocked thread to acquire)
  unblock one or more queued threads;

要实现上面的操作,需要协调好下面三个基本组件

  • 同步状态的原子管理
  • 线程的阻塞与解除阻塞
  • 阻塞队列的管理

实现这三个组件独立地变化是有可能的,但这样实现会没有效率而且还很难用,例如,保存在队列结点中的信息必须与线程解除阻塞所需要的信息相匹配,并且暴露出的方法签名需要依赖于对应的同步状态 (方法签名就是指方法名和方法的参数列表的类型)

AQS 的设计核心就是决策好上面三个基本组件的具体实现,并且还要允许可以有多种使用方式的选择,虽然在实现的时候故意地限制了他的适用范围,但提供了足够的效率,所以没有理由不采用 AQS 而是从头开始实现同步器 (there is practically never a reason not to use the framework)

同步状态

用一个 32 位的 int 来保存同步状态,并且暴露出 getState、setState、compareAndSet 方法来操作这个 state,这些方法依赖于 JUC atomic 包的支持,并且读写操作都符合 JSR133 (Java Memory Model) 中 volatile 的语义

将同步状态设置为一个 32 位的 int 是源于实践的考虑 (其实就是 DougLea 当时考虑到实用性和性能等等方面觉得没有必要搞个 long 来保存),将来可能会实现一个 64 位的 (JDK 1.5 搞的 AQS ,JDK 1.6 就搞了 AbstractQueuedLongSynchronizer,即采用 long 来保存同步状态的 AQS,DougLea 老爷子真的高产),32 位对于 JUC 包下的大多数同步器都够用了,只有 CyclicBarrier 不够,所以他改用了锁来实现 (instead uses locks)

基于 AQS 的具体实现类必须定义 tryAcquire 和 tryRelease 方法来控制 acquire 和 release 的操作,当同步状态满足时 (synchronization was acquired) tryAcquire 方法要返回 true,而当新的同步状态允许以后的 acquire 时 (the new synchronization state may allow future acquires),tryRelease 方法要返回 true

并且这些方法都可以接收一个 int 参数来传递想要的状态 (communicate desired state),例如,可重入锁,当一个线程从条件等待中返回后,重新获取锁时,就需要这个参数来重新建立递归计数 (to re-establish the recursion count when re-acquiring the lock after returning from a condition wait) 但是很多同步器并不需要这个参数,忽略它即可

阻塞

在 JSR166 之前,没有可用于阻塞线程以及解除阻塞的 Java API 来创建不基于内置监视器的同步器 (there was no Java API available to block and unblock threads for purposes of creating synchronizers that are not based on built-in monitors),唯一可以选择的只有 Thread.suspend 和 Thread.resume,但这两个方法有缺陷 (即在调用 suspend 之前调用了 resume 的问题),所以采用了 JUC 包中的 LockSuport 类来解决这个问题 (然后讨论了一大堆看不懂的底层实现,暂且跳过这部分,知道 AQS 是 通过 LockSuport 来实现线程阻塞和恢复的就好了,其他的以后再看吧 ……)

队列

AQS 的核心就是维护阻塞线程的队列,并且该队列是严格的 FIFO 先进先出队列,所以 AQS 不支持基于优先级的同步

目前几乎没有争议的是,同步队列的最佳实现方案是采用没有使用更底层的锁 (lower-level locks) 来构造的非阻塞数据结构,主要有 Mellor-Crummey and Scott (MCS) 锁的变体、Craig, Landin, and Hagersten (CLH) 锁的变体这两种选择

因为 CLH 锁可以更容易地实现取消 (cancellation) 和超时 (timeouts) 的功能,所以选择了 CLH 锁来作为实现的基础,但最终的设计已经与原来的 CLH 锁有了较大的出入,所以下文将进行解释 (对 CLH队列的修改暂且跳过,以后再看 ……)

抛开这些实现的细节,基本的 acquire 操作的一般形式如下 (独占,不可中断,无超时)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
if (!tryAcquire(arg)) {
  node = create and enqueue new node;
  pred = node's effective predecessor;
  while (pred is not head node || !tryAcquire(arg)) {
    if (pred's signal bit is set)
      park();
    else
      compareAndSet pred's signal bit to true;
    pred = node's effective predecessor;
  }
  head = node;
}

对应的 release 操作

1
2
3
4
if (tryRelease(arg) && head node's signal bit is set) {
  compareAndSet head's signal bit to false;
  unpark head's successor, if one exists
}

acquire 的主循环次数依赖于具体的实现类中 tryAcquire 的实现,对于没有取消 (cancellation) 的情况,忽略 park ,acquire 和 release 都是 O(1) 的复杂度

要支持取消操作,就要在 acquire 循环中的 park 返回时检查中断或超时,因为超时或中断而取消的线程,将会修改其结点的状态并 unpark 其后继节点,判断其前驱节点和后继节点以及重置状态可能需要 O(n) 的复杂度,n 是队列的长度

条件队列

在 AQS 内部定义了 ConditionObject 类,提供给独占的同步器以及 Lock 接口的实现类使用,可以将任何数量的条件对象关联到一个锁对象,并提供了监视器风格 (monitor-style) 的 await、signal、signalAll 的操作,还包括带有超时的,以及一些检查、监控的方法

ConditionObject 使用的是和同步器使用的相同的队列结点,只是这些结点都在单独的条件队列中维护,signal 操作是通过将结点从条件队列变换 (transfer) 为独占队列实现的,不必在重新获取到锁之前唤醒已发出信号的线程 (signalled thread)

基本的 await 操作如下

1
2
3
4
create and add new node to condition queue;
release lock;
block until node is on lock queue;
re-acquire lock;

signal 操作如下

1
transfer the first node from condition queue to lock queue;

因为这些操作只有在持有锁的时候才能操作,所以采用的是顺序的链表 (sequential linked) 队列来作为条件队列,队列结点有一个 nextWaiter 字段,变换操作就是将条件队列的第一个结点与队列断开连接 (unlinks),然后通过 CLH 插入到独占队列 (compareAndSet 操作)

(后面还有一段是在讨论实现的复杂点,以及可能出现的什么问题等内容,等以后再看吧 ……)

用法

AQS 类将上面介绍的功能组合在一起,采用了模板方法模式,即将 AQS 作为基类提供给子类,具体的同步器只需定义其控制着 acquire 和 release 操作的同步状态的检查与更新的方法

但直接将 AQS 的子类作为同步器的 ADT 并不适合,因为子类会无可避免地 (necessarily) 暴露出需要在内部控制 acquire 和 release 规则的方法,这些方法是不应该被外部用户访问的,所以 JUC 包下的所有同步器都定义了一个继承了 AQS 的私有内部类,并且将所有同步方法都委托 (delegate) 给这个私有内部类,这样对于不同的同步器的 public 方法就可以使用适合自己的名称

下面是一个最简单的 Mutex 类的实现,使用同步状态 0 表示解锁,1 表示上锁,这个类不需要同步方法中的参数,所以将 0 作为实参,在方法实现中将其忽略即可

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
class Mutex {
  class Sync extends AbstractQueuedSynchronizer {
    public boolean tryAcquire(int ignore) {
      return compareAndSetState(0, 1);
    }
    public boolean tryRelease(int ignore) {
      setState(0); 
      return true;
    }
  }
  private final Sync sync = new Sync();
  public void lock() { 
    sync.acquire(0); 
  }
  public void unlock() { 
    sync.release(0); 
  }
}

在 AQS 类的头部注释中有完整的 Mutex 实例类

公平调度的控制

尽管 AQS 是基于 FIFO 队列的,但同步不一定完全公平,注意到 tryAcquire 是在入队前执行的,所以一个新的正在 acquire 的线程是可以偷取本应该属于队列头部第一个线程的获取同步的机会 (Thus a newly acquiring thread can “steal” access that is “intended” for the first thread at the head of the queue)

但这种冲撞的策略 (barging strategy) 可以提高吞吐率,比如当前的同步器已经被释放可以被排在队列头部的线程获取了,但此时这个线程正在接触阻塞的过程中 (the intended next thread is in the process of unblocking),导致这段时间同步器空闲

甚至对于只会短时间占有同步器的场景 (held only briefly),可以通过定义 tryAcquire 在 CPU 控制权让出之前 (before passing back control) 重复调用自己若干次 (itself retry a few times) 来尝试冲撞

所以冲撞策略的 FIFO 只有概率公平 (probablistic fairness),即冲撞的线程和队首第一个已完成解除阻塞的线程 (An unparked thread at the head of the lock queue) 有公平的竞争机会,但是如果冲撞线程到达的速度 (incoming threads arrive faster) 比队首第一个线程解除阻塞 (it takes an unparked thread to unblock) 的速度快,则队首线程将会很难赢得竞争,并且它的后继结点也会一直保持阻塞

对于需要严格公平性的情况,可以将 tryAcquire 方法定义为,如果当前线程不是队首的第一个则立刻返回 fales,可通过 getFirstQueuedThread 方法进行检查,这是 AQS 提供的为数不多的检查方法之一 (one of a handful of)

一个更快但非严格公平的变体是,如果队列在判断的瞬间 (momentarily) 为空则允许 tryAcquire 成功,在这种情况下多个线程遇到空的队列,则都会去竞争让自己获取到同步器 (race to be the first to acquire),通常情况是至少会有一个线程不用排队,JUC 包下所有支持竞争公平的同步器都是采用的这种实现策略

因为 Java Language Specification 没有提供调度保证,所以在单处理器上可能会增加同步器可用但没有线程能够来获取的情况,公平性设置在多处理器上的影响可能会更大,因为可以产生更多的交错 (interleavings)


以上内容是玉山整理的笔记,如有错误还请指出