admin 管理员组文章数量: 887006
【并发编程】优雅使用线程池结合CompletableFuture实现异步编排
文章目录
- 参考
- 1、线程池引入
- 2、Executors
- 2.1、概述
- 2.2、Executors缺陷
- 3、优雅的创建线程池
- 3.1、正确挑选方法
- 3.2、线程池配置类
- 4、线程池执行流程
- 5、CompletableFuture
- 5.1、概述
- 5.2、核心三词
- 5.4、单异步任务
- 5.4.1、runAsync
- 5.4.2、supplyAsync
- 5.4、双异步任务
- 5.4.1、thenRunAsync
- 5.4.2、thenAcceptAsync
- 5.4.3、thenApplyAsync
- 5.5、多异步任务
- 5.5.1、前置准备
- 5.5.2、runAfterBothAsync
- 5.5.3、thenAcceptBothAsync
- 5.5.4、thenCombineAsync
- 5.5.5、runAfterEitherAsync
- 5.5.6、acceptEitherAsync
- 5.5.7、applyToEitherAsync
参考
Java中线程池,你真的会用吗?
深入理解线程池及相关面试题
线程池创建之后,会立即创建核心线程吗
【Java多线程】CompletableFuture实现多线程异步编排
另外一种接收文件集合上传返回图片链接的上传方式:【OSS】SpringBoot搭配线程池整合阿里云OSS实现图片异步上传
1、线程池引入
所谓线程池,通俗来讲,就是一个管理线程的池子。它可以容纳多个线程,其中的线程可以反复利用,省去了频繁创建线程对象的操作。
在 Java 并发编程框架中的线程池是运用场景最多的技术,几乎所有需要异步或并发执行任务的程序都可以使用线程池。在开发过程中,合理地使用线程池能够带来至少以下4个好处:
-
降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗;
-
提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行;
-
提高线程的可管理性。线程是稀缺资源,如果无限制地创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一分配、调优和监控。
-
提供更强大的功能,比如延时定时线程池;
2、Executors
2.1、概述
Executors 是一个Java中的工具类。提供工厂方法来创建不同类型的线程池。
核心概念:这四个线程池的本质都是ThreadPoolExecutor对象:
-
newFiexedThreadPool(int Threads)
: 创建固定数目线程的线程池。 -
newCachedThreadPool()
: 创建一个可缓存的线程池,调用execute 将重用以前构造的线程(如果线程可用)。如果没有可用的线程,则创建一个新线程并添加到池中。终止并从缓存中移除那些已有 60 秒钟未被使用的线程。 -
newSingleThreadExecutor()
: 创建一个单线程化的Executor。 -
newScheduledThreadPool(int corePoolSize)
: 创建一个支持定时及周期性的任务执行的线程池,多数情况下可用来替代Timer类。
2.2、Executors缺陷
在阿里巴巴Java开发手册中明确指出,不允许使用Executors创建线程池,这是因为使用Executors创建线程池可能会导致OOM(OutOfMemory ,内存溢出)。
3、优雅的创建线程池
3.1、正确挑选方法
避免使用Executors创建线程池,主要是避免使用其中的默认实现,那么我们可以自己直接调用ThreadPoolExecutor
的构造函数来自己创建线程池。在创建的同时,给BlockQueue
指定容量就可以了。
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler)
上面放出来的是ThreadPoolExecutor的全参构造函数,其中的参数分别为:
corePoolSize
:线程池中的核心线程数。指定线程数回一直存在与线程池中,除非设置了 allowCoreThreadTimeOut参数。当创建完成之后就会准备好等待接收异步任务去执行;maximumPoolSize
:最大线程数。当请求的线程超过最大线程数时,将会扩充线程数量到最大线程数,但不会无限扩充,达到控制资源的效果;keepAliveTime
:非核心线程的存活时间。如果当前存活的线程数量大于核心线程数corePoolSize,则会释放空闲的线程直到线程数回到最大线程数corePoolSize;unit
:keepAliveTime 参数的时间单位,如TimeUnit.SECONDS;workQueue
:阻塞队列。用于保存多余的任务,如果任务很多,就会将多的任务存放进队列中,只要有空闲的线程就会去队列中取出新的任务执行直到队列为空;threadFactory
:线程池工厂,标识线程,即为线程起一个具有意义的名称,可自定义;handler
:拒绝策略。如果阻塞队列满了,就会按照我们指定的拒绝策略拒绝后续任务,默认为丢弃任务。
在线程创建过程中有一个细节,即创建阻塞队列时,队列默认的最大值为Integer的最大值,这很显然是不合理的,容易内存不够造成oom,因此一般都需要在创建时设定容量,如
new LinkedBlockingDeque<>(1000)
3.2、线程池配置类
在开发过程中一般会将线程池的创建抽取成一个配置类,其中的各类参数则会配置在配置文件中去。
这里有个细节,就是创建线程池的时候并不会立马准备好corePoolSize
数量的线程来准备接收任务,而是要等到有任务提交时才会启动。
这一部分在下面的4、线程池执行流程/线程池创建中也有提及,这里使用了prestartCoreThread
方法在初始化线程池的时候开启一个核心线程,避免在执行异步操作的时候初始化核心线程耗时巨大(可自行尝试,在后面的例子中因为加上了这一方法,接口的耗时减少了50倍)
@Configuration
public class ThreadPoolConfig {@Beanpublic ThreadPoolExecutor threadPoolExecutor(@Value("${thread.pool.coreSize}") Integer coreSize,@Value("${thread.pool.maxSize}") Integer maxSize,@Value("${thread.pool.keepalive}") Integer keepalive,@Value("${thread.pool.blockQueueSize}") Integer blockQueueSize) {ThreadPoolExecutor executor = new ThreadPoolExecutor(coreSize,maxSize,keepalive,TimeUnit.SECONDS,new ArrayBlockingQueue<>(blockQueueSize),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());executor.prestartCoreThread();return executor;}
}
4、线程池执行流程
当向线程池提交一个任务之后,线程池是如何处理这个任务的呢?下面就先来看一下它的主要处理流程。
下面详细介绍线程池的详细运行流程:
-
线程池创建,但是并不会立马准备好
corePoolSize
数量的线程来准备接收任务,线程并不会立即启动,而是要等到有任务提交时才会启动。除非调用了prestartCoreThread/prestartAllCoreThreads
事先启动核心线程:1.1. prestartCoreThread:Starts a core thread, causing it to idly wait for work. This overrides the default policy of starting core threads only when new t asks are executed;
1.2. prestartAllCoreThreads:Starts all core threads。
-
任务到来,用准备好的
corePoolSize
个空闲线程执行:2.1. 核心线程数已满,就将再进来的任务放入阻塞队列中,期间如果运行中的线程小于核心线程数时,就会去阻塞队列中获取任务执行;
2.2. 阻塞队列已满,就会创建新线程去执行阻塞队列中的任务,但最大只能创建到最大线程数
maximumPoolSize
;2.3. 存活且运行的线程数达到最大线程数
maximumPoolSize
,即线程已满时,根据设定的拒绝策略handler
来对后来任务进行相应处理;2.4. 当所有线程都执行完,在指定时间
keepAliveTime
之后,将会自动销毁线程,最终保持在corePoolSize
大小。 -
在线程创建过程中,所有的线程都由指定的工厂
threadFactory
进行创建,并为线程设置标识,即起名。
线程池场景模拟:
一个线程池corePoolSize为7,maximumPoolSize为20,阻塞队列最大50,100并发进来怎么分配的?
答案:先有7个线程能够直接处理7个任务,接下来50个进入队列排队,再多开13个继续执行。此时所有线程池和阻塞队列都已满,但只有70个被安排上,剩下的30个走设定好的拒绝策略进行相对应操作。
最终以一张图来总结和概括下线程池的执行示意图:
5、CompletableFuture
5.1、概述
在Java 8中, 新增加了一个包含50个方法左右的类: CompletableFuture,提供了非常强大的Future的扩展功能,可以帮助我们简化异步编程的复杂性,提供了函数式编程的能力,可以通过回调的方式处理计算结果,并且提供了转换和组合CompletableFuture的方法。
CompletableFuture类实现了Future接口,所以你还是可以像以前一样通过get方法阻塞或者轮询的方式获得结果,但是这种方式不推荐使用。
CompletableFuture和FutureTask同属于Future接口的实现类,都可以获取线程的执行结果。
5.2、核心三词
在CompletableFuture中API众多,其中对于线程串行化方法的可大致分为三种类型,而三种类型对应着三种不同的单词前缀:
- run:不能接收上次任务的执行结果,也没有返回值;
- accept:可以接收上次任务的执行结果,但没有返回值;
- apply:可以接收上次任务的执行结果,也拥有返回值。
5.4、单异步任务
5.4.1、runAsync
runAsync为单个异步任务中其中之一的API,其没有返回值。
public void testRunAsync() {for (int i = 0; i < 5; i++) {System.err.println("第" + i + "个循环开始……");CompletableFuture.runAsync(() -> {System.out.println("当前线程" + Thread.currentThread().getId());int calc = 10 / 2;System.out.println("运行结果:" + calc);}, executor);System.err.println("第" + i + "个循环结束……");}
}
运行结果:
5.4.2、supplyAsync
supplyAsync相对于前面的runAsyns的方法,则是多了一个返回值,其可以结合以下两个方法进行使用:
- whenComplete:能感知异常,能感知结果,但无返回值。当执行完成supplyAsync时会执行方法中的逻辑;
- exceptionally:能感知异常,不能感知结果,有返回值。当执行supplyAsync时出现异常终端之后会先执行whenComplete方法再执行本方法,对异常进行处理;
- handle:相当于整合了上面的两个方法,即可感知结果,也可处理异常。
public void testSupplyAsync() {CompletableFuture<Integer> exceptionally = CompletableFuture.supplyAsync(() -> {System.out.println("当前线程" + Thread.currentThread().getId());int calc = 10 / 0;System.out.println("运行结果:" + calc);return calc;}, executor).whenComplete((res, exception) -> {// 当执行完成之后获取任务执行结果和异常System.out.println("异步任务成功完成...结果是:" + res + ";异常是:" + exception);}).exceptionally(throwable -> {// 异常之后对结果进行处理return 10;});System.out.println(exceptionally.join());System.err.println("=====不严谨的分割线=====");CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {System.out.println("当前线程" + Thread.currentThread().getId());int i = 10 / 4;System.out.println("运行结果:" + i);return i;}, executor).handle((res, exception) -> {// 当执行完成之后获取任务执行结果和异常if (exception != null) {// 存在异常,处理结果return 0;} else {// 正常运行,返回正确处理后的结果return res;}});
}
运行结果:
5.4、双异步任务
5.4.1、thenRunAsync
thenRunAsync是CompletableFuture中线程串行化方法中的其中之一,其不能接收上一次的执行结果,也没返回值。
public void testThenRunAsync() {CompletableFuture.supplyAsync(() -> {System.out.println("当前线程" + Thread.currentThread().getId());int i = 10 / 4;System.out.println("运行结果:" + i);return i;}, executor).thenRunAsync(() -> {System.out.println("任务2启动了...");}, executor);
}
运行结果:
5.4.2、thenAcceptAsync
正如5.2中所说,accept与上方的run不同,是可以获取到返回值的,但是其本身则是没有返回值。
public void testThenAcceptAsync() {CompletableFuture.supplyAsync(() -> {System.out.println("当前线程" + Thread.currentThread().getId());int i = 10 / 4;System.out.println("运行结果:" + i);return i;}, executor).thenAcceptAsync(res -> {System.out.println("任务2启动了..." + res);}, executor);
}
运行结果:
5.4.3、thenApplyAsync
同样如5.2中所说,apply是拥有返回值,同样与前面的两个不同,其是存在返回值的。
public void test() {CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {System.out.println("当前线程" + Thread.currentThread().getId());int i = 10 / 4;System.out.println("运行结果:" + i);return i;}, executor).thenApplyAsync(res -> {System.out.println("任务2启动了..." + res);return "hello " + res;}, executor);System.out.println(future.join());
}
运行结果:
5.5、多异步任务
5.5.1、前置准备
因为是多异步任务的形式,因此在这里先准备好三个任务供下面的共用。
public CompletableFuture<Object> getFuture01() {return CompletableFuture.supplyAsync(() -> {System.out.println("任务1线程" + Thread.currentThread().getId());int i = 10 / 4;System.out.println("任务1结束:");return i;}, executor);
}
public CompletableFuture<Object> getFuture02() {return CompletableFuture.supplyAsync(() -> {System.out.println("任务2线程" + Thread.currentThread().getId());try {Thread.sleep(3000);System.out.println("任务2结束:");} catch (InterruptedException e) {e.printStackTrace();}return "hello";}, executor);
}public CompletableFuture<Object> getFuture03() {return CompletableFuture.supplyAsync(() -> {System.out.println("任务3线程" + Thread.currentThread().getId());System.out.println("任务3结束:");return "haha";}, executor);
}
5.5.2、runAfterBothAsync
runAfterBothAsync是针对于有两个前置任务的一个方法,其会等待任务01 任务02都完成了,再开始执行后续任务,不感知任务1、2的结果的,也没返回值。
public void testRunAfterBothAsync() {System.out.println("starting……");// 等待任务一和任务二完成之后再执行action即箭头函数中的任务CompletableFuture<Void> future = getFuture01().runAfterBothAsync(getFuture02(), () -> {System.out.println("任务一和任务二都完成了");}, executor);System.out.println("ending……");
}
运行结果:
5.5.3、thenAcceptBothAsync
thenAcceptBothAsync同样是针对有两个前置任务的一个方法,其会等待任务01 任务02都完成了,再开始执行任务3,能感知到任务1、2的结果,但没返回值。
public void testRun() {System.out.println("start……");CompletableFuture<Object> future01 = getFuture01();CompletableFuture<Object> future02 = getFuture02();// 等待任务一和任务二完成之后再执行action即箭头函数中的任务,且可以获取到任务一和任务二的返回结果future01.thenAcceptBothAsync(future02, (f1, f2) -> {System.out.println("任务一和任务二都完成了");System.out.println("任务一拿到结果:" + f1);System.out.println("任务二拿到结果:" + f2);}, executor);System.out.println("ending……");
}
运行结果:
5.5.4、thenCombineAsync
thenCombineAsync也是针对有两个前置任务的一个方法,其会等待任务01 任务02都完成了,再开始执行任务3,能感知到任务1、2的结果,而且自己可以带返回值。
public void testThenCombineAsync() {System.out.println("start……");CompletableFuture<Object> future01 = getFuture01();CompletableFuture<Object> future02 = getFuture02();// 等待任务一和任务二完成之后再执行action即箭头函数中的任务,且可以获取到任务一和任务二的返回结果并返回结果CompletableFuture<String> future = future01.thenCombineAsync(future02, (f1, f2) -> {System.out.println("任务一和任务二都完成了");return "任务一拿到结果:" + f1 + "任务二拿到结果:" + f2;}, executor);System.out.println("ending……");System.out.println(future.join());
}
运行结果:
5.5.5、runAfterEitherAsync
与runAfterBothAsync不同的是,runAfterEitherAsync两个任务只要有一个完成,就执行任务3,不感知结果,自己没返回值。
public void testRunAfterEitherAsync() {System.out.println("starting……");// 等待任务一和任务二完成之后再执行action即箭头函数中的任务getFuture01().runAfterEitherAsync(getFuture02(), () -> {System.out.println("任务一和任务二有其中一个完成了");}, executor);System.out.println("ending……");
}
运行结果:
5.5.6、acceptEitherAsync
与thenAcceptBothAsync不同的是,acceptEitherAsync两个任务只要有一个完成,就执行任务3,感知结果,自己没返回值。
public void testAcceptEitherAsync() {System.out.println("starting……");// 等待任务一和任务二完成之后再执行action即箭头函数中的任务getFuture01().acceptEitherAsync(getFuture03(), (res) -> {System.out.println("任务一和任务三有其中一个完成了");System.out.println(res);}, executor);System.out.println("ending……");
}
运行结果:
5.5.7、applyToEitherAsync
与thenCombineAsync不同的是,applyToEitherAsync两个任务只要有一个完成,就执行任务3,感知结果,自己有返回值。
public void testApplyToEitherAsync() {System.out.println("starting……");// 等待任务一和任务二完成之后再执行action即箭头函数中的任务CompletableFuture<Object> future = getFuture01().applyToEitherAsync(getFuture03(), (res) -> {System.out.println("任务一和任务三有其中一个完成了");return res;}, executor);System.out.println("ending……");System.out.println("执行完成的结果为:" + future.join());
}
运行结果:
本文标签: 并发编程优雅使用线程池结合CompletableFuture实现异步编排
版权声明:本文标题:【并发编程】优雅使用线程池结合CompletableFuture实现异步编排 内容由网友自发贡献,该文观点仅代表作者本人, 转载请联系作者并注明出处:http://www.freenas.com.cn/jishu/1732355450h1534303.html, 本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容,一经查实,本站将立刻删除。
发表评论