新建maven项目,将Project Reactor作为依赖项添加到应用程序中:
响应流规范只定义了4个接口,即:
- Publisher<T>
- Subscriber<T>
- Subscription
- Processor<T, R>
Project Reactor提供了 Publisher<T> 接口的实现,即 Flux<T> 和 Mono<T> 。
1.2.1、Flux
上图为将Flux流转换为另一个Flux流的示例。Flux定义了一个通用的响应式流,它可以产生零个、一个或多个元素,乃至无限元素。有如下公式:
如下代码生成一个简单的无限响应流:
该流重复产生1到5的数字(1,2,3,4,5,1,2。。。)。每个元素都无需完成整个流创建即可被转换和消费。订阅者可以随时取消订阅从而将无限流转换为有限流。收集无限流发出的所有元素会导致 OutOfMemoryException 。如下代码重现该问题:
1.2.2、Mono
上图为将Mono流转换为另一个Mono流的示例。与Flux相比,Mono类型定义了一个最多可以生成一个元素的流,可以通过如下公式表示:
当应用程序API最多返回一个元素时,可以使用 Mono<T> 。它可以轻松替换 CompletableFuture<T> ,并提供相似的语义,只不过 CompletableFuture 在没有发出值的情况下无法正常完成。CompletableFuture 会立即开始处理,而Mono在订阅者出现之前什么也不做。
Mono类型不仅提供了大量的响应式操作符,还能够整合到更大的响应式工作流程中。当需要对已完成的操作通知客户端时,也可以使用Mono。此时,可以返回 Mono<Void> 类型并在处理完成时发出 onComplete() 信号,或者在发生异常时返回 onError() 。此时,我们不返回任何数据,而是发出通知信号,而该信号可以用作进一步计算的触发器。
Mono和Flux可以容易地相互“转换”。如: Flux<T>.collectList() 返回 Mono<List<T>> ,而 Mono<T>.flux() 返回 Flux<T> 。
1.2.3、RxJava 2 响应式类型
即使 RxJava 2.x 库和 Project Reactor 具有相同的基础,RxJava 2 还是有一组不同的响应式发布者。由于这两个库实现了相同的理念,包括响应式操作符、线程管理和错误处理,都非常相似。因此,或多或少熟悉其中一个库意味着同时熟悉了这两个库。
RxJava 1.x中最初只有 Observable 这一个响应式类型,之后又添加了 Single 和 Completable类型。在版本 2 中,该库具有以下响应式类型:
- Observable
- Flowable
- Single
- Maybe
- Completable
1、Observable
- 与 RxJava 1.x 的Observable语义几乎相同,但是,不接收 null 值。
- Observable 既不支持背压,也不实现 Publisher接口,所以它与响应式流规范不直接兼容。
- Observable 类型的开销小于 Flowable 类型。
- 它具有toFlowable 方法,可以通过应用用户选择的背压策略将流转换为 Flowable。
2、Flowable
- Flowable 类型是 Reactor Flux 类型的直接对应物。
- 实现了响应式流的 Publisher,可以应用在由 Project Reactor 实现的响应式工作流中,因为 API 消费 Publisher 类型的参数,而不是针对特定库的Flux 类型。
3、Single
- Single 类型表示生成且仅生成一个元素的流。
- 不继承 Publisher 接口。
- 具有 toFlowable 方法。
- 不需要背压策略。
- 相较 Reactor 中的 Mono 类型,Single 更好地表示了 CompletableFuture 的语义,但是在订阅发生之前它仍然不会开始处理。
4、Maybe
- 实现了与 Reactor 的 Mono 类型相同的语义,但是不兼容响应式流,因为 Maybe 不实现 Publisher 接口。
- 具有 toFlowable 方法,以兼容响应式流规范。
5、Completable
- 只能触发 onError 或onComplete 信号,但不能产生 onNext 信号。
- 不实现 Publisher 接口,但具有toFlowable 方法。
- 它对应不能生成 onNext 信号的 Mono<Void>类型。
总而言之,要与其他兼容响应式流的代码集成,应将 RxJava 类型转换为 Flowable 类型。
Flux 和 Mono 提供了许多工厂方法,可以根据已有的数据创建响应流。如,可以使用对象引用或集合创建 Flux,甚至可以简单地用数字范围来创建:
Mono 提供类似的工厂方法,但主要针对单个元素。它也经常与 nullable 类型和 Optional类型一起使用:
Mono 对于包装异步操作(如 HTTP 请求或数据库查询)非常有用。Mono 提供了:
- fromCallable(Callable)
- fromRunnable(Runnable)
- fromSupplier(Supplier)
- fromFuture(CompletableFuture)
- fromCompletionStage(CompletionStage)
注意,上述代码不仅异步发出 HTTP 请求(由适当的 Scheduler 提供),还会处理onError信号传播的错误。
Flux 和 Mono 都可以使用 from(Publisher<T> p) 工厂方法适配任何其他 Publisher 实例。
或者是这种写法:
两种响应式类型都提供了简便的方法来创建常用的空流以及只包含错误的流:
- empty()工厂方法,它们分别生成 Flux 或 Mono 的空实例。
- never()方法会创建一个永远不会发出完成、数据或错误等信号的流。
- error(Throwable)工厂方法创建一个序列,该序列在订阅时始终通过每个订阅者的onError(...)方法传播错误。由于错误是在 Flux 或 Mono 声明期间被创建的,因此,每个订阅者都会收到相同的 Throwable 实例。
defer 工厂方法创建一个序列,并在订阅时决定其行为,可以为不同的订阅者生成不同的数据:
总结:
- Project Reactor 只需使用 just 方法枚举元素就可以创建 Flux 和 Mono 序列。
- 可以使用 justOrEmpty 轻松地将 Optional 包装到 Mono 中,或者使用 fromSupplier 方法将Supplier 包装到 Mono 中。
- 可以使用 fromFuture 方法映射 Future,或使用 fromRunnable工厂方法映射 Runnable。
- 可以使用fromArray 或 fromIterable 方法将数组或 Iterable 集合转换为 Flux 流。
完整测试代码:
Flux 和 Mono 提供了对 subscribe()方法的基于 lambda 的重载,简化了订阅的开发。subscribe 方法的所有重载都返回 Disposable接口的实例,可以用于取消基础的订阅过程。在重载方法1到4中,订阅发出对无界数据(Long.MAX_VALUE)的请求。
注意:简单订阅请求无界数据(Long.MAX_VALUE)的选项有时可能迫使生产者完成大量工作以满足需求。因此,如果生产者更适合处理有界数据请求,建议使用订阅对象或应用请求限制操作符来控制需求。
重载方法6非常通用,但很少被用到。具体使用方式:
添加副作用的栗子:
所谓的副作用其实也就是对上游传递的元素进行额外的操作处理,即上游传递一个就处理一个。
处理正常情况:
添加异常的订阅处理:
添加完成事件的订阅处理:
添加订阅成功的处理:
手动控制订阅:
上述执行没有收到 onComplete 信号,因为订阅者在流完成之前取消了订阅。
特别注意:
- 响应式流可以由生产者完成(使用 onError 或 onComplete 信号)
- 响应式流可以由订阅者通过 Subscription 实例进行取消。
- Disposable 实例也可用于取消。
通常,Disposable实例不是由订阅者使用,而是由更上一级抽象的代码使用。如在主线程通过调用 Disposable 来取消流处理:
1.4.1、实现自定义订阅者
如果默认的 subscribe(...)方法不提供所需的多种功能,则可以实现自己的Subscriber,直接从响应式流规范实现 Subscriber 接口,并将其订阅到流,如下所示:
但是,上述定义订阅的方法是不对的。它打破了线性代码流,也容易出错。最困难的部分是需要自己管理背压并正确实现订阅者的所有 TCK 要求。在前面的示例中,打破了有关订阅验证和取消这几个 TCK 要求。建议扩展 Project Reactor 提供的 BaseSubscriber 类。在这种情况下,订阅者如下所示:
不仅可以重载 hookOnSubscribe(Subscription)方法,hookOnNext(T)方法,还可以重载hookOnError(Throwable)方法、hookOnCancel()方法、hookOnComplete()方法以及其他方法。
BaseSubscriber 类提供了 request(long)和 requestUnbounded()这些方法来对响应式流需求进行粒度控制。使用 BaseSubscriber 类,实现符合 TCK 的订阅者更为容易。
使用响应式流,除了需要能够创建和使用流,还必须能够完美地转换和操作。Project Reactor 为几乎所有所需的响应式转换提供了工具,通常,可以对库的功能特性做如下分类:
- 转换现有序列;
- 查看序列处理的方法;
- 拆分和聚合 Flux 序列;
- 处理时间;
- 同步返回数据。
1.5.1、映射响应式流元素
转换序列的最自然方式是将每个元素映射到一个新值。Flux 和 Mono 给出了 map 操作符,具有 map(Function<T,R>) 签名的方法可用于逐个处理元素。当操作符将元素的类型从 T 转变为 R 时,整个序列的类型将改变。
Mono 类的 map 操作符具有类似行为。cast(Class c) 操作符将流的元素强制转换为目标类。实现 cast(Class c) 操作符的最简单方法是使用 map() 操作符。如下Flux类源码:
index 操作符可用于枚举序列中的元素。该方法具有以下签名: Flux<Tuple2<Long,T >> index() 。
timestamp 操作符的行为与 index 操作符类似,但会添加当前时间戳而不是索引。
1.5.2、过滤响应式流
Project Reactor 包含用于过滤元素的各种操作符。
- filter:操作符仅传递满足条件的元素。
- ignoreElements:操作符返回 Mono<T> 并过滤所有元素。结果序列仅在原始序列结束后结束。也就是创建一个Mono序列,忽略作为源的Publisher中的所有元素,只产生消息。
- take(n):操作符限制所获取的元素,该方法忽略除前 n 个元素之外的所有元素。
- takeLast:仅返回流的最后一个元素。
- takeUntil(Predicate):传递一个元素直到满足某个条件。
- elementAt(n):只可用于获取序列的第 n 个元素。
- single:操作符从数据源发出单个数据项,也为空数据源发出 NoSuchElementException错误信号,或者为具有多个元素的数据源发出IndexOutOfBoundsException 信号。它不仅可以基于一定数量来获取或跳过元素,还可以通过带有Duration的 skip(Duration) 或 take(Duration) 操作符。
- takeUntilOther(Publisher) 或 skipUntilOther(Publisher):操作符,可以跳过或获取一个元素,直到某些消息从另一个流到达。
考虑如下工作流程,该工作流中,首先开始一个流的处理,然后从其他流收到特定事件之后,停止该流的处理。代码如下所示:
此时,可以启动然后停止元素处理,但只执行一次。该场景的弹珠图:
ignoreElements 用法:
take 的用法:
takeLast 的用法:
小结: take 和 takeLast 的用法区别是,前者是往前取 n 个元素;后者是从后往前取 n 个元素。
takeUntil 用法:
这里有一个Flux流,它不断发出整数,当发出的整数大于5时停止接收数据。takeUntil操作符的用途非常广泛,适用于需要在满足特定条件时停止数据处理的场景。 例如,在网络请求中,当接收到特定的响应状态码时,可以停止进一步的请求处理;在数据处理流程中,当达到某个阈值时,可以停止进一步的数据处理。
elementAt 用法:
即从指定的元素序列中获取指定索引位置的值,若索引不存在则取得设定的默认值返回。
single 用法:
1.5.3、收集响应式流
收集列表中的所有元素,并使用 Flux.collectList() 和 Flux.collectSortedList() 将结果集合处理为 Mono 流是可能的。 Flux.collectSortedList() 不仅会收集元素,还会对它们进行排序。如下代码:
请注意,收集集合中的序列元素可能耗费资源,当序列具有许多元素时这种现象尤为突出。此外,尝试在无限流上收集数据可能消耗所有可用的内存。Project Reactor 不仅可以将 Flux 元素收集到 List,还可以收集以下内容:
- 使用 collectMap 操作符的映射( Map<K,T> );
- 使用 collectMultimap 操作符的多映射( Map<K,Collection<T>> );
- Flux.collect(Collector) 操作符收集到任何实现了 java.util.stream.Collector 的数据结构。
- Flux 和 Mono 都有 repeat() 方法和 repeat(times) 方法,这两种方法可以针对传入序列进行循环操作。
- defaultIfEmpty(T) 是另一个简洁的方法,它能为空的 Flux 或 Mono 提供默认值。
- Flux.distinct() 仅传递之前未在流中遇到过的元素。但是,因为此方法会跟踪所有唯一性元素,所以(尤其是涉及高基数数据流时)请谨慎使用。distinct 方法具有重载方法,可以为重复跟踪提供自定义算法。因此,有时可以手动优化 distinct 操作符的资源使用。Flux.distinctUntilChanged() 操作符没有此限制,可用于无限流以删除出现在不间断行中的重复项。
注:高基数(high-cardinality)是指具有非常罕见元素或唯一性元素的数据。例如,身份编号和用户名就是典型的高基数数据,而枚举值或来自小型固定字典的值就不是。
collectMap 操作符的使用:
collectMultimap 的使用:
repeat 操作符的使用:
defaultIfEmpty 操作符的使用:
distinct 操作符的使用:
distinctUntilChanged 操作符的使用:
1.5.4、裁剪流中的元素
- 统计流中元素的数量;
- 检查所有元素是否具有 Flux.all(Predicate) 所需的属性;
- 使用 Flux.any(Predicate) 操作符检查是否至少有一个元素具有所需属性;
- 使用 hasElements 操作符检查流中是否包含多个元素;
- 使用 hasElement 操作符检查流中是否包含某个所需的元素。短路逻辑,在元素与值匹配时立即返回true。
- any 操作符不仅可以检查元素的相等性,还可以通过提供自定义 Predicate 实例来检查任何其他属性。
检查序列中是否包含偶数:
sort 操作符:
sort 操作符在后台对元素进行排序,然后在原始序列完成后发出已排序的序列。
reduce 操作符:
Flux 类能使用自定义逻辑来裁剪序列(也称为折叠)。 reduce 操作符通常需要一个初始值和一个函数,而该函数会将前一步的结果与当前步的元素组合在一起。reduce 操作符只生成一个具有最终结果的元素。举例将 1 到 5 之间的整数加起来:
scan 操作符:
Flux.scan()操作符在进行聚合时,可以向下游发送中间结果。scan 操作符对 1 到 5 之间的整数求和:
scan 操作符对于许多需要获取处理中事件的相关信息的应用程序有用。例如,我们可以计算流上的移动平均值:
then、thenMany 和 thenEmpty
Mono 和 Flux 流有 then、thenMany 和 thenEmpty 操作符,它们在上游流完成时完成。上游流完成处理后,这些操作符可用于触发新流,订阅是对于新流的。
即使 1、2 和 3 是由流生成和处理的,subscribe 方法中的 lambda 也只接收 4、5 和 6。
1.5.5、组合响应式流
Project Reactor 可以将许多传入流组合成一个传出流。指定的操作符虽然有许多重载方法,但是都会执行以下转换。
- concat 操作符通过向下游转发接收的元素来连接所有数据源。当操作符连接两个流时,它首先消费并重新发送第一个流的所有元素,然后对第二个流执行相同的操作(先后)。
- merge 操作符将来自上游序列的数据合并到一个下游序列中。与 concat 操作符不同,上游数据源是立即(同时)被订阅的。
- zip 操作符订阅所有上游,等待所有数据源发出一个元素,然后将接收到的元素组合到一个输出元素中。
- combineLatest 操作符与 zip 操作符的工作方式类似。但是,只要至少一个上游数据源发出一个值,它就会生成一个新值。
concat 操作符的使用:
merge 操作符的使用:
zip 操作符的使用:
combineLatest 操作符的使用:
1.5.6、流元素批处理
Project Reactor 支持以以下几种方式对流元素( Flux<T> )执行批处理。
- 将元素缓冲(buffering)到容器(如 List)中,结果流的类型为 Flux<List<T>> 。
- 通过开窗(windowing)方式将元素加入诸如 Flux<Flux<T>> 等流中。请注意,现在的流信号不是值,而是可以处理的子流。
- 通过某些键将元素分组(grouping)到具有 Flux<GroupedFlux<K, T>> 类型的流中。每个新键都会触发一个新的 GroupedFlux 实例,并且具有该键的所有元素都将被推送到GroupFlux 类的该实例中。
可以基于以下场景进行缓冲和开窗操作:
- 处理元素的数量,比方说每 10 个元素;
- 一段时间,比方说每 5 分钟一次;
- 基于一些谓语,比方说在每个新的偶数之前切割;
- 基于来自其他 Flux 的一个事件,该事件控制着执行过程。
buffer 操作符:
如,为列表(大小为 5)中的整数元素执行缓冲操作:
buffer 操作符将许多事件收集到一个事件集合中。该集合本身成为下游操作符的事件。当需要使用元素集合来生成一些请求,而不是使用仅包含一个元素的集合来生成许多小请求时,用缓冲操作符来实现批处理会比较方便。如,可以将数据项缓冲几秒钟然后批量插入,而不是逐个将元素插入数据库。
window 操作符:
如果需要根据数字序列中的元素是否为素数进行开窗拆分,可以使用 window 操作符的变体windowUntil。它使用谓词来确定何时创建新切片。代码如下所示:
请注意第一个窗口为空。这是因为一旦启动原始流,就会生成一个初始窗口。然后,第一个元素会到达(数字 101),它是素数,会触发一个新窗口。因此,已经打开的窗口会在没有任何元素的情况下通过 onComplete 信号关闭。
window操作符和buffer操作符类似,后者仅在缓冲区关闭时才会发出集合,而 window 操作符会在事件到达时立即对其进行传播,以更快地做出响应并实现更复杂的工作流程。
groupBy 操作符:
groupBy 操作符通过某些条件对响应式流中的元素进行分组。通过对每个元素打一个标签(key),按照标签将元素进行分组。如:将整数序列按照奇数和偶数进行分组,并仅跟踪每组中的最后两个元素。代码如下所示:
1.5.7、flatMap、concatMap 和 flatMapSequential 操作符
flatMap 操作符在逻辑上由 map 和 flatten(就 Reactor 而言,flatten 类似于 merge 操作符)这两个操作组成。flatMap 操作符的 map 部分将传入的每个元素转换为响应式流(T -> Flux<R>);flatten 部分将所有生成的响应式流合并为一个新的响应式流,通过该流可以传递 R 类型的元素。
Project Reactor 提供了 flatMap 操作符的一些不同变体。除了重载,该库还提供了flatMapSequential 操作符和 concatMap 操作符。这 3 个操作符在以下几个方面有所不同。
- 操作符是否立即订阅其内部流:
flatMap 操作符和 flatMapSequential 操作符会立即订阅,而 concatMap 操作符则会在生成下一个子流并订阅它之前等待每个内部完成。
- 操作符是否保留生成元素的顺序:
concatMap 天生保留与源元素相同的顺序,flatMapSequential 操作符通过对所接收的元素进行排序来保留顺序,而 flatMap 操作符不一定保留原始排序。
- 操作符是否允许对来自不同子流的元素进行交错:
flatMap 操作符允许交错,而 concatMap和 flatMapSequential 不允许交错。
flatMap 操作符(及其变体)在函数式编程和响应式编程中都非常重要,因为它能使用一行代码实现复杂的工作流。
flatMap 操作符:
concatMap 操作符:
concatMap对每个上游的元素,在接收后都立即生成新的流,新流每个元素处理完之后,进行下一个新流的处理。
flatMapSequential 操作符的使用:
1.5.8、元素采样
对于高吞吐量场景而言,通过应用采样技术处理一小部分事件是有意义的。sample 操作符和 sampleTimeout 操作符可以让流周期性地发出与时间窗口内最近看到的值相对应的数据项。我们假设使用以下代码:
sample API:
这里使我们每10毫秒都顺序生成数据项,订阅者也只会收到所指定的约束条件内的一小部分事件。通过这种方法,我们可以在不需要所有传入事件就能成功操作的场景下使用被动限速。流控。
sampleTimeout API:
1.5.9、响应式流转阻塞结构
Project Reactor 库提供了一个 API,用于将响应式流转换为阻塞结构。有以下选项来阻塞流并同步生成结果:
- toIterable: 方法将响应式 Flux 转换为阻塞 Iterable。
- toStream: 方法将响应式 Flux 转换为阻塞 Stream API。从 Reactor 3.2 开始,在底层使用toIterable 方法。
- blockFirst: 方法阻塞了当前线程,直到上游发出第一个值或完成流为止。
- blockLast: 方法阻塞当前线程,直到上游发出最后一个值或完成流为止。在 onError的情况下,它会在被阻塞的线程中抛出异常。
blockFirst 操作符和 blockLast 操作符具有方法重载,可用于设置线程阻塞的持续时间。这应该可以防止线程被无限阻塞。
toIterable 和 toStream 方法能够使用 Queue 来存储事件,这些事件可能比客户端代码阻塞Iterable 或 Stream 更快到达。微批处理。
1.5.10、在序列处理时查看元素
有时,我们需要对处理管道中的每个元素或特定信号执行操作。为满足此类要求,Project Reactor提供了以下方法。
- doOnNext(Consumer<T>): 使我们能对 Flux 或 Mono 上的每个元素执行一些操作
- doOnComplete 和 doOnError(Throwable): 可以应用在相应的事件上
- doOnSubscribe(Consumer<Subscription>) 、 doOnRequest(LongConsumer) 和 doOnCancel(Runnable): 使我们能对订阅生命周期事件做出响应
- 无论是什么原因导致的流终止, doOnTerminate(Runnable) 都会在流终止时被调用
此外,Flux 和 Mono 提供了 doOnEach(Consumer<Signal>) 方法,该方法处理表示响应式流领域的所有信号,包括 onError、onSubscribe、onNext、onError 和 onComplete。如下示例:
1.5.11、物话与非物化信号
将流中的元素封装为Signal对象进行处理。有时,采用信号进行流处理比采用数据进行流处理更有用。为了将数据流转换为信号流并再次返回,Flux 和 Mono 提供了 materialize 方法和 dematerialize 方法。示例如下:
这里,在处理信号流时,doOnNext 方法不仅接收带有数据的 onNext 事件,还接收包含在Signal类中的 onComplete 事件。此方法能采用一个类型层次结构来处理 onNext、onError和 onCompete 事件。
如果我们只需要记录信号而不修改它们,那么 Reactor 提供了 log 方法,该方法使用可用的记录器记录所有处理过的信号。
有时候需要一种更复杂的方法来在流中生成信号,或将对象的生命周期绑定到响应式流的生命周期。
1.6.1、push 和 create 工厂方法
push 工厂方法能通过适配一个单线程生产者来编程创建 Flux 实例。此方法对于适配异步、单线程、多值 API 非常有用,而无须关注背压和取消, push 方法本身包含背压和取消。如下代码示例:
push 工厂方法可以很方便地使用默认的背压和取消策略来适配异步 API。
create 方法:
create 工厂方法,与 push 工厂方法类似,起到桥接的作用。该方法能从不同的线程发送事件。如下代码所示:
1.6.2、generate 工厂方法
generate 工厂方法旨在基于生成器的内部处理状态创建复杂序列。它需要一个初始值和一个函数,该函数根据前一个内部状态计算下一个状态,并将 onNext 信号发送给下游订阅者。例如,创建一个简单的响应式流来生成斐波那契(Fibonacci)数列(1,1,2,3,5,8,13,…)。
lambda 形式:
二元组参数示例:
在下一个值生成之前,每个新值都被同步传播给订阅者。当生成不同的复杂响应式流,而该序列需要保持发射之间的中间状态时,该方法非常有用。
1.6.3、using 用法
using 工厂方法能根据一个 disposable 资源创建流,是用于管理资源的函数。它在响应式编程中实现了 try-with-resources 方法。假设我们需要包装一个阻塞 API,而该 API 使用以下有如下表示:
拓展:这里特别说明下关于 try 的不同。这里是将语法从:try{}catch{}finaliy{} 变成了 try(){}catch{},凡是在try的括号中声明的类都必须实现java.io.Closeable接口,这样try就会自动将声明的流在使用完毕后自动关闭。
而使用响应式的编程实现同样的效果,如下代码所示:
1.6.4、usingWhen 用法
与 using 操作符类似,usingWhen 操作符使我们能以响应式方式管理资源。但是,using 操作符会同步获取受托管资源(通过调用 Callable 实例)。同时,usingWhen 操作符响应式地获取受托管资源(通过订阅 Publisher 的实例)。此外,usingWhen 操作符接受不同的处理程序,以便应对主处理流终止的成功和失败。这些处理程序由发布者实现。
可以仅使用 usingWhen 一个操作符实现完全无阻塞的响应式事务。假设我们有一个完全响应式的事务。出于演示目的,代码做了简化处理。响应式事务实现如下所示:
Lambda形式:
使用 usingWhen 操作符,不仅可以更容易地以完全响应式的方式管理资源生命周期,还可以轻松实现响应式事务。因此,与 using 操作符相比,usingWhen 操作符有巨大改进。
onError 信号是响应式流规范的一个组成部分,一种将异常传播给可以处理它的用户。但是,如果最终订阅者没有为 onError 信号定义处理程序,那么 onError 抛异常。
此外,响应式流的语义定义了 onError 是一个终止操作,该操作之后响应式流会停止执行。此时,我们可能采取以下策略中的一种做出不同响应:
- 为 subscribe 操作符中的 onError 信号定义处理程序。
- 通过 onErrorReturn 操作符捕获一个错误,并用一个默认静态值或一个从异常中计算出的值替换它。
- 通过 onErrorResume 操作符捕获异常并执行备用工作流。
- 通过 onErrorMap 操作符捕获异常并将其转换为另一个异常来更好地表现当前场景。
- 定义一个在发生错误时重新执行的响应式工作流。如果源响应序列发出错误信号,那么retry 操作符会重新订阅该序列。
假设有如下推荐服务,该服务是不可靠的:
总而言之,Project Reactor 提供了丰富的工具集,可以帮助处理异常情况,从而提高应用程序的回弹性。
尽管响应式流规范要求将背压构建到生产者和消费者之间的通信中,但这仍然可能使消费者溢出。一些消费者可能无意识地请求无界需求,然后无法处理生成的负载。
另一些消费者则可能对传入消息的速率有严格的限制。例如,数据库客户端每秒不能插入超过 1000 条记录。在这种情况下,事件批处理技术可能有所帮助。此时可以通过以下方式配置流以处理背压情况:
- onBackPressureBuffer: 操作符会请求无界需求并将返回的元素推送到下游。如果下游消费者无法跟上,那么元素将缓冲在队列中。
- onBackPressureDrop: 操作符也请求无界需求(Integer.MAX_VALUE)并向下游推送数据。如果下游请求数量不足,那么元素会被丢弃。自定义处理程序可以用来处理已丢弃的元素。
- onBackPressureLast :操作符与 onBackPressureDrop 的工作方式类似。只是会记住最近收到的元素,并在需求出现时立即将其推向下游。
- onBackPressureError: 操作符在尝试向下游推送数据时请求无界需求。如果下游消费者无法跟上,则操作符会引发错误。
管理背压的另一种方法是使用速率限制技术。limitRate(n) 操作符将下游需求拆分为不大于 n的较小批次。可以保护脆弱的生产者免受来自下游消费者的不合理数据请求的破坏。 limitRate(n) 操作符会限制来自下游消费者的需求(总请求值)。
如, limitRequest(100) 确保不会向生产者请求超过 100 个元素。发送 100 个事件后,操作符成功关闭流。
onBackpressureBuffer 操作符:
onBackpressureDrop 操作符:
onBackpressureLast 操作符:
onBackpressureError 操作符:
冷发布者行为方式:无论订阅者何时出现,都为该订阅者生成所有序列数据,没有订阅者就不会生成数据。以下代码表示冷发布者的行为:
每当订阅者出现时都会有一个新序列生成,而这些语义可以代表 HTTP 请求。热发布者中的数据生成不依赖于订阅者而存在。因此,热发布者可能在第一个订阅者出现之前开始生成元素。
这种语义代表数据广播场景。例如,一旦股价发生变化,热发布者就可以向其订阅者广播有关当前股价的更新。
但是,当订阅者到达时,它仅接收未来的价格更新,而不接受先前价格历史。Reactor 库中的大多数热发布者扩展了 Processor 接口。但是,just 工厂方法会生成一个热发布者,因为它的值只在构建发布者时计算一次,并且在新订阅者到达时不会重新计算。
可以通过将 just 包装在 defer 中来将其转换为冷发行者。这样,即使 just 在初始化时生成值,这种初始化也只会在新订阅出现时发生。后一种行为由 defer 工厂方法决定。
1.9.1、多播流元素(🍕重点)
通过响应式转换将冷发布者转变为热发布者。如,一旦所有订阅者都准备好生成数据,希望在几个订阅者之间共享冷处理器的结果。同时,我们又不希望为每个订阅者重新生成数据。Project Reactor为此目的提供了 ConnectableFlux 。
ConnectableFlux ,不仅可以生成数据以满足最急迫的需求,还会缓存数据,以便所有其他订阅者可以按照自己的速度处理数据。队列和超时的大小可以通过类的 publish 方法和 replay 方法进行配置。
此外, ConnectableFlux 可以使用 connect 、 autoConnect(n) 、 refCount(n) 和 refCount(int,Duration) 等方法自动跟踪下游订阅者的数量,以便在达到所需阈值时触发执行操作。如下案例:
可以看到,冷发布者收到了订阅,只生成了一次数据项。但是,两个订阅者都收到了整个事件集合。
1.9.2、缓存流元素
使用 ConnectableFlux 可以轻松实现不同的数据缓存策略。但是,Reactor 已经以 cache 操作符的形式提供了用于事件缓存的 API。
cache 操作符使用 ConnectableFlux ,因此它的主要附加值是它所提供的一个流式而直接的API。可以调整缓存所能容纳的数据量以及每个缓存项的到期时间。示例代码:
前两个订阅者共享第一个订阅的同一份缓存数据。然后,在一定延迟之后,由于第三个订阅者无法获取缓存数据,因此一个针对冷发布者的新订阅被触发了。最后,即使该数据不来自缓存,第三个订阅者也接收到了所需的数据。
1.9.3、共享流元素
我们可以使用 ConnectableFlux 向几个订阅者多播事件。但是需要等待订阅者出现才能开始处理。share 操作符可以将冷发布者转变为热发布者。该操作符会为每个新订阅者传播订阅者尚未错过的事件。
在前面的代码中,共享了一个冷发布流,该流以每 100 毫秒为间隔生成事件。然后,经过一些延迟,一些订阅者订阅了共享发布者。第一个订阅者从第一个事件开始接收,而第二个订阅者错过了在其出现之前所产生的事件(S2 仅接收到事件 3 和事件 4)。
响应式编程是异步的,因此它本身就假定存在时序。基于 Project Reactor,可以使用 interval 操作符生成基于一定持续时间的事件,使用 delayElements 操作符生成延迟元素,并使用 delaySequence 操作符延迟所有信号。
Reactor 的 API 使你能对一些与时间相关的事件做出响应, timestamp 操作符用于输出元素的时间戳, timeout 操作符用于指定消息时间间隔的大小。与 timestamp 类似, elapsed 操作符测量与上一个事件的时间间隔。
interval 操作符:
delayElements 操作符:
delaySequence 操作符:
timeout 操作符:
timestamp 操作符:
elapsed 操作符:
从前面的输出中可以明显看出,事件并未恰好在 300 毫秒的时间间隔内到达。发生这种情况是因为 Reactor 使用 Java 的 ScheduledExecutorService 进行调度事件,而这些事件本身并不能保证精确的延迟。因此,应该注意不要在 Reactor 库中要求太精确的时间(实时)间隔。
当我们构建复杂的响应式工作流时,通常需要在几个不同的地方使用相同的操作符序列。transform 操作符,可以将这些常见的部分提取到单独的对象中,并在需要时重用它们。transform 操作符,可以增强流结构本身。示例代码:
transform 操作符仅在流生命周期的组装阶段更新一次流行为,可以在响应式应用程序中实现代码重用。
响应式流规范定义了 Processor 接口。Processor 既是 Publisher 也是 Subscriber。因此,既可以订阅 Processor 实例,也可以手动向它发送信号(onNext、onError 和 onComplete)。Reactor 的作者建议忽略处理器,因为它们很难使用并且容易出错。
在大多数情况下,处理器可以被操作符的组合所取代。另外,生成器工厂方法(push、create 和 generate)可能更适合适配外部 API。Reactor 提出以下几种处理器:
- Direct :处理器只能通过操作处理器的接收器来推送因用户手动操作而产生的数据。
- DirectProcessor 和 UnicastProcessor 是这组处理器的代表。
- DirectProcessor 不处理背压,可用于向多个订阅者发布事件。
- UnicastProcessor 使用内部队列处理背压,最多只能为一个 Subscriber 服务。
-
Synchronous 处理器
-
EmitterProcessor 和 ReplayProcessor 可以同时通过手动方式和订阅上游 Publisher 的方式来推送数据。
-
EmitterProcessor 可以为多个订阅者提供服务并满足它们的需求,但仅能以同步方式消费由单一 Publisher 产生的数据。
-
ReplayProcessor 的行为类似于 EmitterProcessor ,但是它能使用几种策略来缓存传入的数据。
-
-
Asynchronous 处理器
-
WorkQueueProcessor 和 TopicProcessor 可以推送从多个上游发布者处获得的下游数据。
-
为了处理多个上游发布者,这些处理器使用 RingBuffer数据结构。这些处理器具有专用的构建器 API,因为配置选项的数量使它们很难初始化。
-
TopicProcessor 兼容响应式流,并可以为每个下游 Subscriber 关联一个 Thread 来处理交互。它可以服务的下游订阅者数量有限。
-
WorkQueueProcessor 具有与 TopicProcessor 类似的特性。但是,它放宽了一些响应式流要求,这使它在运行时所使用的资源更少。
-
Reactor 库附带了一个通用的测试框架。 io.projectreactor:reactor-test 库提供了测试Project Reactor 所实现的响应式工作流所需的所有必要工具。
虽然响应式代码不那么容易调试,但是 Project Reactor 提供了能在需要时简化调试过程的技术。与任何基于回调的框架一样,Project Reactor 中的栈跟踪信息量不大。它们没有在代码中给出发生异常情况的确切位置。Reactor 库具有面向调试的组装时检测功能,可以使用以下代码激活:
示例程序:
配置Hooks:
没配置:
启用后,此功能开始收集将要组装的所有流的栈跟踪,稍后此信息可以基于组装信息扩展栈跟踪信息,从而帮助我们更快地发现问题。但是,创建栈跟踪的过程成本很高。因此,作为最后的手段,它应该只以受控的方式进行激活。
此外,Project Reactor 的 Flux 和 Mono 类型提供了一个被称为 log 的便捷方法。它能记录使用操作符的所有信号。即使在调试情况下,许多方法的自定义实现也可以提供足够的自由度来跟踪所需的数据。如下代码:
Project Reactor 是一个通用且功能丰富的库。但是,它无法容纳所有有用的响应式工具。因此,有一些项目在一些领域扩展了 Reactor 的功能。官方的 Reactor 插件项目为 Reactor 项目提供了几个模块。
reactor-adapter 模块为 RxJava 2 响应式类型和调度程序提供桥接。此外,该模块还能与Akka 进行集成。
reactor-logback 模块提供高速异步日志记录功能。它以 Logback 的 AsyncAppender和 LMAXDisruptor 的 RingBuffer 为基础,其中后者通过 Reactor 的 Processor 实现。
reactor-extra 模块包含用于高级需求的其他实用程序。例如,该模块包含 TupleUtils类,该类简化了编写 Tuple 类的代码。此外,该模块具有 MathFlux类,可以从数字源中计算最小值和最大值,并对它们求和或取平均。 ForkJoinPoolScheduler 类使 Java 的 ForkJoinPool 适配 Reactor 的Scheduler。可以使用以下导入方式将模块添加到项目中:
此外,Project Reactor 生态系统还为流行的异步框架和消息代理服务器提供了响应式驱动程序。Reactor RabbitMQ 模块使用熟悉的 Reactor API 为 RabbitMQ 提供了一个响应式 Java 客户端。该模块不仅提供具有背压支持的异步非阻塞消息传递,还使应用程序能够通过使用 Flux和 Mono 类型将 RabbitMQ 用作消息总线。
Reactor Kafka 模块为 Kafka 消息代理服务器提供了类似的功能。
另一个广受欢迎的 Reactor 扩展被称为 Reactor Netty。它使用 Reactor 的响应式类型来适配Netty的 TCP/HTTP/UDP 客户端和服务器。Spring WebFlux 模块在内部使用 Reactor Netty 来构建非阻塞式Web 应用程序。
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
如需转载请保留出处:https://bianchenghao.cn/bian-cheng-ri-ji/76310.html