侧边栏壁纸
  • 累计撰写 93 篇文章
  • 累计创建 85 个标签
  • 累计收到 9 条评论

JUC异步编排之CompletableFuture浅谈

bearjun
2021-11-09 / 0 评论 / 0 点赞 / 374 阅读 / 6,899 字 / 正在检测是否收录...
温馨提示:
本文最后更新于 2021-11-09,若内容或图片失效,请留言反馈。部分素材来自网络,若不小心影响到您的利益,请联系我们删除。

什么是JUC

简单的来说,JUC 就是指java.util.concurrent, JUC 包含许多线程安全、测试良好、高性能的并发构建块。毫不客气的说,创建JUC的目的就是要实现 Collection 框架对数据结构所执行的并发操作。通过提供一组可靠的、高性能并发构建块,从而提升应用程序并发类的线程安全性、可伸缩性、性能、可读性和可靠性。

Java8-在线API文档-官方版本
03307-ckntoilfeuj.png
简单的来说, JUC 就是指的就是上述包中的内容。

CompletableFuture 的基本用法

什么是异步?

所谓异步调用其实就是实现一个可无需等待被调用函数的返回值而让操作继续运行的方法。在 Java 语言中,简单的讲就是另启一个线程来完成调用中的部分计算,使调用继续运行或返回,而不需要等待计算结果。但调用者仍需要取线程的计算结果。

为什么要用 CompletableFuture ?

Java中已经引入了 Future 接口,用于描述一个异步计算的结果。虽然 Future 以及相关使用方法提供了异步执行任务的能力,但是对于结果的获取却是很不方便,只能通过阻塞或者轮询的方式得到任务的结果。
阻塞的方式显然和我们的异步编程的初衷相违背,轮询的方式又会耗费无谓的 CPU 资源,而且也不能及时地得到计算结果。

public static void main(String[] args) throws ExecutionException, InterruptedException {
		FutureTask f = new FutureTask(() -> {
			Thread.sleep(3000);
			return "我是阻塞...";
		});

		new Thread(f).start();

		while (true) {
			if (f.isDone()) {
				System.out.println(f.get());
				break;
			} else {
				Thread.sleep(1000);
				System.out.println("等待结果返回...");
			}
		}

		System.out.println("main thread is over ");

		// System.out.println(f.get());
	}

以上这段代码可以很清楚的知道,当 FutureTask 执行时发生阻塞。
当然的,我们开发中一般的,回把get放到最后。

CompletableFuture

通过源码可以看出,CompletableFuture 类实现了 Future 和 CompletionStage 两个接口。并且对 Future 进行了扩展。
28995-hjemugmgrok.png
再来看看 CompletionStage ,CompletionStage 代表异步计算中的一个阶段或步骤。CompletionStage 异步计算过程中的某一个阶段,一个阶段完成以后可能会触发另外一个阶段,有些类似 Linux 系统的管道分隔符传参数。但又像 StringBuffer ,操作之后返回了自己。但又很像 Optional 判断了对象之后对对象的某一个阶段的比较做不同的操作。
其中 CompletableFuture 最常用的方法有两个:

  • 执行类:supplyAsync 有返回值 / runAsync 无返回值
  • 获取结果类: join / get 获取返回值
public static void main(String[] args) throws ExecutionException, InterruptedException {
		// 创建线程池(可以自定义线程池)
		ExecutorService executorService = Executors.newCachedThreadPool();
		// 异步方法
		CompletableFuture<Void> voidCompletableFuture = CompletableFuture.runAsync(() -> {
			// TODO
		});
		// 异步方法带线程池
		CompletableFuture<Void> voidCompletableFuture1 = CompletableFuture.runAsync(() -> {
			// TODO
		}, executorService);

		// 带返回值的异步方法
		CompletableFuture<String> stringCompletableFuture = CompletableFuture.supplyAsync(() -> {
			// TODO
			return "带返回值的异步方法";
		});
		// 带返回值并且带线程池的异步方法
		CompletableFuture<String> stringCompletableFuture1 = CompletableFuture.supplyAsync(() -> {
			// TODO
			return "带返回值并且带线程池的异步方法";
		}, executorService);

		System.out.println(voidCompletableFuture.get()); // 结果为:null
		System.out.println(voidCompletableFuture1.get());// 结果为:null
		System.out.println(stringCompletableFuture.get());// 结果为:带返回值的异步方法
		System.out.println(stringCompletableFuture1.join());// 结果为:带返回值并且带线程池的异步方法
	}

这个只是 CompletableFuture 最常用的方法。我们可以看出,supplyAsync 的两个方法都带有返回值,而且返回值类型和返回值的类型一样。join 和 get 都是获取异步编排之后的结果。最大的区别就是: join 没有异常抛出,而 get 有。接下来,我们来看看 CompletableFuture 其他的操作:

** 1、获得结果和触发计算 **

