Java并发_基于AQS的同步器上(10)

JUC同步器
JUC同步器

LockSupport

LockSupport 是 JUC 中的基础类,不用像 wait、notify 方法那样需要在同步代码块或同步方法中,底层实现依赖于 UNSAFE 类,AQS 就是通过其 park 和 unpark 方法来阻塞线程和解除阻塞的

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
public static void park() {
  UNSAFE.park(false, 0L);
}
public static void park(Object blocker) {
  Thread t = Thread.currentThread();
  //Thread类定义了属性volatile Object parkBlocker;用来记录是哪个对象阻塞的线程
  //setBlocker方法将传入的blocker对象通过UNSAFE写到parkBlocker
  setBlocker(t, blocker);
  UNSAFE.park(false, 0L);
  //被其他线程unpark后将parkBlocker置null
  setBlocker(t, null);
}
private static void setBlocker(Thread t, Object arg) {
  UNSAFE.putObject(t, parkBlockerOffset, arg);
}

public static void unpark(Thread thread) {
  if (thread != null)
    UNSAFE.unpark(thread);
}

LockSupport 的设计理念是 park 方法使线程等待一个许可,而 unpark 方法是为线程提供一个许可,许可是一次性的,被 park 消费掉就需要再次提供,允许 unpark 方法先于 park 调用,即允许先提供一个许可,线程可以之后再消费,所以 unpark 方法可以先于 park 方法调用而不会发生像 Thread.resume 先于 Thread.suspend 调用而造成的死锁问题,并且还能响应中断且不会抛出 InterruptException 异常,中断标记将被置为 true

