CompletableFuture 类声明了 CompletionStage 接口,CompletionStage 接口实际上提供了同步或异步运行计算的舞台。
所谓异步调用其实就是实现一个可无需等待被调用函数的返回值而让操作继续运行的方法。在 Java 语言中,简单的讲就是另启一个线程来完成调用中的部分计算,使调用继续运行或返回,而不需要等待计算结果。但调用者仍需要获取线程的计算结果。
CompletableFuture 简单的异步运算场景
CompletableFuture 提供了如下的异步方法,
public static CompletableFuture supplyAsync(Supplier supplier) { return asyncSupplyStage(asyncPool, supplier); }public static CompletableFuture supplyAsync(Supplier supplier,Executor executor) { return asyncSupplyStage(screenExecutor(executor), supplier);}public static CompletableFuturerunAsync(Runnable runnable) { return asyncRunStage(asyncPool, runnable);}public static CompletableFuture runAsync(Runnable runnable, Executor executor) { return asyncRunStage(screenExecutor(executor), runnable);}
supplyAsync 返回带有任务结果的CompletableFuture,而runAsync返回CompletableFuture<Void>。
Executor
参数可以手动指定线程池,否则默认ForkJoinPool.commonPool()
系统级公共线程池。
注意:ForkJoinPool.commonPool()
是
Daemon Thread(守护线程)只要当前JVM实例中尚存在任何一个非守护线程(用户线程)没有结束,守护线程就全部工作;
只有当用户线程结束时,JVM推出,守护线程随着JVM一同结束工作。
@Testpublic void test() throws InterruptedException { CompletableFuturecf = CompletableFuture.runAsync(() -> { System.out.println("runAsync=" + Thread.currentThread().getName() + "|" + Thread.currentThread().isDaemon()); try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); } }); System.out.println("done=" + cf.isDone()); TimeUnit.SECONDS.sleep(4); System.out.println("done=" + cf.isDone());}
输出,
done=falserunAsync=ForkJoinPool.commonPool-worker-1|truedone=true
在这段代码中,runAsync 是异步执行的 ,通过 Thread.currentThread().isDaemon() 打印的结果就可以知道是Daemon线程异步执行的。
CompletableFuture 同步执行示例
CompletableFuture中不带Async的同步方法如下,
public CompletionStage thenApply(Function fn);public CompletableFuturethenAccept(Consumer action);public CompletableFuture thenAcceptBoth(CompletionStage other, BiConsumer action);public CompletableFuture thenRun(Runnable action);
这些方法都是同步执行的
@Testpublic void testSync11() { CompletableFuturecf = CompletableFuture.completedFuture("message").thenApply(s -> { randomSleep(); System.out.println("thenApply=" + Thread.currentThread().getName() + "|" + Thread.currentThread().isDaemon()); return s.toUpperCase(); }); // gotNow 如果成功就返回结果 System.out.println(cf.getNow(null)); // 一直等待成功,然后返回结果 System.out.println(cf.join());}
输出如下,
thenApply=main|falseMESSAGEMESSAGE
首先通过 completedFuture 方法获取一个结果已经完成的Future,然后执行同步方法thenApply,由main线程执行,会阻塞当前的main线程 ,最后getNow方法获取到结果。
CompletableFuture 异步执行示例
CompletableFuture中异步执行的方法都是带Async 结尾的,可以制定执行异步任务的线程池,也可以不指定,如果不指定,默认使用ForkJoinPool.commonPool() 线程池。
public CompletionStage thenApplyAsync(Function fn);public CompletionStage thenApplyAsync(Function fn,Executor executor);public CompletableFuturethenAcceptAsync(Consumer action);public CompletableFuture thenAcceptAsync(Consumer action, Executor executor);public CompletableFuture thenRunAsync(Runnable action);public CompletableFuture thenRunAsync(Runnable action,Executor executor);
以下使用的两个方法都是异步执行任务的方法
@Testpublic void testAsync1() { CompletableFuturecf = CompletableFuture.supplyAsync(() -> { randomSleep(); System.out.println("supplyAsync=" + Thread.currentThread().getName() + "|" + Thread.currentThread().isDaemon()); return "message"; }).thenApplyAsync(s -> { randomSleep(); System.out.println("thenApplyAsync=" + Thread.currentThread().getName() + "|" + Thread.currentThread().isDaemon()); return s.toUpperCase(); }); // gotNow 如果成功就返回结果 System.out.println(cf.getNow(null)); // 一直等待成功,然后返回结果 System.out.println(cf.join());}
输出如下,
nullsupplyAsync=ForkJoinPool.commonPool-worker-1|truethenApplyAsync=ForkJoinPool.commonPool-worker-1|trueMESSAGE
当执行 cf.gotNow 方法的时候,异步任务还没有执行完成,所以返回 null 。执行 cf.join 方法,阻塞一直等到异步任务结果返回。
thenApply 是由哪个线程执行的
thenApply 不带async结尾,是一个同步方法,但可能还是由执行任务的线程池来执行,或者是当前main线程来执行。
@Testpublic void testAsync125() { CompletableFuturecf = CompletableFuture.supplyAsync(() -> { //没有sleep System.out.println("supplyAsync=" + Thread.currentThread().getName() + "|" + Thread.currentThread().isDaemon()); return "message"; }).thenApply(s -> { // thenApplyAsync=main|false 使用调用者线程来进行处理 System.out.println("thenApply=" + Thread.currentThread().getName() + "|" + Thread.currentThread().isDaemon()); return s.toUpperCase(); }); // gotNow 如果成功就返回结果 System.out.println(cf.getNow(null)); // 一直等待成功,然后返回结果 System.out.println(cf.join());}@Testpublic void testAsync126() throws InterruptedException { CompletableFuture cf = CompletableFuture.supplyAsync(() -> { randomSleep(); System.out.println("supplyAsync=" + Thread.currentThread().getName() + "|" + Thread.currentThread().isDaemon()); return "message"; }); TimeUnit.SECONDS.sleep(2); // 使用调用者线程 当前线程main 来进行处理thenApply 转换操作 cf = cf.thenApply(s -> { System.out.println("thenApply=" + Thread.currentThread().getName() + "|" + Thread.currentThread().isDaemon()); return s.toUpperCase(); }); // gotNow 如果成功就返回结果 System.out.println(cf.getNow(null)); // 一直等待成功,然后返回结果 System.out.println(cf.join());}@Testpublic void testAsync124() { CompletableFuture cf = CompletableFuture.supplyAsync(() -> { randomSleep(); System.out.println("supplyAsync=" + Thread.currentThread().getName() + "|" + Thread.currentThread().isDaemon()); return "message"; }).thenApply(s -> { System.out.println("thenApply=" + Thread.currentThread().getName() + "|" + Thread.currentThread().isDaemon()); return s.toUpperCase(); }); // gotNow 如果成功就返回结果 System.out.println(cf.getNow(null)); // 一直等待成功,然后返回结果 System.out.println(cf.join());}
输出如下,
supplyAsync=ForkJoinPool.commonPool-worker-1|truethenApplyAsync=main|falseMESSAGEMESSAGE//supplyAsync=ForkJoinPool.commonPool-worker-1|truethenApply=main|falseMESSAGEMESSAGE//nullsupplyAsync=ForkJoinPool.commonPool-worker-1|truethenApply=ForkJoinPool.commonPool-worker-1|trueMESSAGE
在testAsync125方法中,thenApply 回调方法是由当前main线程执行的;
在testAsync126方法中,thenApply 回调方法是由当前main线程执行的;
在testAsync124方法中,thenApply 方法是由执行任务的线程池的线程来执行的,thenApply 虽然是一个同步方法,但其调用是通过 ForkJoinPool.commonPool 线程池异步执行的。
所以要注意的是 如果在thenApply 方法中执行比较耗时的操作,会阻塞调用者线程或者主线程。
CompletableFuture allOf 方法同步执行效果
When we need to execute multiple Futures in parallel, we usually want to wait for all of them to execute and then process their combined results.
The CompletableFuture.allOf static method allows to wait for completion of all of the Futures provided as a var-arg:
CompletableFuturefuture1 = CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } return "Hello";});CompletableFuture future2 = CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } return "Beautiful";});CompletableFuture future3 = CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(4); } catch (InterruptedException e) { e.printStackTrace(); } return "World";});System.out.println("f1=" + future1.isDone());System.out.println("f2=" + future2.isDone());System.out.println("f3=" + future3.isDone());CompletableFuture combinedFuture = CompletableFuture.allOf(future1, future2, future3);System.out.println("========");System.out.println("f1=" + future1.isDone());System.out.println("f2=" + future2.isDone());System.out.println("f3=" + future3.isDone());// 等待所有的future 执行完成combinedFuture.join();System.out.println("========");System.out.println("f1=" + future1.isDone());System.out.println("f2=" + future2.isDone());System.out.println("f3=" + future3.isDone());
f1=falsef2=falsef3=false========f1=falsef2=falsef3=false========f1=truef2=truef3=true
通过 combinedFuture.join() 方法等待所有的异步任务执行完成。当其所有的CompletableFuture均完成结果时,combinedFuture就会处于完成状态
Notice that the return type of the CompletableFuture.allOf() is a CompletableFuture<Void>. The limitation of this method is that it does not return the combined results of all Futures. Instead you have to manually get results from Futures. Fortunately, CompletableFuture.join() method and Java 8 Streams API makes it simple:
String combined = Stream.of(future1, future2, future3) .map(CompletableFuture::join) .collect(Collectors.joining(" "));System.out.println(combined);
更简化后完整连贯的代码,
@Testpublic void testAllOf2() { CompletableFuturefuture1 = CompletableFuture.supplyAsync(() -> "Hello"); CompletableFuture future2 = CompletableFuture.supplyAsync(() -> "Beautiful"); CompletableFuture future3 = CompletableFuture.supplyAsync(() -> "World"); CompletableFuture.allOf(future1, future2, future3) .thenApply((v) -> Stream.of(future1, future2, future3) .map(CompletableFuture::join) .collect(Collectors.joining(" "))) .thenAccept(System.out::println);}
========END========