public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
		// 创建线程池(可以自定义线程池)
		ExecutorService executorService = Executors.newCachedThreadPool();

		// 带返回值并且带线程池的异步方法
		CompletableFuture<String> stringCompletableFuture = CompletableFuture.supplyAsync(() -> {
			try {
				Thread.sleep(2000);
				// TODO
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
			return "带返回值并且带线程池的异步方法";
		}, executorService);

		/**
		 * 获取结果,但是会有异常抛出或者需要处理
		 */
		System.out.println(stringCompletableFuture.get());

		/**
		 * 获取结果,没有异常需要抛出或者处理
		 */
		System.out.println(stringCompletableFuture.join());

		/**
		 * 设置超时时间,如果在规定时间内没有获得结果,抛出异常:java.util.concurrent.TimeoutException
		 */
		System.out.println(stringCompletableFuture.get(1, TimeUnit.SECONDS));

		/**
		 * 没有计算完成的情况下,会返回定义的这个替代结果
		 * 计算完成,返回计算完成后的结果
		 */
		try {
			Thread.sleep(3000);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		System.out.println(stringCompletableFuture.getNow("没有结果"));

		/**
		 * 是否打断future的get操作
		 * 如果打断get操作成功,complete = true ,则最终get的值为complete给的值。
		 * 如果打算get失败,complete = false ,则最终的结果就是get的值。
		 * 例如:当前的CompletableFutures计算为2s
		 * 1、如果当前直接打算操作,之后再get取值,complete = true get的值为complete定义的值
		 * 2、如果当前操作之前线程等待3s,CompletableFutures已近完成了操作,则打断失败,complete = false get的值就是计算的值
		 */
		System.out.println(stringCompletableFuture.complete("没有结果"));
		System.out.println(stringCompletableFuture.get());
	}

** 2、对计算结果进行处理 **

public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
		// 创建线程池(可以自定义线程池)
		ExecutorService executorService = Executors.newCachedThreadPool();

		/**
		 * thenApply,handle都是对程序的下一步一步的执行,但是最大的区别在于:
		 * handle传递两个参数,并且抛出异常时,程序继续往下执行。
		 * thenApply只传递一个参数,有异常时,程序中断,直接到exceptionally
		 * 都存在xxxAsync,也是一个带线程池,一个不带。当不带Async时,由当先线程处理,带了之后,则去线程池中获取一个新的线程执行。
		 *
		 * exceptionally -> try/catch
		 * handle + whenComplete -> try/finally
		 */
		CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
			return 1;
		}, executorService).thenApply((t) -> {
			return t + 2;
		}).thenApplyAsync(t -> {
			// int a = 10/0;
			return t + 3;
		}, executorService).handle((re, ex) -> {
			// int a = 10/0;
			return re + 4;
		}).handleAsync((re, ex) -> {
			return re + 5;
		}, executorService).whenComplete((result, e) -> {
			System.out.println("result=" + result);
		}).exceptionally(e -> {
			e.printStackTrace();
			return null;
		});

		System.out.println(future.join());
	}

** 3、对计算结果进行消费 **

public static void main(String[] args) {
		// 创建线程池(可以自定义线程池)
		ExecutorService executorService = Executors.newCachedThreadPool();

		/**
		 * thenAccept 接收任务的处理结果,并消费处理,无返回结果
		 * 如下thenAccept只对接受到的结果进行消费
		 */
		CompletableFuture.supplyAsync(() -> {
			return 1;
		}, executorService).thenApply(r -> {
			return r + 2;
		}).thenAccept(r -> {
			// TODO r
		});

		/**
		 * thenRun、thenApply、thenAccept对比:
		 * thenRun 没有接受值,也没有返回值,只是执行操作 。下面代码打印:null
		 * thenApply 可以接受上一步的结果进行操作,并且返回,下面代码打印:2
		 * thenAccept 对上一步的结果进行消费,只能接受操作,并没有返回值,下面代码打印:null
		 */
		System.out.println(CompletableFuture.supplyAsync(() -> {
			return 1;
		}).thenRun(() -> {

		}).join());

		System.out.println(CompletableFuture.supplyAsync(() -> {
			return 1;
		}).thenApply(r -> {
			return r + 1;
		}).join());

		System.out.println(CompletableFuture.supplyAsync(() -> {
			return 1;
		}).thenAccept(r -> {

		}).join());
	}

** 4、对计算速度进行选用 **

    public static void main(String[] args) {

        /**
         * applyToEither 比较两个CompletableFuture返回的结果,那个执行的快,返回执行快的结果
         * 例如:我们经常玩的暴力摩托的游戏,谁先到终点谁获胜
         */
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "我比较慢";
        }).applyToEither(CompletableFuture.supplyAsync(() -> {
            return "我比较快!";
        }), res -> {
            return res;
        });

        // 最后输出的结果:我比较快
        System.out.println(future.join());
    }

** 5、对计算结果进行合并 **

public static void main(String[] args) {

        /**
         * thenCombine 等待所有的CompletableFuture完成后,返回每个结果做处理
         *
         */
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return 10;
        }).thenCombine(CompletableFuture.supplyAsync(() -> {
            return 20;
        }), (res1, res2) -> {
            return res1 + res2;
        }).thenCombine(CompletableFuture.supplyAsync(()->{
            return 30;
        }),(res1,res2)->{
            /**
             * 注意这个地方的res1是上面等待完成返回结果计算之后的值
             */
            return res1 + res2;
        });

        // 等待1s后输出的结果:60
        System.out.println(future.join());
    }

至此,关于 CompletableFuture 的一些常用的用法都讲解完了。由于 CompletableFuture 里面的方法过于众多。希望以后多多看看官方的API。多多了解 CompletableFuture 特性带来的应用程序的提升。

项目实战应用建议

在今年年初的时候,自从我发现了这个好用的 CompletableFuture 以来,我就在我们公司的项目开始了实战,最开始确实搜搜百度,看看帖子,模仿模仿。现在,我已经可以得心应手了。下面是我的一些实战建议:

  • CompletableFuture 一定要搭配自己的定义的自定义线程池使用
  • 由于统计或者需要返回给前端多个数据的可以运用 CompletableFuture 把多个接口合并成一个
  • 数据的导入导出可以使用 CompletableFuture 来做提升效率
  • 涉及复杂流程的多方面调用,可以使用 CompletableFuture 来提升代码速度

好了,说到这。相信大家已近对 CompletableFuture 有一定的了解。看完了不代表自己就回了,一定要学以致用,尽快的应用到自己的项目。

0

评论区