Java并发_基于AQS的同步器下(11)

CountDownLatch

CountDownLatch 闭锁与 CyclicBarrier 栅栏有点像,但却是两个不同的概念

CyclicBarrier 栅栏是让一组线程互相等待进而执行到同一位置,而 CountDownLatch 闭锁是让 1 个或多个线程等待其他线程执行完成后再执行,且栅栏可以重复使用,而闭锁只能使用一次

CountDownLatch 是通过 AQS 共享模式实现的,将状态作为计数器

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public class CountDownLatch {
  //构造时指定计数
  public CountDownLatch(int count) {
    if (count < 0) throw new IllegalArgumentException("count < 0");
    this.sync = new Sync(count);
  }
  public void await() throws InterruptedException {
    //这里传入的参数1实际上没用
    sync.acquireSharedInterruptibly(1);
  }
  public void countDown() {
    //这里传入的参数1实际上没用
    sync.releaseShared(1);
  }
  private final Sync sync;
  private static final class Sync extends AbstractQueuedSynchronizer {
    Sync(int count) {
      setState(count);
    }
    protected int tryAcquireShared(int acquires) {
      //若计数器为0则锁空闲,则返回1,否则返回-1
      return (getState() == 0) ? 1 : -1;
    }
    protected boolean tryReleaseShared(int releases) {
      for (;;) {
        int c = getState();
        if (c == 0)
          return false;
        //计数器-1
        int nextc = c-1;
        //通过CAS对state进行赋值
        if (compareAndSetState(c, nextc))
          //只有当count-1后为0才返回true
          return nextc == 0;
      }
    }
  }
}

CountDownLatch 将 state 设置为计数器且对象创建时赋初值为 count,表示可以有 count 个线程能同时获取其共享锁,若一个线程调用了其对象的 await 方法,则该线程进入 AQS 的等待队列,直到获取到资源才能继续执行,CountDownLatch 将获取到资源实现为其他线程调用 countDown 方法来让计数器 -1,直到计数器减为 0 ,则将等待队列中的线程唤醒

可以用 CountDownLatch 来作为开关

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class Test {
  private static CountDownLatch start = new CountDownLatch(1);
  public static void main(String args[]) throws InterruptedException {
    for (int i = 0; i < 3; i++) {
      new Thread() {
        public void run() {
          try {
            System.out.println(Thread.currentThread().getName() + " 等待");
            //start计数器减到0之前一直等待
            start.await();
            System.out.println(Thread.currentThread().getName() + " 开始执行");
          } catch (InterruptedException e) {
            e.printStackTrace();
          }
        }
      }.start();
    }
    Thread.sleep(3000);
    // 打开startSignal开关,执行所有等待的任务
    start.countDown();
  }
}
//Thread-0 等待
//Thread-2 等待
//Thread-1 等待
//Thread-1 开始执行
//Thread-0 开始执行
//Thread-2 开始执行

Semaphore

Semaphore 计数信号量可用来控制同时访问某个资源的线程数量,当信号量中有可用许可时,线程就能获取许可,若没有了则需要等待已占用许可的线程释放出许可

Semaphore 也是通过 AQS 共享模式实现的,将 AQS 中定义的状态 state 作为剩余许可数,同 ReentrantLock一样,Semaphore 也包含了公平和非公平的两个 sync 对象,与 CountDownLatch 减少 state 值为释放操作相反,Semaphore 减少 state 值是获取操作

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
public class Semaphore {
  //构造函数,默认非公平
  public Semaphore(int permits) {
    sync = new NonfairSync(permits);
  }
  public Semaphore(int permits, boolean fair) {
    sync = fair ? new FairSync(permits) : new NonfairSync(permits);
  }
  //不响应中断的信号量获取的入口方法
  public void acquireUninterruptibly(int permits) {
    if (permits < 0) throw new IllegalArgumentException();
    //要调用的tryAcquireShared方法分别在公平和非公平中各有定义
    sync.acquireShared(permits);
  }
  //信号量的释放,入口方法
  public void release(int permits) {
    if (permits < 0) throw new IllegalArgumentException();
    //由于公平和非公平释放的方式相同,所以要调用的tryReleaseShared方法在Sync类中实现
    sync.releaseShared(permits);
  }
}

Sync内部类

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
private final Sync sync;
abstract static class Sync extends AbstractQueuedSynchronizer {
  Sync(int permits) {
    setState(permits);
  }
  //重写的tryAcquireShared方法在公平和非公平对应的子类中见
  //重写的tryReleaseShared方法
  protected final boolean tryReleaseShared(int releases) {
    for (;;) {
      //剩余许可数
      int current = getState();
      //剩余+本次释放
      int next = current + releases;
      //超过指定的最大值了
      if (next < current)
        throw new Error("Maximum permit count exceeded");
      //CAS更新下state
      if (compareAndSetState(current, next))
        return true;
    }
  }
}

非公平子类

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
static final class NonfairSync extends Sync {
  NonfairSync(int permits) {
    super(permits);
  }
  protected int tryAcquireShared(int acquires) {
    for (;;) {
      int available = getState();
      int remaining = available - acquires;
      if (remaining < 0 ||
          compareAndSetState(available, remaining))
        return remaining;
      //返回的remaining是若扣去acquires之后,将要剩余的许可数,是在尝试没有真的扣
      //所以若remaining < 0则直接返回不去修改state,对应线程进入队列开始等待
    }
  }
}

