掌握Reactor Core实现响应式编程
Opened this issue · 0 comments
Reactor Core介绍
Reactor Core是一个基于Java 8的库,它实现了响应式编程模型,使得开发者能够构建高效、可伸缩、响应式的应用程序。它是建立在Reactive Streams规范之上的,这个规范定义了一套标准,用于构建异步的、基于事件的数据流处理系统。
Reactive Streams规范的核心是提供一种异步流程处理的方法,它允许不同组件之间以非阻塞的方式交换数据流。这种规范特别适用于处理大量的事件,这些事件需要被系统以异步的方式快速响应和处理。例如,在金融应用中,可能会有成千上万的股票价格更新每秒发生,Reactor Core能够帮助这样的应用快速地处理这些高频率的数据流。
Reactor Core提供了两个基本的响应式类型:Flux和Mono。Flux代表一个包含0到N个元素的异步序列,而Mono代表一个包含0到1个元素的异步序列。这两种类型都支持各种操作符,可以用来创建、转换、过滤和组合数据流。
通过使用Reactor Core,开发者可以利用响应式编程的优势,例如更好的资源利用率、更简洁的并发代码以及更好的错误处理机制。这使得Reactor Core成为构建现代、高性能、响应式Java应用程序的强大工具。
Maven 依赖
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>3.4.16</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.6</version>
</dependency>
官网:https://projectreactor.io/docs
生成数据流
Flux和Mono都为响应式流publisher接口的实现。主要是为了生成数据流。这两个类都遵照规范,我们可以在下面代码中使用该接口:
Publisher<String> just = Mono.just("foo");
Flux
Flux能够产生0~N个元素的数据流,示例代码如下:
Flux<Integer> just = Flux.just(1, 2, 3, 4);
上述代码产生了4个元素的静态数据流。
Mono
使用Mono类,能够产生0~1个元素,示例代码:
Mono<Integer> just = Mono.just(1);
上述代码行为似乎与上节Flux一致,只是限制元素数量不超过1。
订阅流
收集元素
使用subsribe()方法收集流中的元素,代码如下:
` List elements = new ArrayList<>();
Flux.just(1, 2, 3, 4)
.log()
.subscribe(elements::add);`
只有当我们订阅了数据才会开始流动。另外还添加了日志记录,这有助于我们调试程序、查看后台发生了什么。
元素流
通过日志,可以看到完整数据流过程。
- onSubscribe() – 当订阅流时调用
- request(unbounded) – 当调用subscribe()方法, 意味着我们正创建订阅从流中请求元素。这种缺省情况为无边界的,即请求所有单个有效元素、
- onNext() – 对每个单个元素都调用
- onComplete() – 接收到最后一个元素后调用。实际上还有一个onError(),如果有异常就会调用它。
这是在Subscriber接口定义的,作为响应式流规范的一部分实现。实际上在调用onSubscribe()时在幕后实例化Subscriber。
`List elements = new ArrayList<>();
Flux.just(1, 2, 3, 4)
.log()
.subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
s.request(Long.MAX_VALUE);
}
@Override
public void onNext(Integer integer) {
elements.add(integer);
}
@Override
public void onError(Throwable t) {}
@Override
public void onComplete() {}
});`
我们能看到Subscriber 实现种每个方面对应可能的处理阶段。Flux提供的助手方法subscribe()避免了大量冗余工作。
与Java8流对比
核心区别在于,反应式是一个推模型,而Java 8流是一个拉模型。
其他操作
反压
响应式编程一个主要目标就是要解决反压问题。如果生产者发射事件速度超过消费者处理速度,最终消费者会因为过量事件而耗尽资源。
流操作
映射数据可以使用map()、合并流可以使用zip()等
热流
现实中响应式场景更可能面对是无限流。举例,持续不断鼠标运动事件流需要响应。这种类型流称为热流,因为它们总是在运行,可以在任何时间点订阅,忽略开始数据。
创建热流的一种方式为转换冷流。
节流可以引入sample方法,并指定参数:间隔时间。还有很多策略可以减少数据推送给下游,比如窗口和缓存
并发
Scheduler接口提供了异步执行的抽象,为我们提供一些实现。可以尝试订阅在非主线程中执行。
List<Integer> elements = new CopyOnWriteArrayList<>(); Flux.just(1, 2, 3, 4) .log() .map(i -> i * 2) .subscribeOn(Schedulers.parallel()) .subscribe(elements::add);
并行调度让订阅运行在不同的线程
参考:
https://blog.csdn.net/neweastsun/article/details/127091096