1. 前言
在实际做项目中,我们经常使用多线程、异步的来帮我们做一些事情。
比如用户抽取奖品,异步的给他发一个push。
又比如一段前后不相关的业务逻辑,原本是顺序执行,耗时=(A + B + C),现在使用多线程加快执行速度,耗时=Max(A, B, C)。
这时候很多时候为了方便,我们就直接使用CompletableFuture来处理,但它真的好多坑,让我们一一细说。
2. CompletableFuture原理
2.1 CompletableFuture API
在CompletableFuture中提交任务有以下几种方式- public static CompletableFuture<Void> runAsync(Runnable runnable)
- public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
- public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
- public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
复制代码 这四个方法都是用来提交任务的,不同的是supplyAsync提交的任务有返回值,runAsync提交的任务没有返回值。两个接口都有一个重载的方法,第二个入参为指定的线程池,如果不指定,则默认使用ForkJoinPool.commonPool()线程池。
2.2 ForkJoinPool
Fork/Join 框架是一种并行计算框架,设计目的是提高具有递归性质任务的执行速度。典型的任务是将问题逐步分解成较小的任务,直到每一个子任务足够简单可以直接解决,然后再将结果聚合起来。
Fork/Join 框架基于"工作窃取"算法 (Work Stealing Algorithm),该算法的核心思想是每个工作线程有自己的任务队列(双端队列, Deque)。当一个线程完成了自己队列中的任务时,便会窃取其他线程队列中的任务执行,这样就不会因为某个线程在等待而浪费 CPU 资源。
Fork/Join 框架非常适合以下这些工作负载:
- 递归任务:如斐波那契数列、归并排序等分治算法。
- 大规模数据处理:快速对集合、数组等进行并行操作。
- 图像处理:图像处理等数据量大的任务可以被分成多个小任务并行处理。
也就是说ForkJoinPool比较适用于CPU密集型,而不太适合于IO密集型。但是我们业务中大多数都是IO密集型,比如等待数据库的返回,等待下游RPC的返回,等待子方法的返回等等
2.3 ForkJoinPool在CompletableFuture中的应用
先说结论:
- 如果你在使用CompletableFuture没有指定线程池,就会使用默认的ForkJoinPool
- CompletableFuture是否使用默认线程池的依据,和机器的CPU核心数有关。当CPU核心数-1大于1时,才会使用默认的线程池,否则将会为每个CompletableFuture的任务创建一个新线程去执行。
- 如果你的CPU核心数为4核,那么也就是最多也只有3个核心线程(3个线程,你确定够用?)
3. CompletableFuture坑
3.1 ForkJoinPool线程不够用,处于等待状态
小明为了加快代码的运行,将原来的A+B+C的运行逻辑,改成了(A,B,C)的运行逻辑,使用了3个CompletableFuture来执行,耗时从原本的900ms,缩短到了300ms,简单代码如下:- public void test1() {
- a();
- b();
- c();
- }
- public void test2() {
- CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(() -> a());
- CompletableFuture<Integer> f2 = CompletableFuture.supplyAsync(() -> b());
- CompletableFuture<Integer> f3 = CompletableFuture.supplyAsync(() -> c());
- }
复制代码 上线后,小明心情良好,坐等升职加薪,没想到第二天却遇到了线上告警,接口频繁的超时,比之前还要慢,有些达到了10s+,小明实在想不明白。
后来排查发现,在项目中有大量使用到CompletableFuture.supplyAsync的地方,而每台机器8核,也就是7个线程,根本不够用,因此大量都是在等待线程中度过,因此耗时越来越严重,最终形成了雪崩,接口直接无限超时。
回答:使用CompletableFuture必须做到线程池隔离,不能使用默认的ForkJoinPool线程池
3.2 CompletableFuture反而更慢了?
小明经过这次事件学聪明了,使用CompletableFuture都自己写一个线程池。过了几天线上又告警出来了,大量接口超时,小明又蒙逼了,小明的代码如下:- ExecutorService es = Executors.newFixedThreadPool(5);
- public void test1() {
- CompletableFuture.runAsync(a(1), es);
- CompletableFuture.runAsync(b(1), es);
- CompletableFuture.runAsync(c(1), es);
- }
复制代码 后来排查发现,springmvc tomcat默认线程池是200,而你的线程池只有5个,也就是说,当接口请求了攀升。
比如现在有200个请求过来,执行到test1的时候,如果不使用线程池,反而没任何问题。但是使用到了线程池,5个线程池根本不够用,等待线程的释放,那么会越来越慢,最终拖垮整个服务。
3.3 CompletableFuture死锁?
小明说我再也不使用CompletableFuture了,小明说我直接调大线程池到200,那肯定没问题了,读者们思考下是否可行。答案是绝对不可行的,核心线程设置那么大,对cpu消耗非常严重,一定要设置合理的范围内。
再来看一个死锁问题,终于不是小明的锅了,这次轮到了小红,以下是死锁的代码:- ExecutorService es = Executors.newFixedThreadPool(5);
- public void test() {
- for (int i = 0; i < 5; i++) {
- CompletableFuture.runAsync(() -> a(), es);
- }
- }
- public void a() {
- CompletableFuture<Integer> f = CompletableFuture.supplyAsync(() -> 1, es);
- try {
- f.get();
- } catch (Exception e) {}
- }
复制代码 这里不卖关子了。由于是5个线程,在test方法中,将5个线程全部使用,然后test调用子方法a的时候。由于共用同一个线程池es,a方法永远获取不到线程池,a方法永远不可能执行成功,那么test方法也永远执行不了成功,那么就会处于永远阻塞死锁的这么一个线程。
因此解决办法就是,不同的业务尽量不要使用同一个线程池,为自己业务定制自己的线程池,而不是为了方便,共用一个commonPool。
4. 最后
通过以上对CompletableFuture的分析,以及一些实际踩坑的案例,相信你对CompletableFuture用法更加的了解了。
最后还是想说明一点,在业务代码中,能不使用多线程就不使用多线程,因为它带来的副作用远远比带来的好处要多的多得多,除非你非常清楚其中的原理。
CompletableFuture你真的懂了么?欢迎评论区留言讨论。
来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作! |