公平子类

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
static final class FairSync extends Sync {
  FairSync(int permits) {
    super(permits);
  }
  protected int tryAcquireShared(int acquires) {
    for (;;) {
      //公平和非公平就多了这一步判断
      //判断当前结点是否有前驱结点,若有则直接返回,获取失败
      if (hasQueuedPredecessors())
        return -1;
      int available = getState();
      int remaining = available - acquires;
      if (remaining < 0 ||
          compareAndSetState(available, remaining))
        return remaining;
    }
  }
}

使用例子

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class Test {
  private static volatile Semaphore sp = new Semaphore(2);
  public static void main(String[] args) {
    for (int i = 0; i < 3; i++) {
      new Thread() {
        @Override
        public void run() {
          try {
            sp.acquireUninterruptibly(1);
            System.out.println(Thread.currentThread().getName() + " 获取1个许可");
            Thread.sleep(1000);
            System.out.println(Thread.currentThread().getName() + " 释放1个许可");
            sp.release(1);
          } catch (InterruptedException e) {
            e.printStackTrace();
          }
        }
      }.start();
    }
  }
}
//Thread-1 获取1个许可
//Thread-0 获取1个许可
//Thread-1 释放1个许可
//Thread-2 获取1个许可
//Thread-0 释放1个许可
//Thread-2 释放1个许可

ReentrantReadWriteLock

ReadWriteLock 读写锁接口,维护了读和写两个锁,写锁是独占的,读锁是共享

1
2
3
4
public interface ReadWriteLock {
  Lock readLock();
  Lock writeLock();
}

ReentrantReadWriteLock 是实现类,其中包括了内部类 ReadLock 和 WriteLock

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
public class ReentrantReadWriteLock
  implements ReadWriteLock, java.io.Serializable {
  private final ReentrantReadWriteLock.ReadLock readerLock;
  private final ReentrantReadWriteLock.WriteLock writerLock;
  final Sync sync;
  public ReentrantReadWriteLock() {
    this(false); //默认非公平
  }
  public ReentrantReadWriteLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
    readerLock = new ReadLock(this);
    writerLock = new WriteLock(this);
  }
  public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }
  public ReentrantReadWriteLock.ReadLock  readLock()  { return readerLock; }
}

内部的 Sync 类

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
abstract static class Sync extends AbstractQueuedSynchronizer {
  //高16位为读锁,低16位为写锁
  static final int SHARED_SHIFT   = 16;
  //读锁单位
  static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
  //读锁最大数量
  static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
  //写锁最大数量
  static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
  //本地线程计数器
  private transient ThreadLocalHoldCounter readHolds;
  //缓存的计数器
  private transient HoldCounter cachedHoldCounter;
  //第一个读线程
  private transient Thread firstReader = null;
  //第一个读线程的计数
  private transient int firstReaderHoldCount;

  Sync() {
    //本地线程计数器
    readHolds = new ThreadLocalHoldCounter();
    setState(getState());
  }

  //读锁使用
  static final class HoldCounter {
    //重入次数
    int count = 0;
    final long tid = getThreadId(Thread.currentThread());
  }
  static final class ThreadLocalHoldCounter extends ThreadLocal<HoldCounter> {
    public HoldCounter initialValue() {
      return new HoldCounter();
    }
  }
  
  //...
}

基于 Sync 和 ReadLock 和 WriteLock

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
public static class ReadLock implements Lock, java.io.Serializable {
  private final Sync sync;
  protected ReadLock(ReentrantReadWriteLock lock) {
    sync = lock.sync;
  }
  public void lock() {
    sync.acquireShared(1);
  }
  public void unlock() {
    sync.releaseShared(1);
  }
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
public static class WriteLock implements Lock, java.io.Serializable {
  private final Sync sync;
  protected WriteLock(ReentrantReadWriteLock lock) {
    sync = lock.sync;
  }
  public void lock() {
    sync.acquire(1);
  }
  public void unlock() {
    sync.release(1);
  }
  //支持Condition
  public Condition newCondition() {
    return sync.newCondition();
  }
}

至于具体的 tryAcquire、tryRelease、tryAcquireShared、tryReleaseShared 四个方法以后再看吧

ReentrantReadWriteLock 一次只能有一个线程可以写,但可以有多个线程同时读,所以读写锁适用于读大于写的情况,但是只有读锁和写锁都没有没占有的时候,才能获取到写锁,所以可能会出现写操作的线程长时间获取不到锁的饥饿情况,Java 8 引入的 StampedLock 解决了这个问题

使用例子

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
public class Test {
  public static int num = 0;
  public static ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
  public static void main(String[] args) {
    for (int i = 0; i < 10; i++) {
      new Thread() {
        @Override
        public void run() {
          try {
            Thread.sleep(1000);
            lock.readLock().lock();
            System.out.println(Thread.currentThread().getName() +
                               "读取n为 " + num);
            lock.readLock().unlock();
          } catch (InterruptedException e) {
            e.printStackTrace();
          }
        }
      }.start();
      if (i >= 8) {
        new Thread() {
          @Override
          public void run() {
            try {
              Thread.sleep(1000);
              int n = (int) (Math.random() * 3 * 10);
              lock.writeLock().lock();
              num = n;
              lock.writeLock().unlock();
              System.out.println(Thread.currentThread().getName() +
                                 "修改n为 " + n);
            } catch (InterruptedException e) {
              e.printStackTrace();
            }
          }
        }.start();
      }
    }
  }
}
//Thread-1读取n为 0
//Thread-0读取n为 0
//Thread-3读取n为 0
//Thread-2读取n为 0
//Thread-4读取n为 0
//Thread-7读取n为 0
//Thread-6读取n为 0
//Thread-5读取n为 0
//Thread-8读取n为 0
//Thread-10读取n为 0
//Thread-9修改n为 16
//Thread-11修改n为 24

以上内容是玉山整理的笔记,如有错误还请指出