這篇文章帶給大家的內容是關於Java8的CompletableFuture的用法介紹(附範例),有一定的參考價值,有需要的朋友可以參考一下,希望對你有幫助。
作為Java 8 Concurrency API改進而引入,本文是CompletableFuture類別的功能和用例的介紹。同時在Java 9 也有對CompletableFuture有一些改進,之後再進入講解。
Future計算
Future非同步計算很難操作,通常我們希望將任何計算邏輯視為一系列步驟。但是在非同步計算的情況下,表示為回呼的方法往往分散在程式碼中或深深地嵌套在彼此內部。但是當我們需要處理其中一個步驟中可能發生的錯誤時,情況可能會變得更複雜。
Futrue介面是Java 5中作為非同步計算而新增的,但它沒有任何方法去進行計算組合或處理可能出現的錯誤。
在Java 8中,引入了CompletableFuture類別。與Future介面一起,它也實作了CompletionStage介面。此介面定義了可與其他Future組合成非同步運算契約。
CompletableFuture同時是一個組合和一個框架,具有大約50種不同的構成,結合,執行非同步計算步驟和處理錯誤。
如此龐大的API可能會令人難以招架,下文將調一些重要的做重點介紹。
使用CompletableFuture作為Future實作
首先,CompletableFuture類別實作Future接口,因此你可以將其用作Future實現,但需要額外的完成實作邏輯。
例如,你可以使用無構參建構函式建立這類的實例,然後使用complete
方法完成。消費者可以使用get方法來阻塞目前線程,直到get()
結果。
在下面的範例中,我們有一個建立CompletableFuture實例的方法,然後在另一個執行緒中計算並立即傳回Future。
計算完成後,該方法透過將結果提供給完整方法來完成Future:
public Future<String> calculateAsync() throws InterruptedException { CompletableFuture<String> completableFuture = new CompletableFuture<>(); Executors.newCachedThreadPool().submit(() -> { Thread.sleep(500); completableFuture.complete("Hello"); return null; }); return completableFuture; }
為了分離計算,我們使用了Executor API ,這種創建和完成CompletableFuture的方法可以與任何並發套件(包括原始執行緒)一起使用。
請注意,該calculateAsync
方法傳回一個Future
實例。
我們只是呼叫方法,接收Future實例並在我們準備阻塞結果時呼叫它的get#方法。
另請注意,get方法拋出一些已檢查的異常,即ExecutionException(封裝計算期間發生的異常)和InterruptedException (表示執行方法的執行緒被中斷的異常):
Future<String> completableFuture = calculateAsync(); // ... String result = completableFuture.get(); assertEquals("Hello", result);
如果你已經知道計算的結果,也可以用變成同步的方式來傳回結果。
Future<String> completableFuture = CompletableFuture.completedFuture("Hello"); // ... String result = completableFuture.get(); assertEquals("Hello", result);
作為在某些場景中,你可能想要取消Future任務的執行。
假設我們沒有找到結果並決定完全取消非同步執行任務。這可以透過Future的取消方法完成。此方法mayInterruptIfRunning
,但在CompletableFuture的情況下,它沒有任何效果,因為中斷不用於控制CompletableFuture的處理。
這是非同步方法的修改版本:
public Future<String> calculateAsyncWithCancellation() throws InterruptedException { CompletableFuture<String> completableFuture = new CompletableFuture<>(); Executors.newCachedThreadPool().submit(() -> { Thread.sleep(500); completableFuture.cancel(false); return null; }); return completableFuture; }
當我們使用Future.get()方法阻塞結果時,cancel()
表示取消執行,它將拋出CancellationException:
Future<String> future = calculateAsyncWithCancellation(); future.get(); // CancellationException
API介紹
static方法說明
上面的程式碼很簡單,下面介紹幾個 static 方法,它們使用任務來實例化一個CompletableFuture 實例。
CompletableFuture.runAsync(Runnable runnable); CompletableFuture.runAsync(Runnable runnable, Executor executor); CompletableFuture.supplyAsync(Supplier<U> supplier); CompletableFuture.supplyAsync(Supplier<U> supplier, Executor executor)
runAsync 方法接收的是Runnable 的實例,但是它沒有傳回值
supplyAsync 方法是JDK8函數式接口,無參數,會傳回一個結果
#這兩個方法是executor 的升級,表示讓任務在指定的執行緒池中執行,不指定的話,通常任務是在ForkJoinPool.commonPool() 執行緒池中執行的。
supplyAsync()使用
靜態方法runAsync
和supplyAsync
允許我們對應地從Runnable和Supplier功能類型中建立CompletableFuture實例。
該Runnable的介面是在執行緒使用舊的接口,它不允許傳回值。
Supplier介面是一個不具有參數,並傳回參數化類型的一個值的單一方法的通用功能介面。
這允許將Supplier的實例作為lambda表達式提供,該表達式執行計算並傳回結果:
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello"); // ... assertEquals("Hello", future.get());
thenRun()使用
在兩個任務任務A ,在任務B中,如果既不需要任務A的值也不想在任務B中引用,那麼你可以將Runnable lambda 傳遞給thenRun()
方法。在下面的範例中,在呼叫future.get()方法之後,我們只需在控制台中列印一行:
範本
CompletableFuture.runAsync(() -> {}).thenRun(() -> {}); CompletableFuture.supplyAsync(() -> "resultA").thenRun(() -> {});
thenRun(Runnable runnable)
,任务 A 执行完执行 B,并且 B 不需要 A 的结果。thenRun(Runnable runnable)
,任务 A 执行完执行 B,会返回resultA
,但是 B 不需要 A 的结果。实战
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> "Hello"); CompletableFuture<Void> future = completableFuture .thenRun(() -> System.out.println("Computation finished.")); future.get();
thenAccept()使用
在两个任务任务A,任务B中,如果你不需要在Future中有返回值,则可以用 thenAccept
方法接收将计算结果传递给它。最后的future.get()调用返回Void类型的实例。
模板
CompletableFuture.runAsync(() -> {}).thenAccept(resultA -> {}); CompletableFuture.supplyAsync(() -> "resultA").thenAccept(resultA -> {});
第一行中,runAsync
不会有返回值,第二个方法thenAccept
,接收到的resultA值为null,同时任务B也不会有返回结果
第二行中,supplyAsync
有返回值,同时任务B不会有返回结果。
实战
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> "Hello"); CompletableFuture<Void> future = completableFuture .thenAccept(s -> System.out.println("Computation returned: " + s)); future.get();
thenApply()使用
在两个任务任务A,任务B中,任务B想要任务A计算的结果,可以用thenApply方法来接受一个函数实例,用它来处理结果,并返回一个Future函数的返回值:
模板
CompletableFuture.runAsync(() -> {}).thenApply(resultA -> "resultB"); CompletableFuture.supplyAsync(() -> "resultA").thenApply(resultA -> resultA + " resultB");
实战
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> "Hello"); CompletableFuture<String> future = completableFuture .thenApply(s -> s + " World"); assertEquals("Hello World", future.get());
当然,多个任务的情况下,如果任务 B 后面还有任务 C,往下继续调用 .thenXxx() 即可。
thenCompose()使用
接下来会有一个很有趣的设计模式;
CompletableFuture API 的最佳场景是能够在一系列计算步骤中组合CompletableFuture实例。
这种组合结果本身就是CompletableFuture,允许进一步再续组合。这种方法在函数式语言中无处不在,通常被称为monadic设计模式
。
简单说,Monad就是一种设计模式,表示将一个运算过程,通过函数拆解成互相连接的多个步骤。你只要提供下一步运算所需的函数,整个运算就会自动进行下去。
在下面的示例中,我们使用thenCompose方法按顺序组合两个Futures。
请注意,此方法采用返回CompletableFuture实例的函数。该函数的参数是先前计算步骤的结果。这允许我们在下一个CompletableFuture的lambda中使用这个值:
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> "Hello") .thenCompose(s -> CompletableFuture.supplyAsync(() -> s + " World")); assertEquals("Hello World", completableFuture.get());
该thenCompose方法连同thenApply一样实现了结果的合并计算。但是他们的内部形式是不一样的,它们与Java 8中可用的Stream和Optional类的map和flatMap方法是有着类似的设计思路在里面的。
两个方法都接收一个CompletableFuture并将其应用于计算结果,但thenCompose(flatMap)方法接收一个函数,该函数返回相同类型的另一个CompletableFuture对象。此功能结构允许将这些类的实例继续进行组合计算。
thenCombine()
取两个任务的结果
如果要执行两个独立的任务,并对其结果执行某些操作,可以用Future的thenCombine方法:
模板
CompletableFuture<String> cfA = CompletableFuture.supplyAsync(() -> "resultA"); CompletableFuture<String> cfB = CompletableFuture.supplyAsync(() -> "resultB"); cfA.thenAcceptBoth(cfB, (resultA, resultB) -> {}); cfA.thenCombine(cfB, (resultA, resultB) -> "result A + B");
实战
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> "Hello") .thenCombine(CompletableFuture.supplyAsync( () -> " World"), (s1, s2) -> s1 + s2)); assertEquals("Hello World", completableFuture.get());
更简单的情况是,当你想要使用两个Future结果时,但不需要将任何结果值进行返回时,可以用thenAcceptBoth
,它表示后续的处理不需要返回值,而 thenCombine 表示需要返回值:
CompletableFuture future = CompletableFuture.supplyAsync(() -> "Hello") .thenAcceptBoth(CompletableFuture.supplyAsync(() -> " World"), (s1, s2) -> System.out.println(s1 + s2));
thenApply()和thenCompose()之间的区别
在前面的部分中,我们展示了关于thenApply()和thenCompose()的示例。这两个API都是使用的CompletableFuture调用,但这两个API的使用是不同的。
thenApply()
此方法用于处理先前调用的结果。但是,要记住的一个关键点是返回类型是转换泛型中的类型,是同一个CompletableFuture。
因此,当我们想要转换CompletableFuture 调用的结果时,效果是这样的 :
CompletableFuture<Integer> finalResult = compute().thenApply(s-> s + 1);
thenCompose()
该thenCompose()方法类似于thenApply()在都返回一个新的计算结果。但是,thenCompose()使用前一个Future作为参数。它会直接使结果变新的Future,而不是我们在thenApply()中到的嵌套Future,而是用来连接两个CompletableFuture,是生成一个新的CompletableFuture:
CompletableFuture<Integer> computeAnother(Integer i){ return CompletableFuture.supplyAsync(() -> 10 + i); } CompletableFuture<Integer> finalResult = compute().thenCompose(this::computeAnother);
因此,如果想要继续嵌套链接CompletableFuture 方法,那么最好使用thenCompose()。
并行运行多个任务
当我们需要并行执行多个任务时,我们通常希望等待所有它们执行,然后处理它们的组合结果。
该CompletableFuture.allOf
静态方法允许等待所有的完成任务:
API
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs){...}
实战
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Hello"); CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "Beautiful"); CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> "World"); CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(future1, future2, future3); // ... combinedFuture.get(); assertTrue(future1.isDone()); assertTrue(future2.isDone()); assertTrue(future3.isDone());
请注意,CompletableFuture.allOf()的返回类型是CompletableFuture
String combined = Stream.of(future1, future2, future3) .map(CompletableFuture::join) .collect(Collectors.joining(" ")); assertEquals("Hello Beautiful World", combined);
CompletableFuture 提供了 join() 方法,它的功能和 get() 方法是一样的,都是阻塞获取值,它们的区别在于 join() 抛出的是 unchecked Exception。这使得它可以在Stream.map()方法中用作方法引用。
异常处理
说到这里,我们顺便来说下 CompletableFuture 的异常处理。这里我们要介绍两个方法:
public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn); public <U> CompletionStage<U> handle(BiFunction<? super T, Throwable, ? extends U> fn);
看下代码
CompletableFuture.supplyAsync(() -> "resultA") .thenApply(resultA -> resultA + " resultB") .thenApply(resultB -> resultB + " resultC") .thenApply(resultC -> resultC + " resultD");
上面的代码中,任务 A、B、C、D 依次执行,如果任务 A 抛出异常(当然上面的代码不会抛出异常),那么后面的任务都得不到执行。如果任务 C 抛出异常,那么任务 D 得不到执行。
那么我们怎么处理异常呢?看下面的代码,我们在任务 A 中抛出异常,并对其进行处理:
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { throw new RuntimeException(); }) .exceptionally(ex -> "errorResultA") .thenApply(resultA -> resultA + " resultB") .thenApply(resultB -> resultB + " resultC") .thenApply(resultC -> resultC + " resultD"); System.out.println(future.join());
上面的代码中,任务 A 抛出异常,然后通过 .exceptionally()
方法处理了异常,并返回新的结果,这个新的结果将传递给任务 B。所以最终的输出结果是:
errorResultA resultB resultC resultD
String name = null; // ... CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> { if (name == null) { throw new RuntimeException("Computation error!"); } return "Hello, " + name; })}).handle((s, t) -> s != null ? s : "Hello, Stranger!"); assertEquals("Hello, Stranger!", completableFuture.get());
当然,它们也可以都为 null,因为如果它作用的那个 CompletableFuture 实例没有返回值的时候,s 就是 null。
Async后缀方法
CompletableFuture类中的API的大多数方法都有两个带有Async后缀的附加修饰。这些方法表示用于异步线程。
没有Async后缀的方法使用调用线程运行下一个执行线程阶段。不带Async方法使用ForkJoinPool.commonPool()线程池的fork / join实现运算任务。带有Async方法使用传递式的Executor任务去运行。
下面附带一个案例,可以看到有thenApplyAsync方法。在程序内部,线程被包装到ForkJoinTask实例中。这样可以进一步并行化你的计算并更有效地使用系统资源。
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> "Hello"); CompletableFuture<String> future = completableFuture .thenApplyAsync(s -> s + " World"); assertEquals("Hello World", future.get());
JDK 9 CompletableFuture API
在Java 9中, CompletableFuture API通过以下更改得到了进一步增强:
引入了新的实例API:
还有一些静态实用方法:
最后,为了解决超时问题,Java 9又引入了两个新功能:
结论
在本文中,我们描述了CompletableFuture类的方法和典型用例。
【相关推荐:Java视频教程】
以上是Java8的CompletableFuture的用法介紹(附範例)的詳細內容。更多資訊請關注PHP中文網其他相關文章!