admin 管理员组

文章数量: 887006

【并发编程】优雅使用线程池结合CompletableFuture实现异步编排

文章目录

    • 参考
    • 1、线程池引入
    • 2、Executors
      • 2.1、概述
      • 2.2、Executors缺陷
    • 3、优雅的创建线程池
      • 3.1、正确挑选方法
      • 3.2、线程池配置类
    • 4、线程池执行流程
    • 5、CompletableFuture
      • 5.1、概述
      • 5.2、核心三词
      • 5.4、单异步任务
        • 5.4.1、runAsync
        • 5.4.2、supplyAsync
      • 5.4、双异步任务
        • 5.4.1、thenRunAsync
        • 5.4.2、thenAcceptAsync
        • 5.4.3、thenApplyAsync
      • 5.5、多异步任务
        • 5.5.1、前置准备
        • 5.5.2、runAfterBothAsync
        • 5.5.3、thenAcceptBothAsync
        • 5.5.4、thenCombineAsync
        • 5.5.5、runAfterEitherAsync
        • 5.5.6、acceptEitherAsync
        • 5.5.7、applyToEitherAsync

参考

Java中线程池,你真的会用吗?

深入理解线程池及相关面试题

线程池创建之后,会立即创建核心线程吗

【Java多线程】CompletableFuture实现多线程异步编排

另外一种接收文件集合上传返回图片链接的上传方式:【OSS】SpringBoot搭配线程池整合阿里云OSS实现图片异步上传

1、线程池引入

所谓线程池,通俗来讲,就是一个管理线程的池子。它可以容纳多个线程,其中的线程可以反复利用,省去了频繁创建线程对象的操作。

在 Java 并发编程框架中的线程池是运用场景最多的技术,几乎所有需要异步或并发执行任务的程序都可以使用线程池。在开发过程中,合理地使用线程池能够带来至少以下4个好处:

  • 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗;

  • 提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行;

  • 提高线程的可管理性。线程是稀缺资源,如果无限制地创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一分配、调优和监控。

  • 提供更强大的功能,比如延时定时线程池;

2、Executors

2.1、概述


Executors 是一个Java中的工具类。提供工厂方法来创建不同类型的线程池。

核心概念:这四个线程池的本质都是ThreadPoolExecutor对象:

  • newFiexedThreadPool(int Threads): 创建固定数目线程的线程池。

  • newCachedThreadPool(): 创建一个可缓存的线程池,调用execute 将重用以前构造的线程(如果线程可用)。如果没有可用的线程,则创建一个新线程并添加到池中。终止并从缓存中移除那些已有 60 秒钟未被使用的线程。

  • newSingleThreadExecutor(): 创建一个单线程化的Executor。

  • newScheduledThreadPool(int corePoolSize): 创建一个支持定时及周期性的任务执行的线程池,多数情况下可用来替代Timer类。

2.2、Executors缺陷


在阿里巴巴Java开发手册中明确指出,不允许使用Executors创建线程池,这是因为使用Executors创建线程池可能会导致OOM(OutOfMemory ,内存溢出)。

3、优雅的创建线程池

3.1、正确挑选方法


避免使用Executors创建线程池,主要是避免使用其中的默认实现,那么我们可以自己直接调用ThreadPoolExecutor的构造函数来自己创建线程池。在创建的同时,给BlockQueue指定容量就可以了。

public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler)

上面放出来的是ThreadPoolExecutor的全参构造函数,其中的参数分别为:

  • corePoolSize:线程池中的核心线程数。指定线程数回一直存在与线程池中,除非设置了 allowCoreThreadTimeOut参数。当创建完成之后就会准备好等待接收异步任务去执行;
  • maximumPoolSize:最大线程数。当请求的线程超过最大线程数时,将会扩充线程数量到最大线程数,但不会无限扩充,达到控制资源的效果;
  • keepAliveTime:非核心线程的存活时间。如果当前存活的线程数量大于核心线程数corePoolSize,则会释放空闲的线程直到线程数回到最大线程数corePoolSize
  • unit:keepAliveTime 参数的时间单位,如TimeUnit.SECONDS
  • workQueue:阻塞队列。用于保存多余的任务,如果任务很多,就会将多的任务存放进队列中,只要有空闲的线程就会去队列中取出新的任务执行直到队列为空;
  • threadFactory:线程池工厂,标识线程,即为线程起一个具有意义的名称,可自定义;
  • handler:拒绝策略。如果阻塞队列满了,就会按照我们指定的拒绝策略拒绝后续任务,默认为丢弃任务。

