工作项列表及其进展,参见 issue 6。
如何管理并发执行是个复杂易错的问题,业界有大量的工具、框架可以采用。
并发工具、框架的广度了解,可以看看如《七周七并发模型》、《Java虚拟机并发编程》、《Scala并发编程(第2版)》;更多关于并发主题的书籍参见书单。
其中CompletableFuture (CF)
有其优点:
Java
标准库内置- 无需额外依赖,几乎总是可用
- 相信有极高的实现质量
- 广为人知广泛使用,有一流的群众基础
CompletableFuture
在2014年发布的Java 8
提供,有~10年了CompletableFuture
的父接口Future
早在2004年发布的Java 5
中提供,有~20年了- 虽然
Future
接口不支持 执行结果的异步获取与并发执行逻辑的编排,但也让广大Java
开发者熟悉了Future
这个典型的概念与工具
- 功能强大、但不会非常庞大复杂
- 高层抽象
- 或说 以业务流程的形式表达技术的并发流程
- 可以不使用繁琐易错的基础并发协调工具,如
CountDownLatch
、锁(Lock
)、信号量(Semaphore
)
和其它并发工具、框架一样,CompletableFuture
用于
- 并发执行业务逻辑,或说编排并发的处理流程/处理任务
- 利用多核并行处理
- 提升业务响应性
值得更深入了解和应用。 💕
- 作为文档库(即
CompletableFuture
Guide):- 完备说明
CompletableFuture
的使用方式 - 给出 最佳实践建议 与 使用陷阱注意
- 期望在业务中,更有效安全地使用
CompletableFuture
- 完备说明
- 作为代码库(即
cffu
库):- 补齐在业务使用中
CompletableFuture
所缺失的功能 - 期望在业务中,更方便自然地使用
CompletableFuture
- 补齐在业务使用中
为了阅读的简洁方便,后文
CompletableFuture
会简写成CF
。
基本概念与术语:
- 任务(
Task
)/ 计算(Computation
)- 任务逻辑(
Task Logic
)/ 业务逻辑(Biz Logic
) - 执行(
Execute
)任务
- 任务逻辑(
- 状态(
State
)- 运行中(
Running
)〚1〛 - 取消(
Cancelled
)〚2〛 - 完成(
Completed
/Done
)- 成功(
Success
/Successful
)/ 正常完成(Completed Normally
)/ 成功完成(Completed Successfully
) - 失败(
Failed
/Fail
)/ 异常完成(Completed Exceptionally
)
- 成功(
- 运行中(
- 状态转变(
Transition
)- 事件(
Event
)、触发(Trigger
)
- 事件(
- 业务流程(
Biz Flow
)、CF
链(Chain
)- 流程图(
Flow Graph
)、有向无环图 /DAG
- 为什么构建的
CF
链一定是DAG
?
- 为什么构建的
- 流程编排(
Flow Choreography
)
- 流程图(
- 前驱(
Predecessor
)/ 后继(Successor
)- 上游任务 / 前驱任务 /
Dependency Task
(我依赖的任务) - 下游任务 / 后继任务 /
Dependent Task
(依赖我的任务)
- 上游任务 / 前驱任务 /
注:上面用
/
隔开的多个词是,在表述CF
同一个概念时,会碰到的多个术语;在不影响理解的情况下,后文会尽量统一用第一个词来表达。
更多说明:
- 〚1〛 任务状态有且有只有 运行中(
Running
)、取消(Cancelled
)、完成(Completed
)这3种状态。- 对于「完成」状态,进一步可以分成 成功(
Success
)、失败(Failed
)2种状态。
- 对于「完成」状态,进一步可以分成 成功(
- 所以也可以说,任务状态有且只有 运行中、取消、成功、失败 这4种状态。
- 右图是任务的状态及其转变图。
- 在概念上
CF
的状态转变只能是单次单向的,这很简单可靠、也容易理解并和使用直觉一致。 -
注:虽然下文提到的
obtrudeValue()
/obtrudeException
方法可以突破CF
概念上的约定,但这2个后门方法在正常设计实现中不应该会用到,尤其在业务使用应该完全忽略;带来的问题也由使用者自己了解清楚并注意。
- 〚2〛 关于「取消」状态:
- 对于
CompletableFuture
,取消的实现方式是设置CancellationException
异常。
- 对于
- 对于「取消」状态,或说设置了「
CancellationException
」失败异常的CompletableFuture cf
,相比其它异常失败 / 设置了其它失败异常 的情况,不一样的地方:- 调用
cf.get()
/cf.get(timeout, unit)
方法- 会抛出
CancellationException
异常 - 其它异常失败时,这2个方法抛出的是包了一层的
ExecutionException
,cause
是实际的失败异常
- 会抛出
- 调用
cf.join()
/cf.getNow(valueIfAbsent)
方法- 会抛出
CancellationException
异常 - 其它异常失败时,这2个方法抛出的是包了一层的
CompletionException
,cause
是实际的失败异常
- 会抛出
- 调用
cf.exceptionNow()
方法- 会抛出
IllegalStateException
,而不是返回cf
所设置的CancellationException
异常 - 其它异常失败时,
exceptionNow()
返回设置的异常
- 会抛出
- 调用
cf.isCancelled()
方法- 返回
true
- 其它异常失败时,
isCancelled()
返回false
- 返回
- 调用
- 其它地方,
CancellationException
异常与其它异常是一样处理的。比如:- 调用
cf.resultNow()
方法
都是抛出IllegalStateException
异常 - 调用
cf.isDone()
、cf.isCompletedExceptionally()
都是返回true
CompletionStage
接口方法对异常的处理,如
cf.exceptionally()
的方法参数Function<Throwable, T>
所处理的都是直接设置的异常对象没有包装过
- 调用
CF
任务执行/流程编排,即执行提交的代码逻辑/计算/任务,涉及下面4个方面:
- 任务的输入输出
- 即
CF
所关联任务的输入参数/返回结果(及其数据类型)
- 即
- 任务的调度,即在哪个线程来执行任务。可以是
- 在触发的线程中就地连续执行任务
- 在指定
Executor
(的线程)中执行任务
- 任务的错误处理(任务运行出错)
- 任务的超时控制
- 超时控制是并发的基础关注方面之一
- 到了
Java 9
提供了内置支持,新增了completeOnTimeout(...)
/orTimeout(...)
方法
本节「并发关注方面」,会举例上一些
CF
方法名,以说明CF
方法的命名模式;
可以先不用关心方法的具体功能,在「CF
的功能介绍」中会分类展开说明CF
方法及其功能。
对应下面4种情况:
- 无输入无返回(00)
- 对应
Runnable
接口(包含单个run
方法)
- 对应
- 无输入有返回(01)
- 对应
Supplier<O>
接口(包含单个supply
方法)
- 对应
- 有输入无返回(10)
- 对应
Consumer<I>
接口(包含单个accept
方法)
- 对应
- 有输入有返回(11)
- 对应
Function<I, O>
接口(包含单个apply
方法)
- 对应
注:
- 对于有输入或返回的接口(即除了
Runnable
接口)- 都是泛型的,所以可以支持不同的具体数据类型
- 都是处理单个输入数据
- 如果要处理两个输入数据,即有两个上游
CF
的返回,会涉及下面的变体接口
- 对于有输入接口,有两个输入参数的变体接口:
Consumer
接口的两参数变体接口:BiConsumer<I1, I2>
Function
接口的两参数变体接口:BiFunction<I1, I2, O>
CF
通过其方法名中包含的用词来体现:
run
:无输入无返回(00)- 即是
Runnable
接口包含的run
方法名 - 相应的
CF
方法名的一些例子:runAsync(Runnable runnable)
thenRun(Runnable action)
runAfterBoth(CompletionStage<?> other, Runnable action)
runAfterEitherAsync(CompletionStage<?> other, Runnable action)
- 即是
supply
:无输入有返回(01)- 即是
Supplier
接口包含的supply
方法名 - 相应的
CF
方法名的一些例子:supplyAsync(Supplier<U> supplier)
supplyAsync(Supplier<U> supplier, Executor executor)
- 即是
accept
:有输入无返回(10)- 即是
Consumer
接口包含的accept
方法名 - 相应的
CF
方法名的一些例子:thenAccept(Consumer<T> action)
thenAcceptAsync(Consumer<T> action)
thenAcceptBoth(CompletionStage<U> other, BiConsumer<T, U> action)
acceptEitherAsync(CompletionStage<T> other, Consumer<T> action)
- 即是
apply
:有输入有返回(11)- 即是
Function
接口包含的apply
方法名。CF
的方法如 - 相应的
CF
方法名的一些例子:thenApply(Function<T, U> fn)
thenApplyAsync(Function<T, U> fn)
applyToEither(CompletionStage<T> other, Function<T, U> fn)
- 即是
任务调度是指,任务在哪个线程执行。有2种方式:
- 在触发的线程中就地连续执行任务
- 在指定
Executor
(的线程)中执行任务
CF
通过方法名后缀Async
来体现调度方式:
- 有方法名后缀
Async
:- 在触发
CF
后,任务在指定Executor
执行- 如果不指定
executor
参数,缺省是ForkJoinPool.commonPool()
- 如果不指定
- 相应的
CF
方法名的一些例子:runAsync(Runnable runnable)
thenAcceptAsync(Consumer<T> action, Executor executor)
runAfterBothAsync(CompletionStage<?> other, Runnable action)
- 在触发
- 无方法名后缀
Async
:- 任务在触发线程就地连续执行
- 相应的
CF
方法名的一些例子:thenAccept(Consumer<T> action)
thenApply(Function<T, U> fn)
applyToEither(CompletionStage<T> other, Function<T, U> fn)
提交给CF
的任务可以运行出错(抛出异常),即状态是失败(Failed
)或取消(Cancelled
)。
对于直接读取结果的方法:
- 读取 成功结果的方法,如
cf.get()
、cf.join()
会抛出异常(包装的异常)来反馈 - 读取 失败结果的方法,如
cf.exceptionNow()
会返回结果异常或是抛出异常来反馈
对于CompletionStage
接口中编排执行的方法,会根据方法的功能 是只处理成功结果或失败结果一者,或是同时处理成功失败结果二者。如
exceptionally(...)
只处理 失败结果whenComplete(...)
/handle(...)
同时处理 成功与失败结果;- 这2个方法的参数
lamdba
(BiConsumer
/BiFunction
)同时输入成功失败结果2个参数:value
与exception
- 这2个方法的参数
- 其它多数的方法只处理 成功结果
- 对于不处理的结果,效果上就好像
没有调用这个CompletionStage
方法一样,即短路bypass
了 👏
超时控制是并发的基础关注方面之一。
到了Java 9
提供了内置支持,新增了completeOnTimeout(...)
/orTimeout(...)
方法。
CF
的超时控制,在实现上其实可以看成是CF
的使用方式,并不是CF
要实现基础能力;即可以通过其它已有的CF
功能,在CF
外围实现。
见子文档页 cf-functions-intro.md
CF
的方法个数比较多,所以介绍内容有些多,内容继续完善中… 💪 💕
见子文档页 cf-design-patterns.md
还没有什么内容,收集思考展开中… 💪 💕
- 支持设置缺省的业务线程池
CompletableFuture
的缺省线程池是ForkJoinPool.commonPool()
,这个线程池差不多CPU
个线程,合适执行CPU
密集的任务。- 对于业务逻辑往往有很多等待操作(如网络
IO
、阻塞等待),并不是CPU
密集的;使用这个缺省线程池ForkJoinPool.commonPool()
很危险❗️
所以每次调用CompletableFuture
的*async
方法时,都传入业务线程池,很繁琐易错 🤯 Cffu
支持设置缺省的业务线程池,规避上面的繁琐与危险
- 一等公民支持
Kotlin
🍩 cffuAllOf
方法- 运行多个
CompletableFuture
并返回结果的allOf
方法
- 运行多个
cffuAnyOf
方法- 返回具体类型的
anyOf
方法
- 返回具体类型的
cffuCombine(...)
方法- 运行多个(2 ~ 5个)不同类型的
CompletableFuture
,返回结果元组
- 运行多个(2 ~ 5个)不同类型的
cffuJoin(timeout, unit)
方法- 支持超时的
join
的方法;就像cf.get(timeout, unit)
之于cf.get()
CompletableFuture
缺少这个功能,cf.join()
会「不超时永远等待」很危险❗️
- 支持超时的
BackportJava 9+
高版本的所有CompletableFuture
新功能,在Java 8
可以直接使用。
其中重要的Backport功能有:
- 超时控制:
orTimeout(...)
/completeOnTimeout(...)
方法 - 延迟执行:
defaultExecutor(...)
方法 - 工厂方法:
failedFuture(...)
/completedStage(...)
/failedStage(...)
- 运行多个
CompletableFuture
并返回结果的allOf
方法:resultAllOf
方法,运行多个相同结果类型的CompletableFuture
CompletableFuture<List<T>> resultAllOf(CompletableFuture<T>... cfs)
CompletableFuture<List<T>> resultAllOf(List<? extends CompletableFuture<T>> cfs)
resultOf
方法,运行多个不同结果类型的CompletableFuture
CompletableFuture<Pair<T1, T2>> resultOf(CompletableFuture<T1> cf1, CompletableFuture<T2> cf2)
CompletableFuture<Triple<T1, T2, T3>> resultOf(CompletableFuture<T1> cf1, CompletableFuture<T2> cf2, CompletableFuture<T3> cf3)
- 具体类型的
anyOf
方法:- 提供的方法:
CompletableFuture<T> anyOf(CompletableFuture<T>... cfs)
CompletableFuture<T> anyOf(List<? extends CompletableFuture<T>> cfs)
CF
返回的类型是Object
,丢失具体类型:CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)
- 提供的方法:
实现所在的类:
import io.foldright.cffu.Cffu;
import io.foldright.cffu.CffuFactory;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import static io.foldright.cffu.CffuFactoryBuilder.newCffuFactoryBuilder;
public class Demo {
private static final ExecutorService myBizThreadPool = Executors.newFixedThreadPool(42);
// Create a CffuFactory with configuration of the customized thread pool
private static final CffuFactory cffuFactory = newCffuFactoryBuilder(myBizThreadPool).build();
public static void main(String[] args) throws Exception {
Cffu<Integer> cf42 = cffuFactory
.supplyAsync(() -> 21) // Run in myBizThreadPool
.thenApply(n -> n * 2);
// Run in myBizThreadPool
Cffu<Integer> longTaskA = cf42.thenApplyAsync(n -> {
sleep(1001);
return n / 2;
});
// Run in myBizThreadPool
Cffu<Integer> longTaskB = cf42.thenApplyAsync(n -> {
sleep(1002);
return n / 2;
});
Cffu<Integer> finalCf = longTaskA.thenCombine(longTaskB, Integer::sum)
.orTimeout(2, TimeUnit.SECONDS);
Integer result = finalCf.get();
System.out.println(result);
////////////////////////////////////////
// cleanup
////////////////////////////////////////
myBizThreadPool.shutdown();
}
static void sleep(long ms) {
try {
Thread.sleep(ms);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
# 完整可运行的Demo代码参见Demo.java
。
import io.foldright.cffu.CffuFactory
import io.foldright.cffu.CffuFactoryBuilder.newCffuFactoryBuilder
import io.foldright.cffu.kotlin.allOfCffu
import java.lang.Thread.sleep
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
private val myBizThreadPool: ExecutorService = Executors.newFixedThreadPool(42)
// Create a CffuFactory with configuration of the customized thread pool
private val cffuFactory: CffuFactory = newCffuFactoryBuilder(myBizThreadPool).build()
fun main() {
val cf42 = cffuFactory
.supplyAsync { 21 } // Run in myBizThreadPool
.thenApply { it * 2 }
listOf(
// Run in myBizThreadPool
cf42.thenApplyAsync { n: Int ->
sleep(1001)
n / 2
},
// Run in myBizThreadPool
cf42.thenApplyAsync { n: Int ->
sleep(1002)
n / 2
},
).allOfCffu(cffuFactory).thenApply(List<Int>::sum).orTimeout(2, TimeUnit.SECONDS).get().let(::println)
////////////////////////////////////////
// cleanup
////////////////////////////////////////
myBizThreadPool.shutdown()
}
# 完整可运行的Demo代码参见Demo.kt
。
当前版本的Java API文档地址: https://foldright.io/cffu/apidocs/
For Maven
projects:
<dependency>
<groupId>io.foldright</groupId>
<artifactId>cffu</artifactId>
<version>0.9.6</version>
</dependency>
For Gradle
projects:
// Gradle Kotlin DSL
implementation("io.foldright:cffu:0.9.6")
// Gradle Groovy DSL
implementation 'io.foldright:cffu:0.9.6'
可以在 central.sonatype.com 查看最新版本与可用版本列表。
cffu
是 CompletableFuture-Fu
的缩写;读作C Fu
,谐音Shifu/师傅
。
嗯嗯,想到了《功夫熊猫》里可爱的小浣熊师傅吧~ 🦝