Java并发_JUC线程池(12)

模拟线程复用

Thread 的 start 方法只能调用一次,要想复用线程只能从 Runnable 接口的 run 方法入手

实现一个特殊的线程复用任务让线程执行,特殊任务的 run 方法是无限循环的,逻辑为不断检测是否有任务传入,若传入了要执行的任务则调用其 run 方法

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
public class Test {
  public static void main(String[] args) {
    //创建线程池
    Mythreadpool threadPool = new Mythreadpool();
    //添加任务到线程池的任务队列
    for (int i = 0; i < 50; i++) {
      threadPool.execute(new Task(i));
    }
  }
}

class Task {
  int id;
  Task(int id) {
    this.id = id;
  }
  public void run() {
    try {
      Thread.sleep(500);
      System.out.println(
        Thread.currentThread().getName() + "执行" + id + "号任务");
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }
}

class Mythreadpool  {
  //任务队列
  private final LinkedList<Task> taskList = new LinkedList<>();
  //工作线程列表
  private final ArrayList<Worker> workerList = new ArrayList<>();

  class Worker extends Thread {
    @Override
    public void run() {
      while (true) {
        synchronized (taskList) {
          if (taskList.size() == 0) {
            try {
              //没有任务了就睡眠,等待新来任务后被唤醒
              taskList.wait();
            } catch (InterruptedException e) {
              e.printStackTrace();
            }
          }
          //从任务队列获取任务执行
          taskList.removeFirst().run();
        }
      }
    }
  }
  //线程池开启两个线程
  private void start() {
    for (int i = 0; i < 2; i++) {
      Worker w = new Worker();
      workerList.add(w);
      w.start();
    }
  }
  public Mythreadpool() {
    start();
  }
  //添加任务到队列
  public void execute(Task task) {
    synchronized(taskList) {
      taskList.addLast(task);
      //唤醒一个睡眠的线程
      taskList.notify();
    }
  }
}

JUC 线程池

Executor 接口定义了 void execute(Runnable command) 方法将任务添加到线程池中执行

ExecutorService 接口继承了 Executor,提供了 submit 方法来提交任务到线程池,invokeAll 方法让线程执行任务,目的是将开启线程和传入任务让线程执行两个操作分离开,使开启的线程能够复用

AbstractExecutorService 抽象类实现了 ExecutorService 接口,提供了在 ExecutorService 接口中定义的方法的默认实现

ThreadPoolExecutor

ThreadPoolExecutor
ThreadPoolExecutor

ThreadPoolExecutor 类继承了 AbstractExecutorService 抽象类,包含以下属性

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
//线程工厂,用来开启线程
private volatile ThreadFactory threadFactory;
//一个Worker对应一个线程,workers即为线程的集合
private final HashSet<Worker> workers = new HashSet<Worker>();
//任务队列,若此时没有空闲的线程,则将新提交的任务放入任务队列中阻塞等待
private final BlockingQueue<Runnable> workQueue;

//corePoolSize表示线程池基本容量,可以新开启线程超过这个容量
//maximumPoolSize线程池允许的最大线程数
private volatile int corePoolSize;
private volatile int maximumPoolSize;

//线程池的线程数达到过的最大值
private int largestPoolSize;
//已完成的任务数量
private long completedTaskCount;

//是否允许为核心数量的线程设置存活时间,默认为false,即表示即使没有被分配任务也存活
private volatile boolean allowCoreThreadTimeOut;
//若线程数超过corePoolSize,多余的空闲的线程存活时间
private volatile long keepAliveTime;

//拒绝处理提交任务的策略,默认AbortPolicy
private volatile RejectedExecutionHandler handler;

//互斥锁,实现对线程池的互斥访问
private final ReentrantLock mainLock = new ReentrantLock();
//与mainLock对应的终止条件
private final Condition termination = mainLock.newCondition();

当有新任务提交到线程池,若当前线程数小于 corePoolSize,则新开启线程来执行任务,若当前线程数达到了 corePoolSize,则任务将会被放入任务队列阻塞等待,若任务队列满了,且此时当前线程数小于 maximumPoolSize,则新开启线程来执行任务,若线程数达到了 maximumPoolSize,则拒绝新任务提交,所以当前线程数可以比 corePoolSize 大,corePoolSize 和 maximumPoolSize 共同决定了线程池中实际运行的线程的数量

即优先使用 corePoolSize 数量的线程执行任务,这部分线程默认一直存活,如果线程不够了则将任务入队,如果任务队列满了则再新开线程,这部分新开的线程若空闲超时则会被销毁,若线程数达到了最大的 maximumPoolSize,则不再接收任务

创建线程池

构造方法

1
2
3
4
5
6
7
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
                          long keepAliveTime, TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
	//...
}