在线程创建过程中有一个细节,即创建阻塞队列时,队列默认的最大值为Integer的最大值,这很显然是不合理的,容易内存不够造成oom,因此一般都需要在创建时设定容量,如new LinkedBlockingDeque<>(1000)

3.2、线程池配置类


在开发过程中一般会将线程池的创建抽取成一个配置类,其中的各类参数则会配置在配置文件中去。

这里有个细节,就是创建线程池的时候并不会立马准备好corePoolSize数量的线程来准备接收任务,而是要等到有任务提交时才会启动。

这一部分在下面的4、线程池执行流程/线程池创建中也有提及,这里使用了prestartCoreThread方法在初始化线程池的时候开启一个核心线程,避免在执行异步操作的时候初始化核心线程耗时巨大(可自行尝试,在后面的例子中因为加上了这一方法,接口的耗时减少了50倍

@Configuration
public class ThreadPoolConfig {@Beanpublic ThreadPoolExecutor threadPoolExecutor(@Value("${thread.pool.coreSize}") Integer coreSize,@Value("${thread.pool.maxSize}") Integer maxSize,@Value("${thread.pool.keepalive}") Integer keepalive,@Value("${thread.pool.blockQueueSize}") Integer blockQueueSize) {ThreadPoolExecutor executor = new ThreadPoolExecutor(coreSize,maxSize,keepalive,TimeUnit.SECONDS,new ArrayBlockingQueue<>(blockQueueSize),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());executor.prestartCoreThread();return executor;}
}

4、线程池执行流程

当向线程池提交一个任务之后,线程池是如何处理这个任务的呢?下面就先来看一下它的主要处理流程。

下面详细介绍线程池的详细运行流程:

  1. 线程池创建,但是并不会立马准备好corePoolSize数量的线程来准备接收任务,线程并不会立即启动,而是要等到有任务提交时才会启动。除非调用了prestartCoreThread/prestartAllCoreThreads 事先启动核心线程:

    1.1. prestartCoreThread:Starts a core thread, causing it to idly wait for work. This overrides the default policy of starting core threads only when new t asks are executed;

    1.2. prestartAllCoreThreads:Starts all core threads。

  2. 任务到来,用准备好的corePoolSize个空闲线程执行:

    2.1. 核心线程数已满,就将再进来的任务放入阻塞队列中,期间如果运行中的线程小于核心线程数时,就会去阻塞队列中获取任务执行;

    2.2. 阻塞队列已满,就会创建新线程去执行阻塞队列中的任务,但最大只能创建到最大线程数maximumPoolSize

    2.3. 存活且运行的线程数达到最大线程数maximumPoolSize,即线程已满时,根据设定的拒绝策略handler来对后来任务进行相应处理;

    2.4. 当所有线程都执行完,在指定时间keepAliveTime之后,将会自动销毁线程,最终保持在corePoolSize大小。

  3. 在线程创建过程中,所有的线程都由指定的工厂threadFactory进行创建,并为线程设置标识,即起名。

线程池场景模拟:

一个线程池corePoolSize为7,maximumPoolSize为20,阻塞队列最大50,100并发进来怎么分配的?

答案:先有7个线程能够直接处理7个任务,接下来50个进入队列排队,再多开13个继续执行。此时所有线程池和阻塞队列都已满,但只有70个被安排上,剩下的30个走设定好的拒绝策略进行相对应操作。

最终以一张图来总结和概括下线程池的执行示意图:

5、CompletableFuture

5.1、概述


Java 8中, 新增加了一个包含50个方法左右的类: CompletableFuture,提供了非常强大的Future的扩展功能,可以帮助我们简化异步编程的复杂性,提供了函数式编程的能力,可以通过回调的方式处理计算结果,并且提供了转换和组合CompletableFuture的方法。

CompletableFuture类实现了Future接口,所以你还是可以像以前一样通过get方法阻塞或者轮询的方式获得结果,但是这种方式不推荐使用。

CompletableFuture和FutureTask同属于Future接口的实现类,都可以获取线程的执行结果。

5.2、核心三词


CompletableFuture中API众多,其中对于线程串行化方法的可大致分为三种类型,而三种类型对应着三种不同的单词前缀:

  • run:不能接收上次任务的执行结果,也没有返回值;
  • accept:可以接收上次任务的执行结果,但没有返回值;
  • apply:可以接收上次任务的执行结果,也拥有返回值。

5.4、单异步任务


5.4.1、runAsync

runAsync为单个异步任务中其中之一的API,其没有返回值。

public void testRunAsync() {for (int i = 0; i < 5; i++) {System.err.println("第" + i + "个循环开始……");CompletableFuture.runAsync(() -> {System.out.println("当前线程" + Thread.currentThread().getId());int calc = 10 / 2;System.out.println("运行结果:" + calc);}, executor);System.err.println("第" + i + "个循环结束……");}
}

运行结果:

5.4.2、supplyAsync

supplyAsync相对于前面的runAsyns的方法,则是多了一个返回值,其可以结合以下两个方法进行使用:

  • whenComplete:能感知异常,能感知结果,但无返回值。当执行完成supplyAsync时会执行方法中的逻辑;
  • exceptionally:能感知异常,不能感知结果,有返回值。当执行supplyAsync时出现异常终端之后会先执行whenComplete方法再执行本方法,对异常进行处理;
  • handle:相当于整合了上面的两个方法,即可感知结果,也可处理异常。
public void testSupplyAsync() {CompletableFuture<Integer> exceptionally = CompletableFuture.supplyAsync(() -> {System.out.println("当前线程" + Thread.currentThread().getId());int calc = 10 / 0;System.out.println("运行结果:" + calc);return calc;}, executor).whenComplete((res, exception) -> {// 当执行完成之后获取任务执行结果和异常System.out.println("异步任务成功完成...结果是:" + res + ";异常是:" + exception);}).exceptionally(throwable -> {// 异常之后对结果进行处理return 10;});System.out.println(exceptionally.join());System.err.println("=====不严谨的分割线=====");CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {System.out.println("当前线程" + Thread.currentThread().getId());int i = 10 / 4;System.out.println("运行结果:" + i);return i;}, executor).handle((res, exception) -> {// 当执行完成之后获取任务执行结果和异常if (exception != null) {// 存在异常,处理结果return 0;} else {// 正常运行,返回正确处理后的结果return res;}});
}

运行结果:

5.4、双异步任务


5.4.1、thenRunAsync

thenRunAsync是CompletableFuture中线程串行化方法中的其中之一,其不能接收上一次的执行结果,也没返回值。

public void testThenRunAsync() {CompletableFuture.supplyAsync(() -> {System.out.println("当前线程" + Thread.currentThread().getId());int i = 10 / 4;System.out.println("运行结果:" + i);return i;}, executor).thenRunAsync(() -> {System.out.println("任务2启动了...");}, executor);
}

运行结果:

5.4.2、thenAcceptAsync

正如5.2中所说,accept与上方的run不同,是可以获取到返回值的,但是其本身则是没有返回值。

public void testThenAcceptAsync() {CompletableFuture.supplyAsync(() -> {System.out.println("当前线程" + Thread.currentThread().getId());int i = 10 / 4;System.out.println("运行结果:" + i);return i;}, executor).thenAcceptAsync(res -> {System.out.println("任务2启动了..." + res);}, executor);
}

运行结果:

5.4.3、thenApplyAsync

同样如5.2中所说,apply是拥有返回值,同样与前面的两个不同,其是存在返回值的。

public void test() {CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {System.out.println("当前线程" + Thread.currentThread().getId());int i = 10 / 4;System.out.println("运行结果:" + i);return i;}, executor).thenApplyAsync(res -> {System.out.println("任务2启动了..." + res);return "hello " + res;}, executor);System.out.println(future.join());
}

运行结果:

5.5、多异步任务


5.5.1、前置准备

因为是多异步任务的形式,因此在这里先准备好三个任务供下面的共用。

public CompletableFuture<Object> getFuture01() {return CompletableFuture.supplyAsync(() -> {System.out.println("任务1线程" + Thread.currentThread().getId());int i = 10 / 4;System.out.println("任务1结束:");return i;}, executor);
}
public CompletableFuture<Object> getFuture02() {return CompletableFuture.supplyAsync(() -> {System.out.println("任务2线程" + Thread.currentThread().getId());try {Thread.sleep(3000);System.out.println("任务2结束:");} catch (InterruptedException e) {e.printStackTrace();}return "hello";}, executor);
}public CompletableFuture<Object> getFuture03() {return CompletableFuture.supplyAsync(() -> {System.out.println("任务3线程" + Thread.currentThread().getId());System.out.println("任务3结束:");return "haha";}, executor);
}
5.5.2、runAfterBothAsync

runAfterBothAsync是针对于有两个前置任务的一个方法,其会等待任务01 任务02都完成了,再开始执行后续任务,不感知任务1、2的结果的,也没返回值。

public void testRunAfterBothAsync() {System.out.println("starting……");// 等待任务一和任务二完成之后再执行action即箭头函数中的任务CompletableFuture<Void> future = getFuture01().runAfterBothAsync(getFuture02(), () -> {System.out.println("任务一和任务二都完成了");}, executor);System.out.println("ending……");
}

运行结果:

5.5.3、thenAcceptBothAsync

thenAcceptBothAsync同样是针对有两个前置任务的一个方法,其会等待任务01 任务02都完成了,再开始执行任务3,能感知到任务1、2的结果,但没返回值。

public void testRun() {System.out.println("start……");CompletableFuture<Object> future01 = getFuture01();CompletableFuture<Object> future02 = getFuture02();// 等待任务一和任务二完成之后再执行action即箭头函数中的任务,且可以获取到任务一和任务二的返回结果future01.thenAcceptBothAsync(future02, (f1, f2) -> {System.out.println("任务一和任务二都完成了");System.out.println("任务一拿到结果:" + f1);System.out.println("任务二拿到结果:" + f2);}, executor);System.out.println("ending……");
}

运行结果:

5.5.4、thenCombineAsync

thenCombineAsync也是针对有两个前置任务的一个方法,其会等待任务01 任务02都完成了,再开始执行任务3,能感知到任务1、2的结果,而且自己可以带返回值。

public void testThenCombineAsync() {System.out.println("start……");CompletableFuture<Object> future01 = getFuture01();CompletableFuture<Object> future02 = getFuture02();// 等待任务一和任务二完成之后再执行action即箭头函数中的任务,且可以获取到任务一和任务二的返回结果并返回结果CompletableFuture<String> future = future01.thenCombineAsync(future02, (f1, f2) -> {System.out.println("任务一和任务二都完成了");return "任务一拿到结果:" + f1 + "任务二拿到结果:" + f2;}, executor);System.out.println("ending……");System.out.println(future.join());
}

运行结果:

5.5.5、runAfterEitherAsync

runAfterBothAsync不同的是,runAfterEitherAsync两个任务只要有一个完成,就执行任务3,不感知结果,自己没返回值。

public void testRunAfterEitherAsync() {System.out.println("starting……");// 等待任务一和任务二完成之后再执行action即箭头函数中的任务getFuture01().runAfterEitherAsync(getFuture02(), () -> {System.out.println("任务一和任务二有其中一个完成了");}, executor);System.out.println("ending……");
}

运行结果:

5.5.6、acceptEitherAsync

thenAcceptBothAsync不同的是,acceptEitherAsync两个任务只要有一个完成,就执行任务3,感知结果,自己没返回值。

public void testAcceptEitherAsync() {System.out.println("starting……");// 等待任务一和任务二完成之后再执行action即箭头函数中的任务getFuture01().acceptEitherAsync(getFuture03(), (res) -> {System.out.println("任务一和任务三有其中一个完成了");System.out.println(res);}, executor);System.out.println("ending……");
}

运行结果:

5.5.7、applyToEitherAsync

thenCombineAsync不同的是,applyToEitherAsync两个任务只要有一个完成,就执行任务3,感知结果,自己有返回值。

public void testApplyToEitherAsync() {System.out.println("starting……");// 等待任务一和任务二完成之后再执行action即箭头函数中的任务CompletableFuture<Object> future = getFuture01().applyToEitherAsync(getFuture03(), (res) -> {System.out.println("任务一和任务三有其中一个完成了");return res;}, executor);System.out.println("ending……");System.out.println("执行完成的结果为:" + future.join());
}

运行结果:

本文标签: 并发编程优雅使用线程池结合CompletableFuture实现异步编排