建议使用 park(Object blocker) 方法而不建议使用空参的 park() 方法,因为可以通过传入的 blocker 对象来确定线程受阻塞的原因,另外的线程可通过 getBlocker(Thread t) 方法获取到传入的 blocker

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
public class Test {
  private static Thread mainThread;
  public static void main(String[] args) {
    mainThread = Thread.currentThread();
    new Thread() {
      @Override
      public void run() {
        try {
          Thread.sleep(1000);
          System.out.println(Thread.currentThread().getName() + " 中断main");
          mainThread.interrupt();
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
      }
    }.start();
    System.out.println(Thread.currentThread().getName() + " park自己");
    LockSupport.park();
    if (Thread.interrupted())
      System.out.println(Thread.currentThread().getName() + " 被中断了,苏醒,继续执行");
    new Thread() {
      @Override
      public void run() {
        try {
          Thread.sleep(1000);
          System.out.println(Thread.currentThread().getName() + " 获取传入park的blocker对象");
          System.out.println(LockSupport.getBlocker(mainThread));
          System.out.println(Thread.currentThread().getName() + " unpark main");
          LockSupport.unpark(mainThread);
          //唤醒main后等main先执行一下
          Thread.sleep(1000);
          System.out.println("unpark之后blocker被置" + LockSupport.getBlocker(mainThread));
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
      }
    }.start();
    LockSupport.park("abc");
    System.out.println("main 被唤醒");
  }

}
//main park自己
//Thread-0 中断main
//main 被中断了,苏醒,继续执行
//Thread-1 获取传入park的blocker对象
//abc
//Thread-1 unpark main
//main 被唤醒
//unpark之后blocker被置null

注意,虽然 park 可以在 unpark 之后调用,但多次 unpark 的效果与一次 unpark 效果是一样的

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
public class Test {
  public static void main(String[] args) {
    LockSupport.unpark(Thread.currentThread());
    LockSupport.unpark(Thread.currentThread());
    LockSupport.park();
    System.out.println("abc");
    LockSupport.park(); //main在这里被阻塞
    System.out.println("abc");
  }
}

ReentrantLock

ReentrantLock 实现了 Lock 接口,内部类 sync 继承了 AQS 且有 FairSync 和 NonFairSync 两个子类,所以 ReentrantLock 锁是否公平是由 sync 创建出的子类决定的,可重入性则是将 AQS 的 state 状态属性用来记录进入锁的次数,独占性则是因为采用了 AQS 的独占模式

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
public class ReentrantLock implements Lock {
  private final Sync sync;
  public ReentrantLock() {
    //默认是非公平锁
    sync = new NonfairSync();
  }
  public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
  }
  public void lock() {
    //Sync类中定义的抽象方法,由两个子类各自实现
    sync.lock();
  }
  public void unlock() {
    //AQS释放的入口方法
    sync.release(1); 
  }
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
abstract static class Sync extends AbstractQueuedSynchronizer {
  //由公平和非公平两个子类各自实现
  abstract void lock();
  //公平和非公平释放的方式相同,所以在Sync中直接定义了,子类只用实现各自不同的获取锁的方法
  //重写AQS的tryRelease方法,传入的参数releases为1
  protected final boolean tryRelease(int releases) {
    //由于是可重入锁,每次退出则当前状态值-1,即getState()-releases
    int c = getState() - releases;
    //若当前线程不是锁的持有者则抛出异常
    if (Thread.currentThread() != getExclusiveOwnerThread())
      throw new IllegalMonitorStateException();
    //锁释放的标志
    boolean free = false;
    if (c == 0) {
      //状态值减为0则表示让出锁
      free = true;
      setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
  }
}

公平锁子类

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
static final class FairSync extends Sync {
  final void lock() {
    //锁空闲时状态值为0,被线程初次获取到则状态值变为1
    //由于是独占的可重入锁,所以一个线程每获取一次,状态值就+1
    acquire(1); //AQS获取的入口方法
  }
  //重写AQS的tryAcquire方法,传入的参数acquires为1
  protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    //获取volatile int state的状态值
    int c = getState();
    //对于ReentrantLock状态值为0则表示锁被让出了可以尝试获取
    if (c == 0) {
      //若锁空闲则先判断当前线程是不是在等待队列中的第一个线程
      if (!hasQueuedPredecessors() &&
          //若是则设置锁的状态,即状态值+1
          compareAndSetState(0, acquires)) {
        //并设置锁被当前线程占有
        setExclusiveOwnerThread(current);
        return true;
      }
    }
    //若锁已经被当前线程占有
    else if (current == getExclusiveOwnerThread()) {
      //由于是可重入锁,每次进入则当前状态值+1,即c+acquires
      int nextc = c + acquires;
      if (nextc < 0) // overflow
        throw new Error("Maximum lock count exceeded");
      //则更新锁的状态
      setState(nextc);
      return true;
    }
    return false;
  }
}

非公平锁子类

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
static final class NonfairSync extends Sync {
  final void lock() {
    //若锁当前是空闲的,则直接将状态置为1
    if (compareAndSetState(0, 1))
      //并将当前线程设置为锁的占有者
      setExclusiveOwnerThread(Thread.currentThread());
    else
      //若锁当前被占有,则进入tryAcquire方法不断尝试获取锁
      acquire(1);
  }
  protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
      //if (!hasQueuedPredecessors() &&
      //非公平用不判断当前线程是不是在等待队列中的第一个,允许冲撞(见DougLeaAQS论文的学习)
      //通过CAS保证线程安全
      if (compareAndSetState(0, acquires)) {
        setExclusiveOwnerThread(current);
        return true;
      }
    }
    //下面的代码与公平的相同...
  }
}

ReentrantLock 需要手动释放,可在 finally 中调用 lock.unlock 方法,下面是一个卖票窗口的例子

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public class Test {
  public static Lock lock = new ReentrantLock();
  public static int tickets = 10;
  public static void main(String[] args) {
    for (int i = 0; i < 3; i++) {
      new Thread() {
        @Override
        public void run() {
          while (true) {
            try {
              lock.lock();
              if (tickets > 0) {
                Thread.yield();
                tickets--;
                System.out.println(Thread.currentThread().getName() + "卖出一张还剩" + tickets);
              } else {
                System.out.println(Thread.currentThread().getName() + "票售罄");
                break;
              }
            } finally {
              lock.unlock();
            }
          }
        }
      }.start();
    }
  }
}
//Thread-2卖出一张还剩9
//Thread-0卖出一张还剩8
//Thread-1卖出一张还剩7
//Thread-2卖出一张还剩6
//Thread-2卖出一张还剩5
//Thread-0卖出一张还剩4
//Thread-1卖出一张还剩3
//Thread-2卖出一张还剩2
//Thread-0卖出一张还剩1
//Thread-1卖出一张还剩0
//Thread-1票售罄
//Thread-2票售罄
//Thread-0票售罄

Condition