对于计算密集型的任务,线程池中线程的数量设置为 CPU 核心数 + 1,增加线程数只会增加 CPU 切换的时间消耗,不能提高 CPU 使用率

对于 IO 密集型的任务,要根据具体的 CPU 的等待时间和计算时间来设置线程数

Executors 静态工具类提供了多个方法创建多种类型的线程池,例如以下三种

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
//线程数量不定的线程池,corePoolSize为0则表示没有常驻的核心线程
//keepAliveTime为60秒,即线程空闲超过60秒就会被销毁
public static ExecutorService newCachedThreadPool() {
  return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                60L, TimeUnit.SECONDS,
                                new SynchronousQueue<Runnable>());
}
//固定大小的线程池,提交一个任务就创建一个工作线程,如果工作线程的数量达到指定的线程容量,则将任务放到任务队列等待,创建的线程即使空闲也不会释放
public static ExecutorService newFixedThreadPool(int nThreads) {
  return new ThreadPoolExecutor(nThreads, nThreads,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>());
}
//单线程线程池,任务入队,由单线程顺序执行,若当前线程异常了,则创建一个新的线程
public static ExecutorService newSingleThreadExecutor() {
  return new FinalizableDelegatedExecutorService
    (new ThreadPoolExecutor(1, 1,
                            0L, TimeUnit.MILLISECONDS,
                            new LinkedBlockingQueue<Runnable>()));
}

任务队列

主要实现类有以下四种

LinkedBlockingQueue,链表无界队列,容量为 Integer.MAX_VALUE,任务顺序为 FIFO 先进先出

DelayedWorkQueue,堆延迟队列,ScheduledThreadPoolExecutor 有使用该队列

PriorityBlockingQueue,具有优先级的阻塞队列

SynchronousQueue,不保存任务的队列,存一个取一个,串行化执行

开启线程

ThreadFactory 接口中定义了 newThread 方法来开启线程

1
2
3
public interface ThreadFactory {
    Thread newThread(Runnable r);
}

在创建线程池时直接将 ThreadFactory 接口的实例传入线程池的构造方法即可

工具类 Executors 提供了 defaultThreadFactory 方法返回默认的 ThreadFactory 接口的实现类

