工作项列表及其进展,参见 issue 6。
👉 cffu
(CompletableFuture Fu
🦝)是一个小小的CompletableFuture(CF)
辅助增强库,提升CF
使用体验并减少误用,期望在业务中更方便高效安全地使用CF
。
欢迎 👏 💖
- 建议和提问,提交 Issue
- 贡献和改进,Fork 后提通过 Pull Request 贡献代码
提供的功能有:
- ☘️ 补全业务使用中缺失的功能
- 更方便的功能,如
cffuAllOf
/allOfWithResult
方法:返回多个CF
的结果,而不是无返回结果Void
(allOf
)cffuCombine
/combine
方法:返回多个CF
不同类型的结果,而不是同一类型(cffuAllOf
/allOfWithResult
)
- 更高效灵活的并发执行策略,如
cffuAllOfFastFail
/allOfFastFail
方法:有CF
失败时快速返回,而不再等待所有CF
运行完成(allOf
)cffuAnyOfSuccess
/anyOfSuccess
方法:返回首个成功的CF
结果,而不是首个完成(但可能失败)的CF
(anyOf
)
- 更安全的使用方式,如
- 支持设置缺省的业务线程池(
CffuFactoryBuilder#newCffuFactoryBuilder(executor)
方法) cffuJoin(timeout, unit)
方法:支持超时的join
的方法- 支持禁止强制篡改(
CffuFactoryBuilder#forbidObtrudeMethods
方法) - 在类方法附加完善的代码质量注解(如
@NonNull
、@Nullable
、@CheckReturnValue
、@Contract
等),在编码时IDE
能尽早提示出问题
- 支持设置缺省的业务线程池(
- 更方便的功能,如
- 💪 已有功能的增强,如
cffuAnyOf
/anyOfWithType
方法:返回类型是T
(类型安全),而不是返回Object
(anyOf
)
- ⏳
Backport
支持Java 8
,Java 9+
高版本的所有CF
新功能在Java 8
等低Java
版本直接可用,如- 超时控制:
orTimeout
/completeOnTimeout
方法 - 延迟执行:
delayedExecutor
方法 - 工厂方法:
failedFuture
/completedStage
/failedStage
- 处理操作:
completeAsync
/exceptionallyAsync
/exceptionallyCompose
/copy
- 超时控制:
- 🍩 一等公民支持
Kotlin
更多cffu
的使用方式与功能说明详见 User Guide。
如何管理并发执行是个复杂易错的问题,业界有大量的工具、框架可以采用。
并发工具、框架的广度了解,可以看看如《七周七并发模型》、《Java虚拟机并发编程》、《Scala并发编程(第2版)》;更多关于并发主题的书籍参见书单。
其中CompletableFuture(CF)
有其优点:
Java
标准库内置- 无需额外依赖,几乎总是可用
- 相信有极高的实现质量
- 广为人知广泛使用,有一流的群众基础
CompletableFuture
在2014年发布的Java 8
提供,有~10年了CompletableFuture
的父接口Future
早在2004年发布的Java 5
中提供,有~20年了- 虽然
Future
接口不支持 执行结果的异步获取与并发执行逻辑的编排,但也让广大Java
开发者熟悉了Future
这个典型的概念与工具
- 功能强大、但不会非常庞大复杂
- 高层抽象
- 或说 以业务流程的形式表达技术的并发流程
- 可以不使用繁琐易错的基础并发协调工具,如
CountDownLatch
、锁(Lock
)、信号量(Semaphore
)
和其它并发工具、框架一样,CompletableFuture
用于
- 并发执行业务逻辑,或说编排并发的处理流程/处理任务
- 利用多核并行处理
- 提升业务响应性
值得更深入了解和应用。 💕
cffu
支持三种使用方式:
- 🦝 1) 使用
Cffu
类- 项目使用
Java
语言时,推荐这种使用方式 - 直接使用
CompletableFuture
类的代码可以比较简单的迁移到Cffu
类,包含2步修改:- 在类型声明地方,由
CompletableFuture
改成Cffu
- 在
CompletableFuture
静态方法调用的地方,类名CompletableFuture
改成cffuFactory
实例 - 更多参见如何从直接使用
CompletableFuture
类迁移到Cffu
类
- 在类型声明地方,由
- 依赖
io.foldright:cffu
库
- 项目使用
- 🛠️️ 2) 使用
CompletableFutureUtils
工具类- 如果你不想在项目中引入新类(
Cffu
类)、觉得这样增加了复杂性的话,- 完全可以把
cffu
库作为一个工具类来用 - 优化
CompletableFuture
使用的工具方法在业务项目中很常见,CompletableFutureUtils
提供了一系列实用可靠的工具方法
- 完全可以把
- 这种使用方式有些
cffu
功能没有提供(也没有想到实现方案) 😔
如支持设置缺省的业务线程池、禁止强制篡改 - 依赖
io.foldright:cffu
库
- 如果你不想在项目中引入新类(
- 🍩 3) 使用
Kotlin
扩展方法- 项目使用
Kotlin
语言时,推荐这种使用方式 - 要依赖
io.foldright:cffu-kotlin
库
- 项目使用
在介绍功能点之前,可以先看看cffu
不同使用方式的示例。 🎪
public class CffuDemo {
private static final ExecutorService myBizThreadPool = Executors.newCachedThreadPool();
// Create a CffuFactory with configuration of the customized thread pool
private static final CffuFactory cffuFactory = newCffuFactoryBuilder(myBizThreadPool).build();
public static void main(String[] args) throws Exception {
final Cffu<Integer> cf42 = cffuFactory
.supplyAsync(() -> 21) // Run in myBizThreadPool
.thenApply(n -> n * 2);
// Below tasks all run in myBizThreadPool
final Cffu<Integer> longTaskA = cf42.thenApplyAsync(n -> {
sleep(1001);
return n / 2;
});
final Cffu<Integer> longTaskB = cf42.thenApplyAsync(n -> {
sleep(1002);
return n / 2;
});
final Cffu<Integer> longTaskC = cf42.thenApplyAsync(n -> {
sleep(100);
return n * 2;
});
final Cffu<Integer> longFailedTask = cf42.thenApplyAsync(unused -> {
sleep(1000);
throw new RuntimeException("Bang!");
});
final Cffu<Integer> combined = longTaskA.thenCombine(longTaskB, Integer::sum)
.orTimeout(1500, TimeUnit.MILLISECONDS);
System.out.println("combined result: " + combined.get());
final Cffu<Integer> anyOfSuccess = cffuFactory.cffuAnyOfSuccess(longTaskC, longFailedTask);
System.out.println("anyOfSuccess result: " + anyOfSuccess.get());
}
}
# 完整可运行的Demo代码参见
CffuDemo.java
。
public class CompletableFutureUtilsDemo {
private static final ExecutorService myBizThreadPool = Executors.newCachedThreadPool();
public static void main(String[] args) throws Exception {
final CompletableFuture<Integer> cf42 = CompletableFuture
.supplyAsync(() -> 21, myBizThreadPool) // Run in myBizThreadPool
.thenApply(n -> n * 2);
final CompletableFuture<Integer> longTaskA = cf42.thenApplyAsync(n -> {
sleep(1001);
return n / 2;
}, myBizThreadPool);
final CompletableFuture<Integer> longTaskB = cf42.thenApplyAsync(n -> {
sleep(1002);
return n / 2;
}, myBizThreadPool);
final CompletableFuture<Integer> longTaskC = cf42.thenApplyAsync(n -> {
sleep(100);
return n * 2;
}, myBizThreadPool);
final CompletableFuture<Integer> longFailedTask = cf42.thenApplyAsync(unused -> {
sleep(1000);
throw new RuntimeException("Bang!");
}, myBizThreadPool);
final CompletableFuture<Integer> combined = longTaskA.thenCombine(longTaskB, Integer::sum);
final CompletableFuture<Integer> combinedWithTimeout = CompletableFutureUtils.orTimeout(combined, 1500, TimeUnit.MILLISECONDS);
System.out.println("combined result: " + combinedWithTimeout.get());
final CompletableFuture<Integer> anyOfSuccess = CompletableFutureUtils.anyOfSuccessWithType(longTaskC, longFailedTask);
System.out.println("anyOfSuccess result: " + anyOfSuccess.get());
}
}
# 完整可运行的Demo代码参见
CompletableFutureUtilsDemo.java
。
private val myBizThreadPool: ExecutorService = Executors.newCachedThreadPool()
// Create a CffuFactory with configuration of the customized thread pool
private val cffuFactory: CffuFactory = newCffuFactoryBuilder(myBizThreadPool).build()
fun main() {
val cf42 = cffuFactory
.supplyAsync { 21 } // Run in myBizThreadPool
.thenApply { it * 2 }
// Below tasks all run in myBizThreadPool
val longTaskA = cf42.thenApplyAsync { n: Int ->
sleep(1001)
n / 2
}
val longTaskB = cf42.thenApplyAsync { n: Int ->
sleep(1002)
n / 2
}
val longTaskC = cf42.thenApplyAsync { n: Int ->
sleep(100)
n * 2
}
val longFailedTask = cf42.thenApplyAsync<Int> { _ ->
sleep(1000)
throw RuntimeException("Bang!")
}
val combined = longTaskA.thenCombine(longTaskB, Integer::sum)
.orTimeout(1500, TimeUnit.MILLISECONDS)
println("combined result: ${combined.get()}")
val anyOfSuccess: Cffu<Int> = listOf(longTaskC, longFailedTask).anyOfSuccessCffu()
println("anyOfSuccess result: ${anyOfSuccess.get()}")
}
# 完整可运行的Demo代码参见
CffuDemo.kt
。
CompletableFuture
的allOf
方法没有返回结果,只是返回Void
,不方便获得所运行的多个CF
结果。
# 要再通过入参CF
的get
方法来获取结果。
cffu
的cffuAllOf
/allOfWithResult
方法提供了返回多个CF
结果的功能。
示例代码如下:
public class AllOfWithResultDemo {
public static final Executor myBizExecutor = Executors.newCachedThreadPool();
public static final CffuFactory cffuFactory = newCffuFactoryBuilder(myBizExecutor).build();
public static void main(String[] args) {
//////////////////////////////////////////////////
// CffuFactory#cffuAllOf
//////////////////////////////////////////////////
Cffu<Integer> cffu1 = cffuFactory.completedFuture(21);
Cffu<Integer> cffu2 = cffuFactory.completedFuture(42);
Cffu<Void> allOf2 = cffuFactory.allOf(cffu1, cffu2);
// Result type is Void!!
// the result can be got by input argument `cf1.get()`, but it's cumbersome.
// so we can see a lot the util methods to enhance allOf with result in our project.
Cffu<List<Integer>> allOfWithResult = cffuFactory.cffuAllOf(cffu1, cffu2);
System.out.println(allOfWithResult.get());
//////////////////////////////////////////////////
// or CompletableFutureUtils#allOfWithResult
//////////////////////////////////////////////////
CompletableFuture<Integer> cf1 = CompletableFuture.completedFuture(21);
CompletableFuture<Integer> cf2 = CompletableFuture.completedFuture(42);
CompletableFuture<Void> allOf = CompletableFuture.allOf(cf1, cf2);
// Result type is Void!!
CompletableFuture<List<Integer>> allOfWithResult2 = CompletableFutureUtils.allOfWithResult(cf1, cf2);
System.out.println(allOfWithResult2.get());
}
}
# 完整可运行的Demo代码参见
AllOfWithResultDemo.java
。
上面多个相同结果类型的CF
,cffu
还提供了返回多个不同类型CF
结果的方法,cffuCombine
/CompletableFutureUtils#combine
方法。
示例代码如下:
public class CffuCombineDemo {
public static final Executor myBizExecutor = Executors.newCachedThreadPool();
public static final CffuFactory cffuFactory = newCffuFactoryBuilder(myBizExecutor).build();
public static void main(String[] args) throws Exception {
//////////////////////////////////////////////////
// cffuCombine
//////////////////////////////////////////////////
Cffu<String> cffu1 = cffuFactory.completedFuture("21");
Cffu<Integer> cffu2 = cffuFactory.completedFuture(42);
Cffu<Tuple2<String, Integer>> allOfWithResult = cffu1.cffuCombine(cffu2);
// or: cffuFactory.cffuCombine(cffu1, cffu2);
System.out.println(allOfWithResult.get());
//////////////////////////////////////////////////
// or CompletableFutureUtils.combine
//////////////////////////////////////////////////
CompletableFuture<String> cf1 = CompletableFuture.completedFuture("21");
CompletableFuture<Integer> cf2 = CompletableFuture.completedFuture(42);
CompletableFuture<Tuple2<String, Integer>> allOfWithResult2 = CompletableFutureUtils.combine(cf1, cf2);
System.out.println(allOfWithResult2.get());
}
}
# 完整可运行的Demo代码参见
CffuCombineDemo.java
。
CompletableFuture
执行执行(即CompletableFuture
的*Async
方法),使用的缺省线程池是ForkJoinPool.commonPool()
。- 这个线程池差不多是
CPU
个线程,合适执行CPU
密集的任务;对于业务逻辑,往往有很多等待操作(如网络IO
、阻塞等待),并不是CPU
密集的。 - 业务使用这个缺省线程池
ForkJoinPool.commonPool()
是很危险的❗
结果就是,业务调用CompletableFuture
的*Async
方法时,几乎每次都要反复传入业务线程池;这让CompletableFuture
的使用很繁琐易错 🤯
示例代码如下:
public class NoDefaultExecutorSettingForCompletableFuture {
public static final Executor myBizExecutor = Executors.newCachedThreadPool();
public static void main(String[] args) {
CompletableFuture<Void> cf1 = CompletableFuture.runAsync(() -> System.out.println("doing a long time work!"),
myBizExecutor);
CompletableFuture<Void> cf2 = CompletableFuture
.supplyAsync(
() -> {
System.out.println("doing another long time work!!");
return 42;
},
myBizExecutor)
.thenAcceptAsync(
i -> System.out.println("doing third long time work!!!"),
myBizExecutor);
CompletableFuture.allOf(cf1, cf2).join();
}
}
# 完整可运行的Demo代码参见
NoDefaultExecutorSettingForCompletableFuture.java
。
Cffu
支持设置缺省的业务线程池,规避上面的繁琐与危险。示例代码如下:
public class DefaultExecutorSettingForCffu {
public static final Executor myBizExecutor = Executors.newCachedThreadPool();
public static final CffuFactory cffuFactory = newCffuFactoryBuilder(myBizExecutor).build();
public static void main(String[] args) {
Cffu<Void> cf1 = cffuFactory.runAsync(() -> System.out.println("doing a long time work!"));
Cffu<Void> cf2 = cffuFactory.supplyAsync(() -> {
System.out.println("doing another long time work!!");
return 42;
}).thenAcceptAsync(i -> System.out.println("doing third long time work!!!"));
cffuFactory.allOf(cf1, cf2).join();
}
}
# 完整可运行的Demo代码参见
DefaultExecutorSettingForCffu.java
。
CompletableFuture
的allOf
方法会等待所有输入CF
运行完成;即使有CF
失败了也要等待后续CF
运行完成,再返回一个失败的CF
。- 对于业务逻辑来说,这样失败且继续等待策略,减慢了业务响应性;会希望如果有输入
CF
失败了,则快速失败不再做于事无补的等待 cffu
提供了相应的cffuAllOfFastFail
/allOfFastFail
方法allOf
/allOfFastFail
两者都是,只有当所有的输入CF
都成功时,才返回成功结果
- 对于业务逻辑来说,这样失败且继续等待策略,减慢了业务响应性;会希望如果有输入
CompletableFuture
的anyOf
方法返回首个完成的CF
(不会等待后续没有完成的CF
,赛马模式);即使首个完成的CF
是失败的,也会返回这个失败的CF
结果。- 对于业务逻辑来说,会希望赛马模式返回首个成功的
CF
结果,而不是首个完成但失败的CF
cffu
提供了相应的cffuAnyOfSuccess
/anyOfSuccess
方法anyOfSuccess
只有当所有的输入CF
都失败时,才返回失败结果
- 对于业务逻辑来说,会希望赛马模式返回首个成功的
📔 关于多个
CF
的并发执行策略,可以看看JavaScript
规范Promise Concurrency
;在JavaScript
中,Promise
即对应CompletableFuture
。
JavaScript Promise
提供了4个并发执行方法:
Promise.all()
:等待所有Promise
运行成功,只要有一个失败就立即返回失败(对应cffu
的allOfFastFail
方法)Promise.allSettled()
:等待所有Promise
运行完成,不管成功失败(对应cffu
的allOf
方法)Promise.any()
:赛马模式,立即返回首个成功的Promise
(对应cffu
的anyOfSuccess
方法)Promise.race()
:赛马模式,立即返回首个完成的Promise
(对应cffu
的anyOf
方法)PS:
JavaScript Promise
的方法命名真考究~ 👍
cffu
新加2个方法后,对齐了JavaScript Promise
规范的并发方法~ 👏
示例代码如下:
public class ConcurrencyStrategyDemo {
public static final Executor myBizExecutor = Executors.newCachedThreadPool();
public static final CffuFactory cffuFactory = newCffuFactoryBuilder(myBizExecutor).build();
public static void main(String[] args) throws Exception {
////////////////////////////////////////////////////////////////////////
// CffuFactory#cffuAllOfFastFail / allOfFastFail
// CffuFactory#cffuAnyOfSuccess / anyOfSuccess
////////////////////////////////////////////////////////////////////////
final Cffu<Integer> successAfterLongTime = cffuFactory.supplyAsync(() -> {
sleep(3000); // sleep LONG time
return 42;
});
final Cffu<Integer> failed = cffuFactory.failedFuture(new RuntimeException("Bang!"));
// Result type is Void!
Cffu<Void> cffuAll = cffuFactory.allOfFastFail(successAfterLongTime, failed);
Cffu<List<Integer>> fastFailed = cffuFactory.cffuAllOfFastFail(successAfterLongTime, failed);
// fast failed without waiting successAfterLongTime
System.out.println(fastFailed.exceptionNow());
// Result type is Object!
Cffu<Object> cffuAny = cffuFactory.anyOfSuccess(successAfterLongTime, failed);
System.out.println(cffuAny.get());
Cffu<Integer> anyOfSuccess = cffuFactory.cffuAnyOfSuccess(successAfterLongTime, failed);
System.out.println(anyOfSuccess.get());
////////////////////////////////////////////////////////////////////////
// or CompletableFutureUtils#allOfFastFailWithResult / allOfFastFail
// CompletableFutureUtils#anyOfSuccessWithType / anyOfSuccess
////////////////////////////////////////////////////////////////////////
final CompletableFuture<Integer> successAfterLongTimeCf = CompletableFuture.supplyAsync(() -> {
sleep(3000); // sleep LONG time
return 42;
});
final CompletableFuture<Integer> failedCf = CompletableFutureUtils.failedFuture(new RuntimeException("Bang!"));
// Result type is Void!
CompletableFuture<Void> cfAll = CompletableFutureUtils.allOfFastFail(successAfterLongTimeCf, failedCf);
CompletableFuture<List<Integer>> fastFailedCf = CompletableFutureUtils.allOfFastFailWithResult(successAfterLongTimeCf, failedCf);
// fast failed without waiting successAfterLongTime
System.out.println(CompletableFutureUtils.exceptionNow(fastFailedCf));
// Result type is Object!
CompletableFuture<Object> cfAny = CompletableFutureUtils.anyOfSuccess(successAfterLongTimeCf, failedCf);
System.out.println(cfAny.get());
CompletableFuture<Integer> cfSuccess = CompletableFutureUtils.anyOfSuccessWithType(successAfterLongTimeCf, failedCf);
System.out.println(cfSuccess.get());
}
}
# 完整可运行的Demo代码参见
ConcurrencyStrategyDemo.java
。
cf.join()
会「不超时永远等待」,在业务中很危险❗️当意外出现长时间等待时,会导致:
- 主业务逻辑阻塞,没有机会做相应的处理,以及时响应用户
- 会费掉一个线程,线程是很有限的资源(一般几百个),耗尽线程意味着服务瘫痪故障
cffuJoin(timeout, unit)
方法即支持超时的join
的方法;就像cf.get(timeout, unit)
之于 cf.get()
。
这个新方法使用简单类似,不附代码示例。
Java 9+
高版本的所有CF
新功能在Java 8
等低Java
版本直接可用。
其中重要的Backport功能有:
- 超时控制:
orTimeout
/completeOnTimeout
方法 - 延迟执行:
delayedExecutor
方法 - 工厂方法:
failedFuture
/completedStage
/failedStage
- 处理操作:
completeAsync
/exceptionallyAsync
/exceptionallyCompose
/copy
这些backport
的方法是CompletableFuture
的已有功能,不附代码示例。
CompletableFuture.anyOf
方法返回类型是Object
,丢失具体类型,不够类型安全,使用时需要转型也不方便。
cffu
提供了cffuAnyOf
/anyOfWithType
方法,返回类型是T
(类型安全),而不是返回Object
(anyOf
)。
这个新方法使用简单类似,不附代码示例。
可以参见:
API
文档- 实现源码
cffu
:Cffu.java
、CffuFactory.java
CompletableFuture utils
:CompletableFutureUtils.java
Kotlin extensions
:CffuExtensions.kt
、CompletableFutureExtensions.kt
为了使用cffu
增强功能,可以迁移已有直接使用CompletableFuture
的代码到Cffu
。包含2步修改:
- 在类型声明地方,
CompletableFuture
改成Cffu
- 在
CompletableFuture
静态方法调用的地方,类名CompletableFuture
改成cffuFactory
实例
之所以可以这样迁移,是因为:
CompletableFuture
类的所有实例方法都在Cffu
类,且有相同的方法签名与功能CompletableFuture
类的所有静态方法都在CffuFactory
类,且有相同的方法签名与功能
- 当前版本的
Java API
文档: https://foldright.io/cffu/apidocs/ - 当前版本的
Kotlin API
文档: https://foldright.io/cffu/dokka/
代码示例:
可以在 central.sonatype.com 查看最新版本与可用版本列表。
cffu
库(包含Java CompletableFuture
的增强CompletableFutureUtils
):-
For
Maven
projects:<dependency> <groupId>io.foldright</groupId> <artifactId>cffu</artifactId> <version>0.9.8</version> </dependency>
-
For
Gradle
projects:// Gradle Kotlin DSL implementation("io.foldright:cffu:0.9.8")
// Gradle Groovy DSL implementation 'io.foldright:cffu:0.9.8'
-
cffu Kotlin
支持库:-
For
Maven
projects:<dependency> <groupId>io.foldright</groupId> <artifactId>cffu-kotlin</artifactId> <version>0.9.8</version> </dependency>
-
For
Gradle
projects:// Gradle Kotlin DSL implementation("io.foldright:cffu-kotlin:0.9.8")
// Gradle Groovy DSL implementation 'io.foldright:cffu-kotlin:0.9.8'
-
cffu bom
:-
For
Maven
projects:<dependency> <groupId>io.foldright</groupId> <artifactId>cffu-bom</artifactId> <version>0.9.8</version> <type>pom</type> <scope>import</scope> </dependency>
-
For
Gradle
projects:// Gradle Kotlin DSL implementation(platform("io.foldright:cffu-bom:0.9.8"))
// Gradle Groovy DSL implementation platform('io.foldright:cffu-bom:0.9.8')
-
- 📌
TransmittableThreadLocal(TTL)
的cffu executor wrapper SPI
实现:-
For
Maven
projects:<dependency> <groupId>io.foldright</groupId> <artifactId>cffu-ttl-executor-wrapper</artifactId> <version>0.9.8</version> <scope>runtime</scope> </dependency>
-
For
Gradle
projects:// Gradle Kotlin DSL runtimeOnly("io.foldright:cffu-ttl-executor-wrapper:0.9.8")
// Gradle Groovy DSL runtimeOnly 'io.foldright:cffu-ttl-executor-wrapper:0.9.8'
-
- 官方资料
CompletableFuture
Guide- 完备说明
CompletableFuture
的使用方式 - 给出 最佳实践建议 与 使用陷阱注意
- 期望在业务中,更有效安全地使用
CompletableFuture
- 完备说明
cffu
是 CompletableFuture-Fu
的缩写;读作C Fu
,谐音Shifu/师傅
。
嗯嗯,想到了《功夫熊猫》里可爱的小浣熊师傅吧~ 🦝