博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
CompletableFuture 的同步与异步
阅读量:6811 次
发布时间:2019-06-26

本文共 9486 字,大约阅读时间需要 31 分钟。

hot3.png

CompletableFuture 类声明了 CompletionStage 接口,CompletionStage 接口实际上提供了同步或异步运行计算的舞台。

所谓异步调用其实就是实现一个可无需等待被调用函数的返回值而让操作继续运行的方法。在 Java 语言中,简单的讲就是另启一个线程来完成调用中的部分计算,使调用继续运行或返回,而不需要等待计算结果。但调用者仍需要获取线程的计算结果。

CompletableFuture 简单的异步运算场景

CompletableFuture 提供了如下的异步方法,

public static  CompletableFuture supplyAsync(Supplier supplier) {        return asyncSupplyStage(asyncPool, supplier);    }public static  CompletableFuture supplyAsync(Supplier supplier,Executor executor) { return asyncSupplyStage(screenExecutor(executor), supplier);}public static CompletableFuture
runAsync(Runnable runnable) { return asyncRunStage(asyncPool, runnable);}public static CompletableFuture
runAsync(Runnable runnable, Executor executor) { return asyncRunStage(screenExecutor(executor), runnable);}

supplyAsync 返回带有任务结果的CompletableFuture,而runAsync返回CompletableFuture<Void>。

Executor参数可以手动指定线程池,否则默认ForkJoinPool.commonPool()系统级公共线程池。

注意:ForkJoinPool.commonPool() 是 Daemon Thread(守护线程) 

只要当前JVM实例中尚存在任何一个非守护线程(用户线程)没有结束,守护线程就全部工作;

只有当用户线程结束时,JVM推出,守护线程随着JVM一同结束工作。

@Testpublic void test() throws InterruptedException {    CompletableFuture
cf = CompletableFuture.runAsync(() -> { System.out.println("runAsync=" + Thread.currentThread().getName() + "|" + Thread.currentThread().isDaemon()); try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); } }); System.out.println("done=" + cf.isDone()); TimeUnit.SECONDS.sleep(4); System.out.println("done=" + cf.isDone());}

输出,

done=falserunAsync=ForkJoinPool.commonPool-worker-1|truedone=true

在这段代码中,runAsync 是异步执行的 ,通过 Thread.currentThread().isDaemon() 打印的结果就可以知道是Daemon线程异步执行的。

 

CompletableFuture 同步执行示例

CompletableFuture中不带Async的同步方法如下,

public  CompletionStage thenApply(Function
fn);public CompletableFuture
thenAccept(Consumer
action);public
CompletableFuture
thenAcceptBoth(CompletionStage
other, BiConsumer
action);public CompletableFuture
thenRun(Runnable action);

这些方法都是同步执行的