1
2
3
public static ThreadFactory defaultThreadFactory() {
  return new DefaultThreadFactory();
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
static class DefaultThreadFactory implements ThreadFactory {
  //...
  //接收Runnable任务
  public Thread newThread(Runnable r) {
    Thread t = new Thread(
      group, r, namePrefix + threadNumber.getAndIncrement(), 0);
    //非守护线程
    if (t.isDaemon())
      t.setDaemon(false);
    //优先级都为NORM_PRIORITY
    if (t.getPriority() != Thread.NORM_PRIORITY)
      t.setPriority(Thread.NORM_PRIORITY);
    return t;
  }
}

开启自定义线程,只需实现 ThreadFactory 接口并重写 newThread 方法即可

拒绝策略

RejectedExecutionHandler 接口定义了任务拒绝提交的处理方法 rejectedExecution

1
2
3
4
package java.util.concurrent;
public interface RejectedExecutionHandler {
  void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}

在 ThreadPoolExecutor 类内部定义了四个实现类分别对应四种拒绝策略

AbortPolicy,默认策略,直接抛出异常

CallerRunsPolicy,由调用者所在的线程执行被丢弃的任务

DiscardOldestPolicy,丢弃最先进队即下一个将被执行的任务,然后重新提交当前任务

DiscardPolicy,丢弃任务不做处理

自定义拒绝策略,只需实现 RejectedExecutionHandler 接口并重写 rejectedExecution 方法即可

线程池状态

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
//ThreadPoolExecutor类
//ctl原子对象有32位,高3位表示线程池状态,低29位记录有效线程数
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
private static final int RUNNING    = -1 << COUNT_BITS; //高3位为111
private static final int SHUTDOWN   =  0 << COUNT_BITS; //高3位为000
private static final int STOP       =  1 << COUNT_BITS; //高3位为001
private static final int TIDYING    =  2 << COUNT_BITS; //高3位为010
private static final int TERMINATED =  3 << COUNT_BITS; //高3位为011
//状态转换
/**
 * RUNNING -> SHUTDOWN
 *    On invocation of shutdown(), perhaps implicitly in finalize()
 * (RUNNING or SHUTDOWN) -> STOP
 *    On invocation of shutdownNow()
 * SHUTDOWN -> TIDYING
 *    When both queue and pool are empty
 * STOP -> TIDYING
 *    When pool is empty
 * TIDYING -> TERMINATED
 *    When the terminated() hook method has completed
 *
 * Threads waiting in awaitTermination() will return when the
 * state reaches TERMINATED.
 */

shutdown 方法是不再接收新的任务提交,关闭空闲线程,正在执行任务的线程等执行完才关闭,而 shutdownNow 方法是直接关闭所有线程,

执行任务

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
//AbstractExecutorService类
//submit提交任务是将任务包装为RunnableFuture传递给execute方法
public Future<?> submit(Runnable task) {
  if (task == null) throw new NullPointerException();
  //newTaskFor方法创建对象是FutureTask
  RunnableFuture<Void> ftask = newTaskFor(task, null);
  execute(ftask);
  return ftask;
}
public <T> Future<T> submit(Callable<T> task) {
  if (task == null) throw new NullPointerException();
  RunnableFuture<T> ftask = newTaskFor(task);
  execute(ftask);
  return ftask;
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
//ThreadPoolExecutor类
public void execute(Runnable command) {
  if (command == null) throw new NullPointerException();
  //获取提交的任务数量和线程池状态信息
  int c = ctl.get();
  //情况一,若提交的任务数量小于corePoolSize,则开启一个线程并将任务添加到该线程中执行
  if (workerCountOf(c) < corePoolSize) {
    if (addWorker(command, true))
      return;
    c = ctl.get();
  }
  //情况二,若提交的任务数量大于corePoolSize,且任务可以排队,则将任务添加到任务队列
  if (isRunning(c) && workQueue.offer(command)) {
    //需要再次确认线程池状态,若线程池异常终止了,则删除任务,然后执行拒绝策略
    int recheck = ctl.get();
    if (! isRunning(recheck) && remove(command))
      reject(command);
    //若线程池正常且任务数量为0,则开启一个线程,新开的线程执行的任务为null
    else if (workerCountOf(recheck) == 0)
      addWorker(null, false);
  }
  //如果不是情况一、情况二,则新开线程执行任务,若失败则直接执行拒绝策略
  else if (!addWorker(command, false))
    reject(command);
}

线程池返回结果集

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class Test {
  public static void main(String[] args) {
    //创建线程池
    ExecutorService threadPool = Executors.newSingleThreadExecutor();
    //定义线程池线程执行返回的结果集
    ArrayList<Future<Integer>> results = new ArrayList<>();
    for (int i = 0; i < 10; i++) {
      //提交任务到线程池
      Future f = threadPool.submit(new MyCallable());
      //将Future回调添加到结果集
      results.add(f);
    }
    try {
      for (Future result : results)
        //调用Future回调的get方法获取执行结果
        System.out.print(result.get() + " ");
      //69 31 78 77 44 74 35 22 71 37
    } catch (Exception e) {
      e.printStackTrace();
    } finally {
      //关闭线程池
      threadPool.shutdown();
    }
  }
}
class MyCallable implements Callable<Integer> {
  @Override
  public Integer call() {
    return new Random().nextInt(100);
  }
}

消失的异常

通过 submit 提交的 Runnable 任务会被包装成 FutureTask,异常将被捕捉并保存,不会抛出

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class Test {
  public static void main(String[] args) {
    ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
      0, Integer.MAX_VALUE, 0L, TimeUnit.SECONDS,
      new SynchronousQueue<>());
    for (int i = 0; i < 5; i++) {
      threadPool.submit(new MyRunnable(100, i));
      //50 100 33 25 
    }
  }
}
class MyRunnable implements Runnable {
  int a, b;
  public MyRunnable(int a, int b) {
    this.a = a;
    this.b = b;
  }
  @Override
  public void run() {
    System.out.print(a / b + " ");
  }
}

100 / 0 本应该抛出异常的,但只有四个结果没有异常抛出,既然 submit 将任务封装为 FutureTask,那就调用 Future 回调的 get 方法来判断任务是否执行结束,并且如果有异常也会抛出

1
2
3
4
5
6
Future f = threadPool.submit(new MyRunnable(100, i));
try {
  f.get();
} catch (Exception e) {
  e.printStackTrace();
}

或者直接调用 execute 方法

1
threadPool.execute(new MyRunnable(100, i));

扩展线程池

可以通过重写 ThreadPoolExecutor 提供的三个 hook 方法 beforeExecute、afterExecute、terminated 来扩展线程池

1
2
3
protected void beforeExecute(Thread t, Runnable r) {}
protected void afterExecute(Runnable r, Throwable t) {}
protected void terminated() {}

以上内容是玉山整理的笔记,如有错误还请指出