Mono和Flux的用法详解
WebFlux的Flux和Mono用法Fluxjust可以指定序列中包含的全部元素。创建出来的 Flux 序列在发布这些元素之后会自动结束。fromArray(),fromIterable()和 fromStream()可以从一个数组、Iterable 对象或 Stream 对象中创建 Flux 对象。empty()创建一个不包含任何元素,只发布结束消息的序列,在响应式编程中,流的...
WebFlux的Flux和Mono用法
Flux
-
just
- 可以指定序列中包含的全部元素。创建出来的 Flux 序列在发布这些元素之后会自动结束。 fromArray(),fromIterable()和 fromStream()
- 可以从一个数组、Iterable 对象或 Stream 对象中创建 Flux 对象。 empty()
- 创建一个不包含任何元素,只发布结束消息的序列,在响应式编程中,流的传递是基于元素的,empty表示没有任何元素,所以不会进行后续传递,需要用switchIfEmpty等处理 error(Throwable error)
- 创建一个只包含错误消息的序列。 never()
- 创建一个不包含任何消息通知的序列。使用示例:
Flux.range(1, 10)
.timeout(Flux.never(), v -> Flux.never())
.subscribe(System.out::println);
上面表示用不超时
-
range(int start, int count)
- 创建包含从 start 起始的 count 个数量的 Integer 对象的序列。示例:
Flux.interval(Duration.ofSeconds(2)).doOnNext(System.out::println).blockLast();
-
intervalMillis(long period)和 intervalMillis(long delay, long period)
- 与 interval()方法的作用相同,只不过该方法通过毫秒数来指定时间间隔和延迟时间。 generate()
-
generate()方法通过同步和逐一的方式来产生 Flux 序列。序列的产生是通过调用所提供的 SynchronousSink 对象的 next(),complete()和 error(Throwable)方法来完成的。逐一生成的含义是在具体的生成逻辑中,next()方法只能最多被调用一次。在有些情况下,序列的生成可能是有状态的,需要用到某些状态对象。此时可以使用 generate()方法的另外一种形式 generate(Callable stateSupplier, BiFunction<S,SynchronousSink,S> generator),其中 stateSupplier 用来提供初始的状态对象。在进行序列生成时,状态对象会作为 generator 使用的第一个参数传入,可以在对应的逻辑中对该状态对象进行修改以供下一次生成时使用。
在代码清单 2中,第一个序列的生成逻辑中通过 next()方法产生一个简单的值,然后通过 complete()方法来结束该序列。如果不调用 complete()方法,所产生的是一个无限序列。第二个序列的生成逻辑中的状态对象是一个 ArrayList 对象。实际产生的值是一个随机数。产生的随机数被添加到 ArrayList 中。当产生了 10 个数时,通过 complete()方法来结束序列
Flux.generate(sink -> {
sink.next("Hello");
sink.complete();
}).subscribe(System.out::println);
final Random random = new Random();
Flux.generate(ArrayList::new, (list, sink) -> {
int value = random.nextInt(100);
list.add(value);
sink.next(value);
if (list.size() == 10) {
sink.complete();
}
return list;
}).subscribe(System.out::println);
-
create()方法
- create()方法与 generate()方法的不同之处在于所使用的是 FluxSink 对象。FluxSink 支持同步和异步的消息产生,并且可以在一次调用中产生多个元素。在代码清单 3 中,在一次调用中就产生了全部的 10 个元素。
Flux.create(sink -> {
for (int i = 0; i < 10; i++) {
sink.next(i);
}
sink.complete();
}).subscribe(System.out::println);
Mono
-
just
- 创建对象 empty
- 创建一个不包含任何元素,只发布结束消息的序列 error()
- 抛出异常,使用示例:
Mono.defer(()->{
return Mono.error(new RuntimeException());
}).subscribe();
-
never()
- empty里面至少还有一个结束消息,而never则是真的啥都没有 fromCallable()
- 使用示例:
Mono.fromCallable(() -> "9999").subscribe(System.out::println);
-
fromCompletionStage()
- 示例:
Mono.fromCompletionStage(future).block();
-
fromFuture()、fromRunnable()和 fromSupplier()
- 分别从 Callable、CompletionStage、CompletableFuture、Runnable 和 Supplier 中创建 Mono。 delay(Duration duration)和 delayMillis(long duration)
- 创建一个 Mono 序列,在指定的延迟时间之后,产生数字 0 作为唯一值。
Mono.delay(Duration.ofSeconds(3)).doOnNext(System.out::println).block();
-
justOrEmpty(Optional<? extends T> data)和 justOrEmpty(T data)
- 从一个 Optional 对象或可能为 null 的对象中创建 Mono。只有 Optional 对象中包含值或对象不为 null 时,Mono 序列才产生对应的元素。
还可以通过 create()方法来使用 MonoSink 来创建 Mono
Mono.fromSupplier(() -> "Hello").subscribe(System.out::println);
Mono.justOrEmpty(Optional.of("Hello")).subscribe(System.out::println);
Mono.create(sink -> sink.success("Hello")).subscribe(System.out::println);
操作符
-
buffer 和 bufferTimeout
-
这两个操作符的作用是把当前流中的元素收集到集合中,并把集合对象作为流中的新元素。在进行收集时可以指定不同的条件:所包含的元素的最大数量和收集的时间。方法 buffer()仅使用一个条件,而 bufferTimeout()可以同时指定两个条件。指定时间间隔时可以使用 Duration 对象或毫秒数,即使用 bufferMillis()或 bufferTimeoutMillis()两个方法。
除了元素数量和时间之外,还可以通过 bufferUntil 和 bufferWhile 操作符来进行收集。这两个操作符的参数是表示每个集合中的元素所要满足的条件的 Predicate 对象。bufferUntil 会一直收集直到 Predicate 返回为 true。使得 Predicate 返回 true 的那个元素可以选择添加到当前集合或下一个集合中;bufferWhile 则只有当 Predicate 返回 true 时才会收集。一旦值为 false,会立即开始下一次收集
需要注意的是,在第二个案例中,首先通过 toStream()方法把 Flux 序列转换成 Java 8 中的 Stream 对象,再通过 forEach()方法来进行输出。这是因为序列的生成是异步的,而转换成 Stream 对象可以保证主线程在序列生成完成之前不会退出,从而可以正确地输出序列中的所有元素。
Flux.range(1, 100).buffer(20).subscribe(System.out::println);
Flux.intervalMillis(100).bufferMillis(1001).take(2).toStream().forEach(System.out::println);
Flux.range(1, 10).bufferUntil(i -> i % 2 == 0).subscribe(System.out::println);
Flux.range(1, 10).bufferWhile(i -> i % 2 == 0).subscribe(System.out::println);
-
window
- window 操作符的作用类似于 buffer,所不同的是 window 操作符是把当前流中的元素收集到另外的 Flux 序列中,因此返回值类型是 Flux<Flux>。在代码清单 7 中,两行语句的输出结果分别是 5 个和 2 个 UnicastProcessor 字符。这是因为 window 操作符所产生的流中包含的是 UnicastProcessor 类的对象,而 UnicastProcessor 类的 toString 方法输出的就是 UnicastProcessor 字符。
Flux.range(1, 100).window(20).subscribe(System.out::println);
Flux.intervalMillis(100).windowMillis(1001).take(2).toStream().forEach(System.out::println);
-
zipWith
-
zipWith 操作符把当前流中的元素与另外一个流中的元素按照一对一的方式进行合并。在合并时可以不做任何处理,由此得到的是一个元素类型为 Tuple2 的流;也可以通过一个 BiFunction 函数对合并的元素进行处理,所得到的流的元素类型为该函数的返回值。
在代码清单 8 中,两个流中包含的元素分别是 a,b 和 c,d。第一个 zipWith 操作符没有使用合并函数,因此结果流中的元素类型为 Tuple2;第二个 zipWith 操作通过合并函数把元素类型变为 String。
Flux.just("a", "b")
.zipWith(Flux.just("c", "d"))
.subscribe(System.out::println);
Flux.just("a", "b")
.zipWith(Flux.just("c", "d"), (s1, s2) -> String.format("%s-%s", s1, s2))
.subscribe(System.out::println);
-
take
- take 系列操作符用来从当前流中提取元素。提取的方式可以有很多种。
方法 | 注释 |
---|---|
take(long n),take(Duration timespan)和 takeMillis(long timespan) | 按照指定的数量或时间间隔来提取 |
takeLast(long n) | 提取流中的最后 N 个元素 |
takeUntil(Predicate<? super T> predicate) | 提取元素直到 Predicate 返回 true |
takeWhile(Predicate<? super T> continuePredicate) | 当 Predicate 返回 true 时才进行提取 |
takeUntilOther(Publisher<?> other) | 提取元素直到另外一个流开始产生元素 |
Flux.range(1, 1000).take(10).subscribe(System.out::println);
Flux.range(1, 1000).takeLast(10).subscribe(System.out::println);
Flux.range(1, 1000).takeWhile(i -> i < 10).subscribe(System.out::println);
Flux.range(1, 1000).takeUntil(i -> i == 10).subscribe(System.out::println);
-
merge 和 mergeSequential
-
merge 和 mergeSequential 操作符用来把多个流合并成一个 Flux 序列。不同之处在于 merge 按照所有流中元素的实际产生顺序来合并,而 mergeSequential 则按照所有流被订阅的顺序,以流为单位进行合并。
代码清单 11 中分别使用了 merge 和 mergeSequential 操作符。进行合并的流都是每隔 100 毫秒产生一个元素,不过第二个流中的每个元素的产生都比第一个流要延迟 50 毫秒。在使用 merge 的结果流中,来自两个流的元素是按照时间顺序交织在一起;而使用 mergeSequential 的结果流则是首先产生第一个流中的全部元素,再产生第二个流中的全部元素。
Flux.merge(Flux.intervalMillis(0, 100).take(5), Flux.intervalMillis(50, 100).take(5))
.toStream()
.forEach(System.out::println);
Flux.mergeSequential(Flux.intervalMillis(0, 100).take(5), Flux.intervalMillis(50, 100).take(5))
.toStream()
.forEach(System.out::println);
-
flatMap 和 flatMapSequential
- flatMap 和 flatMapSequential 操作符把流中的每个元素转换成一个流,再把所有流中的元素进行合并。flatMapSequential 和 flatMap 之间的区别与 mergeSequential 和 merge 之间的区别是一样的。
Flux.just(5, 10)
.flatMap(x -> Flux.intervalMillis(x * 10, 100).take(x))
.toStream()
.forEach(System.out::println);
测试
在对使用 Reactor 的代码进行测试时,需要用到 io.projectreactor:reactor-test 库。
-
使用 StepVerifier
- 进行测试时的一个典型的场景是对于一个序列,验证其中所包含的元素是否符合预期。StepVerifier 的作用是可以对序列中包含的元素进行逐一验证。在代码清单 21 中,需要验证的流中包含 a 和 b 两个元素。通过 StepVerifier.create()方法对一个流进行包装之后再进行验证。expectNext()方法用来声明测试时所期待的流中的下一个元素的值,而 verifyComplete()方法则验证流是否正常结束。类似的方法还有 verifyError()来验证流由于错误而终止。
StepVerifier.create(Flux.just("a", "b"))
.expectNext("a")
.expectNext("b")
.verifyComplete();
调试
由于反应式编程范式与传统编程范式的差异性,使用 Reactor 编写的代码在出现问题时比较难进行调试。为了更好的帮助开发人员进行调试,Reactor 提供了相应的辅助功能。
启用调试模式
Hooks类中有许多方法可以进行调试
使用检查点
另外一种做法是通过 checkpoint 操作符来对特定的流处理链来启用调试模式。代码清单 25 中,在 map 操作符之后添加了一个名为 test 的检查点。当出现错误时,检查点名称会出现在异常堆栈信息中。对于程序中重要或者复杂的流处理链,可以在关键的位置上启用检查点来帮助定位可能存在的问题。
Flux.just(1, 0).map(x -> 1 / x).checkpoint("test").subscribe(System.out::println);
“冷”与“热”序列
之前的代码清单中所创建的都是冷序列。冷序列的含义是不论订阅者在何时订阅该序列,总是能收到序列中产生的全部消息。而与之对应的热序列,则是在持续不断地产生消息,订阅者只能获取到在其订阅之后产生的消息。
在代码清单 28 中,原始的序列中包含 10 个间隔为 1 秒的元素。通过 publish()方法把一个 Flux 对象转换成 ConnectableFlux 对象。方法 autoConnect()的作用是当 ConnectableFlux 对象有一个订阅者时就开始产生消息。代码 source.subscribe()的作用是订阅该 ConnectableFlux 对象,让其开始产生数据。接着当前线程睡眠 5 秒钟,第二个订阅者此时只能获得到该序列中的后 5 个元素,因此所输出的是数字 5 到 9。
final Flux<Long> source = Flux.intervalMillis(1000)
.take(10)
.publish()
.autoConnect();
source.subscribe();
Thread.sleep(5000);
source
.toStream()
.forEach(System.out::println);
参考资源
更多推荐
所有评论(0)