@Testpublic void testSync11() {    CompletableFuture
cf = CompletableFuture.completedFuture("message").thenApply(s -> { randomSleep(); System.out.println("thenApply=" + Thread.currentThread().getName() + "|" + Thread.currentThread().isDaemon()); return s.toUpperCase(); }); // gotNow 如果成功就返回结果 System.out.println(cf.getNow(null)); // 一直等待成功,然后返回结果 System.out.println(cf.join());}

输出如下,

thenApply=main|falseMESSAGEMESSAGE

首先通过 completedFuture 方法获取一个结果已经完成的Future,然后执行同步方法thenApply,由main线程执行,会阻塞当前的main线程 ,最后getNow方法获取到结果。

 

CompletableFuture 异步执行示例

CompletableFuture中异步执行的方法都是带Async 结尾的,可以制定执行异步任务的线程池,也可以不指定,如果不指定,默认使用ForkJoinPool.commonPool() 线程池。

public  CompletionStage thenApplyAsync(Function
fn);public CompletionStage thenApplyAsync(Function
fn,Executor executor);public CompletableFuture
thenAcceptAsync(Consumer
action);public CompletableFuture
thenAcceptAsync(Consumer
action, Executor executor);public CompletableFuture
thenRunAsync(Runnable action);public CompletableFuture
thenRunAsync(Runnable action,Executor executor);

以下使用的两个方法都是异步执行任务的方法

@Testpublic void testAsync1() {    CompletableFuture
cf = CompletableFuture.supplyAsync(() -> { randomSleep(); System.out.println("supplyAsync=" + Thread.currentThread().getName() + "|" + Thread.currentThread().isDaemon()); return "message"; }).thenApplyAsync(s -> { randomSleep(); System.out.println("thenApplyAsync=" + Thread.currentThread().getName() + "|" + Thread.currentThread().isDaemon()); return s.toUpperCase(); }); // gotNow 如果成功就返回结果 System.out.println(cf.getNow(null)); // 一直等待成功,然后返回结果 System.out.println(cf.join());}

输出如下,

nullsupplyAsync=ForkJoinPool.commonPool-worker-1|truethenApplyAsync=ForkJoinPool.commonPool-worker-1|trueMESSAGE

当执行 cf.gotNow 方法的时候,异步任务还没有执行完成,所以返回 null 。执行 cf.join 方法,阻塞一直等到异步任务结果返回。

 

thenApply 是由哪个线程执行的

thenApply 不带async结尾,是一个同步方法,但可能还是由执行任务的线程池来执行,或者是当前main线程来执行。

@Testpublic void testAsync125() {    CompletableFuture
cf = CompletableFuture.supplyAsync(() -> { //没有sleep System.out.println("supplyAsync=" + Thread.currentThread().getName() + "|" + Thread.currentThread().isDaemon()); return "message"; }).thenApply(s -> { // thenApplyAsync=main|false 使用调用者线程来进行处理 System.out.println("thenApply=" + Thread.currentThread().getName() + "|" + Thread.currentThread().isDaemon()); return s.toUpperCase(); }); // gotNow 如果成功就返回结果 System.out.println(cf.getNow(null)); // 一直等待成功,然后返回结果 System.out.println(cf.join());}@Testpublic void testAsync126() throws InterruptedException { CompletableFuture
cf = CompletableFuture.supplyAsync(() -> { randomSleep(); System.out.println("supplyAsync=" + Thread.currentThread().getName() + "|" + Thread.currentThread().isDaemon()); return "message"; }); TimeUnit.SECONDS.sleep(2); // 使用调用者线程 当前线程main 来进行处理thenApply 转换操作 cf = cf.thenApply(s -> { System.out.println("thenApply=" + Thread.currentThread().getName() + "|" + Thread.currentThread().isDaemon()); return s.toUpperCase(); }); // gotNow 如果成功就返回结果 System.out.println(cf.getNow(null)); // 一直等待成功,然后返回结果 System.out.println(cf.join());}@Testpublic void testAsync124() { CompletableFuture
cf = CompletableFuture.supplyAsync(() -> { randomSleep(); System.out.println("supplyAsync=" + Thread.currentThread().getName() + "|" + Thread.currentThread().isDaemon()); return "message"; }).thenApply(s -> { System.out.println("thenApply=" + Thread.currentThread().getName() + "|" + Thread.currentThread().isDaemon()); return s.toUpperCase(); }); // gotNow 如果成功就返回结果 System.out.println(cf.getNow(null)); // 一直等待成功,然后返回结果 System.out.println(cf.join());}

输出如下,

supplyAsync=ForkJoinPool.commonPool-worker-1|truethenApplyAsync=main|falseMESSAGEMESSAGE//supplyAsync=ForkJoinPool.commonPool-worker-1|truethenApply=main|falseMESSAGEMESSAGE//nullsupplyAsync=ForkJoinPool.commonPool-worker-1|truethenApply=ForkJoinPool.commonPool-worker-1|trueMESSAGE

在testAsync125方法中,thenApply 回调方法是由当前main线程执行的;

在testAsync126方法中,thenApply 回调方法是由当前main线程执行的;

在testAsync124方法中,thenApply 方法是由执行任务的线程池的线程来执行的,thenApply 虽然是一个同步方法,但其调用是通过 ForkJoinPool.commonPool 线程池异步执行的。

所以要注意的是 如果在thenApply 方法中执行比较耗时的操作,会阻塞调用者线程或者主线程。

 

CompletableFuture allOf 方法同步执行效果

When we need to execute multiple Futures in parallel, we usually want to wait for all of them to execute and then process their combined results.

The CompletableFuture.allOf static method allows to wait for completion of all of the Futures provided as a var-arg:

CompletableFuture
future1 = CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } return "Hello";});CompletableFuture
future2 = CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } return "Beautiful";});CompletableFuture
future3 = CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(4); } catch (InterruptedException e) { e.printStackTrace(); } return "World";});System.out.println("f1=" + future1.isDone());System.out.println("f2=" + future2.isDone());System.out.println("f3=" + future3.isDone());CompletableFuture
combinedFuture = CompletableFuture.allOf(future1, future2, future3);System.out.println("========");System.out.println("f1=" + future1.isDone());System.out.println("f2=" + future2.isDone());System.out.println("f3=" + future3.isDone());// 等待所有的future 执行完成combinedFuture.join();System.out.println("========");System.out.println("f1=" + future1.isDone());System.out.println("f2=" + future2.isDone());System.out.println("f3=" + future3.isDone());
f1=falsef2=falsef3=false========f1=falsef2=falsef3=false========f1=truef2=truef3=true