Condition 的 await、signal、signalAll 方法分别对应于 Object 类中的 wait、notify、notifyAll 方法,不同的是 Condition 不是依赖 synchronized,而是依赖 Lock,可以实现 synchronized 不能的唤醒指定的线程

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
public class Test {
 private static Lock lock = new ReentrantLock();
 private static Condition condition1 = lock.newCondition();
 private static Condition condition2 = lock.newCondition();
 public static void main(String[] args) throws InterruptedException {
   new Thread() {
     @Override
     public void run() {
       lock.lock();
       try {
         System.out.println(Thread.currentThread().getName() + " await");
         condition1.await();
         System.out.println(Thread.currentThread().getName() + " 被signal");
       } catch (InterruptedException e) {
         e.printStackTrace();
       } finally {
         lock.unlock();
       }
     }
   }.start();
   new Thread() {
     @Override
     public void run() {
       lock.lock();
       try {
         System.out.println(Thread.currentThread().getName() + " await");
         condition2.await();
         System.out.println(Thread.currentThread().getName() + " 被signal");
       } catch (InterruptedException e) {
         e.printStackTrace();
       } finally {
         lock.unlock();
       }
     }
   }.start();
   //让创建的两个线程先执行
   Thread.sleep(1000);
   lock.lock();
   condition1.signal();
   condition2.signal();
   lock.unlock();
 }
}
//Thread-0 await
//Thread-1 await
//Thread-0 被signal
//Thread-1 被signal

CyclicBarrier

AQS 论文提到 CyclicBarrier 可能 32 位的 state 不够用,所以采用了锁,CyclicBarrier 循环栅栏是让一组线程互相等待,直到所有线程都执行到设置的公共屏障点再一起继续执行,循环即表示该栅栏可以重复使用,若栅栏成功打开,则 await 方法将会返回一个序号,若其中一个线程在 await 时被中断或 await 超时了,则所有线程都将中断并抛出 BrokenBarrierException 异常

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
public class CyclicBarrier {
  private final ReentrantLock lock = new ReentrantLock();
  private final Condition trip = lock.newCondition();
  //通过generation对象,记录属于哪一代,即哪一组线程
  private Generation generation = new Generation();
  private static class Generation {
    boolean broken = false;
  }
	//指定达到屏障的线程数量
  private final int parties;
	//count被初始设为parties,每当一个线程到达屏障则-1
  private int count;
}
1
2
3
4
5
6
7
8
9
public int await() throws InterruptedException, BrokenBarrierException {
  try {
    //dowait方法中的代码被包围在ReentrantLock锁中,并调用Condition的await方法
    //让当前线程阻塞,直到有指定数量的线程到达栅栏,或有线程被中断、等待超时
    return dowait(false, 0L);
  } catch (TimeoutException toe) {
    throw new Error(toe);
  }
}
1
2
3
4
5
6
7
8
//若count被减到0,则打开屏障更换下一代,即更换一组线程准备重新设置栅栏
private void nextGeneration() {
  //调用Condition的signalAll方法唤醒被await方法阻塞的线程
  trip.signalAll();
  //重新设置count计数器
  count = parties;
  generation = new Generation();
}

使用例子

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public class Test {
  private static CyclicBarrier cb = new CyclicBarrier(3);
  public static void main(String[] args) {
    for (int i = 0; i < 3; i++) {
      new Thread() {
        @Override
        public void run() {
          try {
            Thread.sleep((int) (Math.random() * 3) * 1000);
            System.out.println(Thread.currentThread().getName() + " 等待cb");
            int order1 = cb.await();
            System.out.println(Thread.currentThread().getName() +
                               " 开始执行,cb返回的序号为 " + order1);
            Thread.sleep((int) (Math.random() * 3) * 1000);
            System.out.println(Thread.currentThread().getName() + " 再次等待cb");
            int order2 = cb.await();
            System.out.println(Thread.currentThread().getName() +
                               " 开始执行,cb返回的序号为 " + order2);
          } catch (InterruptedException | BrokenBarrierException e) {
            e.printStackTrace();
          }
        }
      }.start();
    }
  }
}
//Thread-2 等待cb
//Thread-1 等待cb
//Thread-0 等待cb
//Thread-0 开始执行,cb返回的序号为 0
//Thread-2 开始执行,cb返回的序号为 2
//Thread-1 开始执行,cb返回的序号为 1
//Thread-1 再次等待cb
//Thread-0 再次等待cb
//Thread-2 再次等待cb
//Thread-2 开始执行,cb返回的序号为 0
//Thread-0 开始执行,cb返回的序号为 1
//Thread-1 开始执行,cb返回的序号为 2

以上内容是玉山整理的笔记,如有错误还请指出