通过 combinedFuture.join()  方法等待所有的异步任务执行完成。当其所有的CompletableFuture均完成结果时,combinedFuture就会处于完成状态

Notice that the return type of the CompletableFuture.allOf() is a CompletableFuture<Void>. The limitation of this method is that it does not return the combined results of all Futures. Instead you have to manually get results from Futures. Fortunately, CompletableFuture.join() method and Java 8 Streams API makes it simple:

String combined = Stream.of(future1, future2, future3)    .map(CompletableFuture::join)    .collect(Collectors.joining(" "));System.out.println(combined);

更简化后完整连贯的代码,

@Testpublic void testAllOf2() {    CompletableFuture
future1 = CompletableFuture.supplyAsync(() -> "Hello"); CompletableFuture
future2 = CompletableFuture.supplyAsync(() -> "Beautiful"); CompletableFuture
future3 = CompletableFuture.supplyAsync(() -> "World"); CompletableFuture.allOf(future1, future2, future3) .thenApply((v) -> Stream.of(future1, future2, future3) .map(CompletableFuture::join) .collect(Collectors.joining(" "))) .thenAccept(System.out::println);}

========END========

转载于:https://my.oschina.net/xinxingegeya/blog/2519436

你可能感兴趣的文章
正则表达式
查看>>
360前端星计划学习-html
查看>>
专注dApp高效执行和高并发的下一代公有链
查看>>
ONE-sys 整合前后端脚手架 koa2 + pm2 + vue-cli3.0 + element
查看>>
携带更方便功能全 iPone与Apple Watch球形尿袋
查看>>
行为型模式:策略模式
查看>>
实现批量数据增强 | keras ImageDataGenerator使用
查看>>
太忙女友消息未及时回复,分手吗?python微信自动消息帮你谈恋爱
查看>>
Java 多线程NIO学习
查看>>
命名实体识别
查看>>
动态切换的动态代理
查看>>
电商项目(下)
查看>>
vue 数字滚动递增效果
查看>>
vue2.0中父子,兄弟组件的传值2
查看>>
Spring Boot注解常用!!!看了就可以开发大量项目了
查看>>
音频编码 Audio Converter
查看>>
SQL - case when then else end 的用法
查看>>
web优化是http缓存(上)
查看>>
19-01-14
查看>>
媒体融合三部曲(未完待续...)
查看>>