前言
前两天面试的时候,面试官问我:一个ip发请求过来,是一个ip对应一个线程吗?我突然愣住了,对于SpringBoot如何处理请求好像从来没仔细思考过,所以面试结束后就仔细研究了一番,现在就来探讨一下这个问题。
正文
我们都知道,SpringBoot默认的内嵌容器是Tomcat,也就是我们的程序实际上是运行在Tomcat里的。所以与其说SpringBoot可以处理多少请求,到不如说Tomcat可以处理多少请求。
关于Tomcat的默认配置,都在
spring-configuration-metadata.json文件中,对应的配置类则是
org.springframework.boot.autoconfigure.web.ServerProperties。
和处理请求数量相关的参数有四个:
- server.tomcat.threads.min-spare:最少的工作线程数,默认大小是10。该参数相当于长期工,如果并发请求的数量达不到10,就会依次使用这几个线程去处理请求。
- server.tomcat.threads.max:最多的工作线程数,默认大小是200。该参数相当于临时工,如果并发请求的数量在10到200之间,就会使用这些临时工线程进行处理。
- server.tomcat.max-connections:最大连接数,默认大小是8192。表示Tomcat可以处理的最大请求数量,超过8192的请求就会被放入到等待队列。
- server.tomcat.accept-count:等待队列的长度,默认大小是100。
举个例子说明一下这几个参数之间的关系:
如果把Tomcat比作一家饭店的话,那么一个请求其实就相当于一位客人。min-spare就是厨师(长期工);max是厨师总数(长期工+临时工);max-connections就是饭店里的座位数量;accept-count是门口小板凳的数量。来的客人优先坐到饭店里面,然后厨师开始忙活,如果长期工可以干的完,就让长期工干,如果长期工干不完,就再让临时工干。图中画的厨师一共15人,饭店里有30个座位,也就是说,如果现在来了20个客人,那么就会有5个人先在饭店里等着。如果现在来了35个人,饭店里坐不下,就会让5个人先到门口坐一下。如果来了50个人,那么饭店座位+门口小板凳一共40个,所以就会有10人离开。
也就是说,SpringBoot同时所能处理的最大请求数量是max-connections+accept-count,超过该数量的请求直接就会被丢掉。
纸上得来终觉浅,绝知此事要躬行。
上面只是理论结果,现在通过一个实际的小例子来演示一下到底是不是这样:
创建一个SpringBoot的项目,在application.yml里配置一下这几个参数,因为默认的数量太大,不好测试,所以配小一点:
server:
tomcat:
threads:
# 最少线程数
min-spare: 10
# 最多线程数
max: 15
# 最大连接数
max-connections: 30
# 最大等待数
accept-count: 10
复制代码
再来写一个简单的接口:
@GetMapping("/test")
public Response test1(HttpServletRequest request) throws Exception {
log.info("ip:{},线程:{}", request.getRemoteAddr(), Thread.currentThread().getName());
Thread.sleep(500);
return Response.buildSuccess();
}
复制代码
代码很简单,只是打印了一下线程名,然后休眠0.5秒,这样肯定会导致部分请求处理一次性处理不了而进入到等待队列。
然后我用Apifox创建了一个测试用例,去模拟100个请求:
观察一下测试结果:
从结果中可以看出,由于设置的 max-connections+accept-count 的和是40,所以有60个请求会被丢弃,这和我们的预期是相符的。由于最大线程是15,也就是有25个请求会先等待,等前15个处理完了再处理15个,最后在处理10个,也就是将40个请求分成了15,15,10这样三批进行处理。
再从控制台的打印日志可以看到,线程的最大编号是15,这也印证了前面的想法。
总结一下:如果并发请求数量低于server.tomcat.threads.max,则会被立即处理,超过的部分会先进行等待,如果数量超过max-connections与accept-count之和,则多余的部分则会被直接丢弃。
延伸:并发问题是如何产生的
到目前为止,就已经搞明白了SpringBoot可以同时处理多少请求的问题。但是在这里我还想基于上面的例子再延伸一下,就是为什么并发场景下会出现一些值和我们预期的不一样?
设想有以下场景:厨师们用一个账本记录一共做了多少道菜,每个厨师做完菜都记录一下,每次记录都是将账本上的数字先抄到草稿纸上,计算x+1等于多少,然后将计算的结果写回到账本上。
Spring容器中的Bean默认是单例的,也就是说,处理请求的Controller、Service实例就只有一份。在并发场景下,将cookSum定义为全局变量,是所有线程共享的,当一个线程读到了cookSum=20,然后计算,写回前另一个线程也读到是20,两个线程都加1后写回,最终cookSum就变成了21,但是实际上应该是22,因为加了两次。
private int cookSum = 0;
@GetMapping("/test")
public Response test1(HttpServletRequest request) throws Exception {
// 做菜。。。。。。
cookSum += 1;
log.info("做了{}道菜", cookSum);
Thread.sleep(500);
return Response.buildSuccess();
}
复制代码
如果要避免这样的情况发生,就涉及到加锁的问题了,就不在这里讨论了。
若你想更进一步提升并发性能不妨看看下面的反应式编程!
反应式编程
1.1. 阻塞可能造成浪费
现代应用程序可以覆盖大量并发用户,尽管现代硬件的功能不断提高,但现代软件的性能仍然是一个关键问题。
总的来说,有两种方法可以提高程序的性能:
- 并行化以使用更多线程和更多硬件资源。
- 在如何使用当前资源方面寻求更高的效率。
通常,Java 开发人员使用阻塞代码编写程序。这种做法很好,直到出现性能瓶颈。然后是时候引入额外的线程,运行类似的阻塞代码。但是这种资源利用率的扩展会很快引入争用和并发问题。
更糟糕的是,阻塞会浪费资源。如果您仔细观察,一旦程序涉及一些延迟(特别是 I/O,例如数据库请求或网络调用),资源就会被浪费,因为线程(可能很多线程)现在处于空闲状态,等待数据。所以并行化方法不是灵丹妙药。有必要访问硬件的全部功能,但推理也很复杂,并且容易浪费资源。
1.2. 使用异步(Asynchronicity)
前面提到的第二种方法,寻求更高的效率,可以解决资源浪费问题。通过编写异步、非阻塞代码,您可以让执行切换到另一个使用相同底层资源的活动任务,并在异步处理完成后返回到当前进程。
但是如何在 JVM 上生成异步代码呢?Java 提供了两种异步编程模型:
- Callbacks:异步方法没有返回值,但是需要一个额外的回调参数(lambda或者匿名函数),当结果可用时调用这个参数。一个著名的例子是Swing’s的EventListener层次结构。
- Futures:异步方法立即返回一个Future。异步进程计算T值,通过Future对象包装对T值的访问。该值不是立即可用的,可以轮询该对象,直到该值可用为止。例如,ExecutorService使用Future对象,运行**Callable**任务。
但是这两种技术都有他们的局限性,回调难以组合在一起,很快就会导致代码难以阅读和维护(这种情况称为"回调地狱(Callback Hell)")
Future对象要比callbacks好一些,但是Future在组合(composition)上任然比较困难。尽管Java 8通过CompletableFuture进行了改进。编排多个Future对象是可行的,但是这并不容易。并且Future还有其他问题:
- 调用get()方法很容易导致Future对象出现另一种阻塞情况。
- 不支持惰性计算。
- 缺乏对多值和高级错误处理的支持。
1.3. 从命令式编程到反应式编程
反应式-函数式编程解决的问题就是并发和并行。更通俗地说,它解决了回调地狱问题。回调地狱是以命令式的方式来处理反应式和异步用例带来的问题。反应式编程,比如RxJava实现,受到了函数式编程的影响,并且会使用声明式的方式来避免反应式-命令式代码常见的问题。
响应式库,如Reactor,Rxjava旨在解决JVM上”经典”异步方法的这些缺点,同时关注一些额外的方面:
- 可组合性(Composability )和可读性(readability)
- 数据作为一个流(flow ),使用丰富的操作符(operators)进行操作
- 在你订阅(subscribe)之前什么都不会发生,延迟发布。
- 背压能力(Backpressure )或消费者向生产者发出排放速度过高信号的能力
- 与并发无关的高级但高价值的抽象(High level but high value abstraction that is concurrency-agnostic)
2. Reactor Project 如何运行
Reactor的核心是Flux /Mono类型,它代表了数据或事件的流。它的目的是实现推送(反应式),但是也可以用于拉取(交互式)。它是延迟执行的(lazy),不是立即执行的(eager)。它可以同步使用,也可以异步使用。它能够代表随着时间推移产生的0个、1个、多个或者无穷个值或事件。
2.1. Flux 原理
当执行subscribe方法时,发布者会回调订阅者的onSubscribe方法,这个方法中,通常订阅者会借助传入的Subscription向发布者请求n个数据。然后发布者通过不断调用订阅者的onNext方法向订阅者发出最多n个数据。如果数据全部发完,则会调用onComplete告知订阅者流已经发完;如果有错误发生,则通过onError发出错误数据,同样也会终止流。
- 首先,使用类似Flux .just的方法创建发布者后,会创建一个具体的发布者(Publisher),如Flux Array。
- 当使用.subscribe订阅这个发布者时,首先会new一个具有相应逻辑的****Subscription(如ArraySubscription,这个Subscription定义了如何处理下游的request,以及如何“发出数据”);每种同类型的Subscriber都会对应一种类型的Subscription。
- 然后发布者将这个Subscription通过订阅者的.onSubscribe方法传给订阅者;
- 在订阅者的.onSubscribe方法中,需要通过Subscription发起第一次的请求.request
- Subscription收到请求,就可以通过回调订阅者的onNext方法发出元素了,有多少发多少,但不能超过请求的个数;
- 订阅者在onNext中通常定义对元素的处理逻辑,处理完成之后,可以继续发起请求;
- 发布者根据继续满足订阅者的请求;
- 直至发布者的序列结束,通过订阅者的onComplete予以告知;当然序列发送过程中如果有错误,则通过订阅者的onError予以告知并传递错误信息;这两种情况都会导致序列终止,订阅过程结束。
2.2. 操作符原理
操作符 :只对数据做搬运和加工,对下游是作为发布者(Publisher),传递上游的数据到下游;对上游是作为订阅者(Subscriber),传递下游的请求到上游。
3. 创建 Flux
3.1. 以简单方法创建 Flux
3.1.1 empty()
订阅后立即完成,不发布任何的值。
@Test
public void emptyTest() {
PrintUtil.println("Before");
Flux.empty()
.subscribe(PrintUtil::println, PrintUtil::println, () -> PrintUtil.println("complete"));
PrintUtil.println("After");
}
2021-08-31 18:04:016 [Thread-Name-main], Before
2021-08-31 18:04:016 [Thread-Name-main], complete
2021-08-31 18:04:016 [Thread-Name-main], After
复制代码
3.1.2 never()
不发布任何的通知,无论是值,还是完成或失败。这个流适用于测试。
@Test
public void errorTest() {
PrintUtil.println("Before");
Flux.error(new RuntimeException("emitter an error"))
.subscribe(PrintUtil::println, PrintUtil::println, () -> PrintUtil.println("complete"));
PrintUtil.println("After");
}
2021-08-31 18:09:023 [Thread-Name-main], Before
2021-08-31 18:09:024 [Thread-Name-main], After
复制代码
3.1.3 error()
立即给每个订阅者发送一个onError()通知。不发布任何的值,按照契约,也不会发送onCompleted()通知
@Test
public void neverTest() {
PrintUtil.println("Before");
Flux.never()
.subscribe(PrintUtil::println, PrintUtil::println, () -> PrintUtil.println("complete"));
PrintUtil.println("After");
}
2021-08-31 18:08:024 [Thread-Name-main], Before
2021-08-31 18:08:024 [Thread-Name-main], java.lang.RuntimeException: emitter an error
2021-08-31 18:08:024 [Thread-Name-main], After
复制代码
3.1.4 range
从 start 开始生成n个整型数字。例如,range(3, 3) 将会发布3、4和5,然后正常完成。每个订阅者会接收到一组相同的数字。
@Test
public void rangeTest() {
PrintUtil.println("Before");
Flux.range(3, 3)
.subscribe(PrintUtil::println);
PrintUtil.println("After");
}
2021-08-31 17:58:051 [Thread-Name-main], Before
2021-08-31 17:58:052 [Thread-Name-main], 3
2021-08-31 17:58:052 [Thread-Name-main], 4
2021-08-31 17:58:052 [Thread-Name-main], 5
2021-08-31 17:58:052 [Thread-Name-main], After
复制代码
print语句的顺序也是值得关注的。Before 和After 消息是由main客户端线程打印出来的,这一点倒不令人惊讶。但是,请注意订阅也是发生在客户端线程中的,subscribe() 实际上会阻塞客户端线程,直到所有的事件都被接收。除非某些操作符需要,否则RxJava不会隐式地在线程池中运行代码。
3.1.5 interval
interval() 会生成一个long类型数字的序列,从零开始,每个数字之间有固定的时间间隔。某种程度上,interval()类似于 ScheduledExecutorService 中的 scheduleAtFixedRate()。你可以想象一下 interval() 的多种使用场景,比如定期轮询数据、刷新用户界面或者建模时间的推移。
@Test
public void intervalTest() throws InterruptedException {
Flux.interval(Duration.ofSeconds(1))
.map(input -> {
if (input < 3) return "tick " + input;
throw new RuntimeException("boom");
})
.onErrorReturn("Uh oh")
.subscribe(log::info);
TimeUnit.SECONDS.sleep(5);
}
18:17:18.548 [parallel-1] INFO wangxw.operator.OperatorTest - tick 0
18:17:19.549 [parallel-1] INFO wangxw.operator.OperatorTest - tick 1
18:17:20.548 [parallel-1] INFO wangxw.operator.OperatorTest - tick 2
18:17:21.564 [parallel-1] INFO wangxw.operator.OperatorTest - Uh oh
复制代码
3.2. 以编程方式创建 Flux
3.2.1 Synchronous :generate on-by-one
public void testGenerate() {
Flux flux = Flux.generate(
() -> 0, // 初始state值
(state, sink) -> {
sink.next("3 x " + state + " = " + 3 * state); // 产生数据是同步的,每次产生一个数据
if (state == 10) {
sink.complete();
}
return state + 1; // 改变状态
},
(state) -> System.out.println("state: " + state)); // 最后状态值
// 订阅时触发requset->sink.next顺序产生数据
// 生产一个数据消费一个
flux.subscribe(System.out::println);
}
复制代码
3.2.2 Asynchronous and Multi-threaded: create
Flux .cretate() 操作符可以用于桥接监听器模型等现有API转为响应式流模型,支持推拉结合的模式。cretate操作符创建的BufferAsyncSink中维护了一个SpscQueue。
- 推模式:当监听触发时,调用sink.next(o),将元素放入SpscQueue,随后立即取出drain(排水)给消费者。
- **拉模式:**发生在消费者定阅时,当生产者有数据可用时将通过拉模式,拉取数据。
3.2.2.1. 示意图
- 首先,使用Flux .create的方法创建发布者后,会创建一个具体的发布者(Flux Create)。
- 当使用.subscribe订阅这个发布者时,首先会new一个具有相应逻辑的****BufferAsyncSink。
- 然后发布者将这个BufferAsyncSink通过订阅者的.onSubscribe方法传给订阅者;并回调Flux Create中的Consumer,执行 sink.onRequest(requestConsumer) 将 requestConsumer 赋值给Sink。
- 在订阅者的.onSubscribe方法中,需要通过BufferAsyncSink发起request,执行requsetConsumer,此时是拉数据
- 通过异步调用sink.next(o),将数据推送到sqscQueue中,再 poll 出来是 推数据;
3.2.2.2. 代码示例
@Test
public void testCreate() throws InterruptedException {
MyEventProcessor myEventProcesser = new MyEventProcessor<>();
Flux.create(emitter -> {
myEventProcesser.register(new MyEventListener() {
@Override
public void onDataChunk(MyEvent event) {
emitter.next(event);
}
@Override
public void processComplete() {
emitter.complete();
}
});
emitter.onRequest(n -> { // n subscribe.requset时调用
List messages = getHistory(n);
messages.forEach(PrintUtil::println);
});
}).subscribe(PrintUtil::println, PrintUtil::println); // 这时候还没有任何事件产生;
for (int i = 0; i < 20; i++) { // 6
TimeUnit.MILLISECONDS.sleep(ThreadLocalRandom.current().nextInt(1000));
myEventProcesser.newEvent(new MyEvent<>(new Date(), "Event" + i));
}
myEventProcesser.processComplete();
}
复制代码
3.2.3. Asynchronous but single-threaded: push
push is a middle ground between generate and create which is suitable for processing events from a single producer. It is similar to create in the sense that it can also be asynchronous and can manage backpressure using any of the overflow strategies supported by create. However, only one producing thread may invoke next, complete or error at a time.
根据官方的描述,push介于create和 generate 之间。异步生成序列,支持回压。但是同一时刻只能有一个线程调用next、compete、error。
3.2.3.2. 示意图
push 和 create 的唯一不同在于Flux Create.CreateMode不同。 Flux .push使用的是CreateMode.PUSH_ONLY,而Flux .create使用的是Flux
Create.CreateMode.PUSH_PULL。
public void subscribe(CoreSubscriber super t> actual) {
BaseSink sink = createSink(actual, backpressure);
actual.onSubscribe(sink);
try {
source.accept(
createMode == CreateMode.PUSH_PULL ? new SerializedFluxSink<>(sink) :
sink);
}
catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
sink.error(Operators.onOperatorError(ex, actual.currentContext()));
}
}
复制代码
BufferAsyncSink 内部维护了一个SpscLinkedArrayQueue(单生产者单消费者)队列,只支持单线程的sink源,sink先将元素发射到SpscLinkedArrayQueue中,然后触发drain,其实就是从SpscLinkedArrayQueue中poll元素并向消费者传递,中间增加SpscLinkedArrayQueue作为中转。
SerializedSink 内部除了维系一个BufferAsyncSink 作委托外,还维护一个MpscLinkedQueue(多生产者单消费者)队列,显然支持多线程的源生产。并发生产元素的时候先将元素push到MpscLinkedQueue,再从MpscLinkedQueue弹出到SpscLinkedArrayQueue中,最终由一个线程传递给消费者。
public FluxSink next(T t) {
Objects.requireNonNull(t, "t is null in sink.next(t)");
if (sink.isTerminated() || done) {
Operators.onNextDropped(t, sink.currentContext());
return this;
}
//通过WIP保证仅有一个线程将next委托给sink(BufferAsyncSink)来处理
if (WIP.get(this) == 0 && WIP.compareAndSet(this, 0, 1)) {
try {
sink.next(t);
}
catch (Throwable ex) {
Operators.onOperatorError(sink, ex, t, sink.currentContext());
}
if (WIP.decrementAndGet(this) == 0) { //
return this;
}
}
else {
//多线程并发生产元素时,并发的其他线程直接将元素发射到mpscQueue
this.mpscQueue.offer(t);
if (WIP.getAndIncrement(this) != 0) {
return this;
}
}
//这里其实又把mpsc队列中的元素取出放到spsc中
//简单点可以这么理解,多源生产模式下,元素先发射到mpsc中,单消费者取出元素从放到spsc中,中间多加了一个过渡
drainLoop();
return this;
}
复制代码
目前没有GET到两者的区别,这样做的好处到底是什么?
3.2.4. Cleaning up after push() or create()
两个回调函数,onDispose和onCancel,在取消或终止时执行任何清理。onDispose可用于在Flux 完成、出现错误或被取消时执行清理。onCancel可用于在使用onDispose进行清理之前执行任何特定于取消的操作。
Flux bridge = Flux.create(sink -> {
sink.onRequest(n -> channel.poll(n))
.onCancel(() -> channel.cancel())
.onDispose(() -> channel.close())
});
复制代码
- onCancel 首先调用,仅用于取消信号。
- onDispose 为完成、错误或取消信号调用。
4. 从回调 API 到 Flux 流
关于流,我最喜欢的一个例子就是Twitter的状态更新,也就是所谓的推文(tweet)。每秒都会有数千个用户更新,很多更新会伴随着地理位置、语言和其他元数据。为了完成这个练习,将会使用开源的Twitter4J库,它使用基于回调的API将新推文的子集推送过来。实时读取推文的最简单的可运行样例如下所示。
import lombok.extern.slf4j.Slf4j;
import org.junit.Before;
import org.junit.Test;
import reactor.core.publisher.Flux;
import twitter4j.StallWarning;
import twitter4j.Status;
import twitter4j.StatusDeletionNotice;
import twitter4j.StatusListener;
import twitter4j.TwitterStream;
import twitter4j.TwitterStreamFactory;
import java.util.concurrent.TimeUnit;
/**
* @Author: wangxw
* @Date: 2021/08/31
* @Description:
*/
@Slf4j
public class Callback2FluxTest {
public void add() throws InterruptedException {
TwitterStream twitterStream = TwitterStreamFactory.getSingleton();
twitterStream.addListener(new StatusListener() {
@Override
public void onStatus(Status status) {
log.info("Status: {} ", status);
}
@Override
public void onException(Exception e) {
log.error("Error callback ", e);
}
// 其他回调
});
twitterStream.sample();
TimeUnit.SECONDS.sleep(10);
twitterStream.shutdown();
}
复制代码
调用twitterStream.sample()将会启动一个后台线程,该线程会登录Twitter并等待新的消息。每次有推文出现,onStatus回调就会执行。执行过程可能会跨线程,所以不能依赖异常抛出的机制,而是使用onException()通知。在休眠10秒之后,通过shutdown()关闭流并清理底层的资源,比如HTTP连接或线程。
整体而言,它看上去并没有那么糟糕,这个程序的问题在于什么都不做。在现实生活中,你可能会以某种方式处理每条Status消息(推文),比如保存到数据库中或者提供给一个机器学习算法。从技术上来讲,你可以将这些逻辑放到回调中,但是这样就将基础设施调用和业务逻辑耦合在一起了。简单地将功能委托给一个单独的类会更好一些,但很遗憾的是无法重用。我们真正想要的是技术领域(消费HTTP连接中的数据)和业务领域(解释输入的数据)的清晰分离。所以,我们构建了第二层回调。
void consume(Consumer onStatus, Consumer onException) throws InterruptedException {
TwitterStream twitterStream = TwitterStreamFactory.getSingleton();
twitterStream.addListener(new StatusListener() {
@Override
public void onStatus(Status status) {
onStatus.accept(status);
}
@Override
public void onException(Exception e) {
onException.accept(e);
}
// 其他回调
});
twitterStream.sample();
TimeUnit.SECONDS.sleep(10);
twitterStream.shutdown();
}
复制代码
通过添加这个额外的抽象层,现在能够以各种方式重用consume()方法。假设不再是进行日志记录,而是要进行持久化、分析或欺诈检测。
但是这只将问题在层级结构中进行了提升。如果想要记录每秒推文的数量该怎么办?或者只想消费前5条数据,又该怎样实现?如果想要有多个监听器,又会发生什么情况?前述每种情况都会打开一个新的HTTP连接。最后不得不提的是,API不允许完成后再取消订阅,以免带来资源泄漏的风险。我们正在努力朝着基于Rx的API的方向努力。此时,不再传递回调到可能要执行的地方,而是返回一个Flux 并允许每人按需对其进行订阅。但是,需要记住的一点是,如下的实现还是会为每个Subscriber打开一个新的网络连接。
public void flux() {
Flux flux = Flux.create(emmiter -> {
TwitterStream twitterStream = TwitterStreamFactory.getSingleton();
twitterStream.addListener(new StatusListener() {
@Override
public void onStatus(Status status) {
emmiter.next(status);
}
@Override
public void onException(Exception e) {
emmiter.error(e);
}
// 其他回调
});
emmiter.onDispose(() -> twitterStream.shutdown()); // 关闭twitterStream
}).doOnSubscribe(s->log.info("doOnSubscribe"));
flux.subscribe(status -> log.info("Status: {} ", status),
ex -> log.error("Error callback ", ex));
}
复制代码
以上代码与consume(...)的巨大差别在于,不必将回调作为参数传递给observe()。相反,样例可以返回Flux ,将它到处传递,然后在某个地方进行存储,并且只要需要就可以随时随地使用。还可以将这个Flux 与其他的Flux 进行组合。还未讨论的一个方面是资源清理,有人取消订阅时,应当关闭TwitterStream,以避免资源泄漏。
public class LazyTwitterFlux {
private final Set<FluxSink super status>> fluxSinks = new CopyOnWriteArraySet<>();
private final Flux flux = Flux.create(emmiter -> {
registrer(emmiter);
emmiter.onDispose(() -> unregistrer(emmiter));
});
private final TwitterStream twitterStream;
public LazyTwitterFlux() {
this.twitterStream = TwitterStreamFactory.getSingleton();
twitterStream.addListener(new StatusListener() {
@Override
public void onStatus(Status status) {
fluxSinks.forEach(sink -> sink.next(status));
}
@Override
public void onException(Exception e) {
fluxSinks.forEach(fluxSink -> fluxSink.error(e));
}
// 其他回调
});
}
public Flux flux() {
return flux;
}
private synchronized void registrer(FluxSink super status> fluxSink) {
fluxSinks.add(fluxSink);
if (fluxSinks.isEmpty()) {
twitterStream.sample();
}
}
private synchronized void unregistrer(FluxSink super status> fluxSink) {
fluxSinks.remove(fluxSink);
if (fluxSinks.isEmpty()) {
twitterStream.shutdown();
}
}
}
@Test
public void lazy() {
LazyTwitterFlux lazyTwitterFlux = new LazyTwitterFlux();
Flux flux1 = lazyTwitterFlux.flux()
.doOnSubscribe(s -> log.info("doOnSubscribe"));
Flux flux2 = lazyTwitterFlux.flux()
.doOnSubscribe(s -> log.info("doOnSubscribe"));
flux1.subscribe(status -> log.info("Status: {} ", status),
ex -> log.error("Error callback ", ex));
flux2.subscribe(status -> log.info("Status: {} ", status),
ex -> log.error("Error callback ", ex));
}
复制代码
fluxSinks 的集线程安全地存储当前已订阅的Subscriber集合。每次新的Subscriber出现,将其添加到一个集中,并连接到底层的事件源上。相反,最后的Subscriber消失时,就关闭上游的源。这里的关键在于始终只有一个到上游系统的连接,而不是为每个订阅者都建立连接。这个实现能够正常运行而且比较健壮,但看起来过于低层级并且易于出错。对subscribers的访问必须使用synchronized同步,而且集合本身必须支持安全地迭代。对register()的调用必须发生在通过 reregister() 注销回调之前,否则,后者可能会在注册之前调用。将一个上游源多路复用到多个Flux 的通用场景,肯定有更好的方式来实现。幸而,至少有两种这样的机制。Rx致力于减少这样危险的样板代码并抽象出并发性。6.4 节将使用 ConnectableFlux 的refCount() 实现单次订阅。
5. Hot 和 Cold 类型的 Flux
在得到一个 Flux 实例之后,很重要的一点是要理解它是 hot 类型的还是 cold 类型的。它们的API和语义是相同的,但是使用Flux 的方法取决于它的类型。
5.1. Cold 类型
cold类型的Flux 完全是延迟(lazy)执行的,并且在有人对其感兴趣时才会开始发布事件。如果没有观察者,那么Flux 只是一个静态的数据结构。这也意味着,每个订阅者都会接收到属于自己的流的副本,因为事件是延迟生成的,并且一般不会采取任何形式的缓存。cold类型的Flux 通常来源于Flux .create(),按照语义,它不会启用任何逻辑,而是推迟到有人实际对其监听才会执行。从某种程度上来说,cold 类型的 Flux 依赖Subscriber。cold类型的 Flux 的样例除了create()之外,还包括Flux .just()、from()和range()。订阅一个cold类型的Flux 通常还涉及create()中的副作用,比如查询数据库或打开连接。
5.2. Hot 类型
hot类型的Flux 则与之不同。在得到这种类型的Flux 的时候,不管是否有Subscriber,它都可能已经开始发布事件了。即便没有人监听,事件可能会丢失,Flux 依然会往下游推送事件。通常情况下,可以完全控制cold类型的Flux ,但是 hot类型的Flux 是独立于消费者的。Subscriber出现时,hot类型的Flux 的行为类似于电话窃听(wire tap),透明地发布流经它的事件。Subscriber的出现和消失并不会改变Flux 的行为,它是完全解耦和独立的。
hot类型的Flux 通常发生在完全无法控制事件源的场景下。这种Flux 的样例包括鼠标移动、键盘输入或按钮点击。
依赖事件传递时,hot类型和cold类型Flux 的差异就变得非常重要了。不管立即订阅还是几个小时之后订阅cold类型的Flux ,你都会获得完整且一致的事件集。但如果Flux 是hot类型的,那么你就无法确保能接收到所有事件。稍后将介绍一些技术,它们能够确保每个订阅者都能接收到所有事件,例如cache()操作符在技术上来讲,可以缓冲来自hot类型Flux 的所有事件,让后续的订阅者都能接收到相同的事件序列。但是,在理论上,它消耗的内存量是没有限制的,所以在缓存hot类型的Flux 时要非常小心。
6. ConnectableFlux
有时,我们可能不希望仅将某些处理延迟到某一个订阅者的订阅时间,而是希望其中几个订阅者会合,然后再触发订阅和数据生成(这有点类似于CountDownLantch)。
ConnectableFlux 就是为此而生,Flux API 中有两种常用的返回ConnectableFlux 的方式:publish 和replay。
- publish会尝试满足各个不同订阅者的需求(也就是回压),并综合这些请求反馈给源。假设有某个订阅者的需求为 0,发布者会暂停向所有订阅者发出元素。
- replay将对第一个订阅后产生的数据进行缓存,最多缓存数量取决于配置(时间/缓存大小)。 它会对后续接入的订阅者重新发送数据。
ConnectableFlux 提供了多种对订阅的管理方式。包括:
- connect() 当有足够的订阅接入后,可以对 Flux 手动执行一次。它会触发对上游源的订阅。
- autoConnect(n) 与 connect() 类似,不过是在有 n 个订阅的时候自动触发。
- refCount(n) 不仅能够在订阅者接入的时候自动触发,还会检测订阅者的取消动作。如果订阅者全部取消订阅,则会将源“断开连接”,再有新的订阅者接入的时候才会继续“连上”发布者。
- refCount(int, Duration) 增加了一个倒计时:一旦订阅者数量太低了,它会等待 Duration参数指定的时间,如果没有新的订阅者接入才会与源断开连接。
6.1. connect 例子
@Test
public void connectTest() throws InterruptedException {
Flux source = Flux.range(1, 3)
.map(Object::toString)
.doOnSubscribe(s -> log.info("subscribed to source"));
ConnectableFlux co = source.publish();
co.subscribe(log::info);
co.subscribe(log::info);
log.info("done subscribing");
TimeUnit.SECONDS.sleep(1);
log.info("will now connect");
co.connect();
}
16:06:03.494 [main] INFO wangxw.flux.FluxTest - done subscribing
16:06:04.496 [main] INFO wangxw.flux.FluxTest - will now connect
16:06:04.498 [main] INFO wangxw.flux.FluxTest - subscribed to source
16:06:04.500 [main] INFO wangxw.flux.FluxTest - 1
16:06:04.500 [main] INFO wangxw.flux.FluxTest - 1
16:06:04.500 [main] INFO wangxw.flux.FluxTest - 2
16:06:04.500 [main] INFO wangxw.flux.FluxTest - 2
16:06:04.500 [main] INFO wangxw.flux.FluxTest - 3
16:06:04.500 [main] INFO wangxw.flux.FluxTest - 3
复制代码
只有当connect() 之后上游才会发出数据。
6.2. autoConnect(n) 例子
@Test
public void autoConnect() throws InterruptedException {
Flux source = Flux.range(1, 3)
.map(Object::toString)
.doOnSubscribe(s -> log.info("subscribed to source"));
Flux co = source.publish().autoConnect(2);
log.info("subscribed first");
co.subscribe(log::info);
TimeUnit.SECONDS.sleep(1);
log.info("subscribing second");
co.subscribe(log::info);
}
17:18:09.468 [main] INFO wangxw.flux.FluxTest - subscribed first
17:18:10.475 [main] INFO wangxw.flux.FluxTest - subscribing second
17:18:10.486 [main] INFO wangxw.flux.FluxTest - subscribed to source
17:18:10.492 [main] INFO wangxw.flux.FluxTest - 1
17:18:10.492 [main] INFO wangxw.flux.FluxTest - 1
17:18:10.492 [main] INFO wangxw.flux.FluxTest - 2
17:18:10.492 [main] INFO wangxw.flux.FluxTest - 2
17:18:10.493 [main] INFO wangxw.flux.FluxTest - 3
17:18:10.493 [main] INFO wangxw.flux.FluxTest - 3
复制代码
当两个订阅者都完成订阅之后,上游才收到订阅请求,并开始发出数据。
6.3. refCount() 例子
@Test
public void refCountTest() throws InterruptedException {
Flux source = Flux.interval(Duration.ofMillis(500))
.map(Object::toString)
.doOnSubscribe(s -> log.info("doOnSubscribe"))
.doOnCancel(() -> log.info("doOnCancel"));
Flux flux = source.publish().refCount(2, Duration.ofSeconds(2));
log.info("subscribed first");
Disposable s1 = flux.subscribe(x -> log.info("s1:" + x));
TimeUnit.SECONDS.sleep(1);
log.info("subscribed second");
Disposable s2 = flux.subscribe(x -> log.info("s2:" + x));
TimeUnit.SECONDS.sleep(1);
log.info("subscribed first disposable");
s1.dispose();
TimeUnit.SECONDS.sleep(1);
log.info("subscribed second disposable"); // 所有的订阅者都取消了
s2.dispose();
TimeUnit.SECONDS.sleep(1); // 在2s内 s3进行了订阅
log.info("subscribed third");
Disposable s3 = flux.subscribe(x -> log.info("s3:" + x));
TimeUnit.SECONDS.sleep(1);
log.info("subscribed third disposable");
s3.dispose(); // 所有订阅者都取消了 disconnect
TimeUnit.SECONDS.sleep(3);
log.info("subscribed fourth"); // 3s 后(超过了2s)s4、s5订阅,触发connect
Disposable sub4 = flux.subscribe(l -> log.info("s4: " + l));
TimeUnit.SECONDS.sleep(1);
log.info("subscribed fifth");
Disposable sub5 = flux.subscribe(l -> log.info("s5: " + l));
TimeUnit.SECONDS.sleep(2);
}
17:29:23.044 [main] INFO wangxw.flux.ConnectableFluxTest - subscribed first
17:29:24.052 [main] INFO wangxw.flux.ConnectableFluxTest - subscribed second
17:29:24.067 [main] INFO wangxw.flux.ConnectableFluxTest - doOnSubscribe
17:29:24.576 [parallel-1] INFO wangxw.flux.ConnectableFluxTest - s1:0
17:29:24.576 [parallel-1] INFO wangxw.flux.ConnectableFluxTest - s2:0
17:29:25.076 [parallel-1] INFO wangxw.flux.ConnectableFluxTest - s1:1
17:29:25.076 [parallel-1] INFO wangxw.flux.ConnectableFluxTest - s2:1
17:29:25.076 [main] INFO wangxw.flux.ConnectableFluxTest - subscribed first disposable
17:29:25.576 [parallel-1] INFO wangxw.flux.ConnectableFluxTest - s2:2
17:29:26.094 [parallel-1] INFO wangxw.flux.ConnectableFluxTest - s2:3
17:29:26.094 [main] INFO wangxw.flux.ConnectableFluxTest - subscribed second disposable
17:29:27.101 [main] INFO wangxw.flux.ConnectableFluxTest - subscribed third
17:29:27.101 [main] INFO wangxw.flux.ConnectableFluxTest - s3:4
17:29:27.101 [main] INFO wangxw.flux.ConnectableFluxTest - s3:5
17:29:27.576 [parallel-1] INFO wangxw.flux.ConnectableFluxTest - s3:6
17:29:28.075 [parallel-1] INFO wangxw.flux.ConnectableFluxTest - s3:7
17:29:28.102 [main] INFO wangxw.flux.ConnectableFluxTest - subscribed third disposable
17:29:30.103 [parallel-3] INFO wangxw.flux.ConnectableFluxTest - doOnCancel // 注意时间 2s 后执行了 doOnCancel
17:29:31.103 [main] INFO wangxw.flux.ConnectableFluxTest - subscribed fourth
17:29:32.104 [main] INFO wangxw.flux.ConnectableFluxTest - subscribed fifth
17:29:32.104 [main] INFO wangxw.flux.ConnectableFluxTest - doOnSubscribe
17:29:32.606 [parallel-4] INFO wangxw.flux.ConnectableFluxTest - s4: 0
17:29:32.606 [parallel-4] INFO wangxw.flux.ConnectableFluxTest - s5: 0
17:29:33.107 [parallel-4] INFO wangxw.flux.ConnectableFluxTest - s4: 1
17:29:33.107 [parallel-4] INFO wangxw.flux.ConnectableFluxTest - s5: 1
17:29:33.605 [parallel-4] INFO wangxw.flux.ConnectableFluxTest - s4: 2
17:29:33.605 [parallel-4] INFO wangxw.flux.ConnectableFluxTest - s5: 2
17:29:34.105 [parallel-4] INFO wangxw.flux.ConnectableFluxTest - s4: 3
17:29:34.105 [parallel-4] INFO wangxw.flux.ConnectableFluxTest - s5: 3
复制代码
本例中,refCount() 设置为最少两个订阅者接入是才开始发出数据,当所有订阅者都取消时,如果不能在两秒内接入新的订阅者,则上游会断开连接。
上边的例子中,随着前两个订阅者相继取消订阅,第三个订阅者及时(在2秒内)开始订阅,所以上游会继续发出数据,而且根据输出可以看出是“hot flux”。
当第三个订阅者取消后,第四个订阅者没能及时开始订阅,所以上游发布者断开连接。当第五个订阅者订阅之后,第四和第五个订阅者相当于开始了新一轮的订阅。
6.4. 使用refCount()实现单次订阅
ConnectableFlux 以一种有意思的方式来协调多个Subscriber,并共享一个底层的订阅。还记得最初借助LazyTwitterFlux 创建单个延迟执行的对底层资源的连接吗?必须要手动跟踪所有的Subscriber,如果第一个订阅者出现或最后一个订阅者离开,就建立连接或断开连接。ConnectableFlux 是Flux 的一个特殊类型,能够确保底层始终最多只有一个Subscriber,但实际上它又允许多个Subscriber共享相同的底层资源。
Subject是创建Flux的必要方式,而ConnectableFlux会保护原始的上游Flux,并确保最多只能有一个Subscriber可以接触到它。不管有多少Subscriber连接到ConnectableFlux,系统只会打开一个Flux的订阅,这个订阅是基于该Flux创建的。
@Test
public void refCounted() {
Flux flux = Flux.create(emmiter -> {
log.info("Establishing connection");
twitterStream.addListener(new StatusListener() {
@Override
public void onStatus(Status status) {
emmiter.next(status);
}
@Override
public void onException(Exception e) {
emmiter.error(e);
}
// 其他回调
});
emmiter.onDispose(() -> {
log.info("Disconnecting");
twitterStream.shutdown();
});
}).doOnSubscribe(s -> log.info("doOnSubscribe"))
.doOnComplete(() -> log.info("doOnComplete"));
Flux refCounted = flux.publish().refCount();
Disposable s1 = refCounted.subscribe(status -> log.info("Status: {} ", status),
ex -> log.error("Error callback ", ex));
Disposable s2 = refCounted.subscribe(status -> log.info("Status: {} ", status),
ex -> log.error("Error callback ", ex));
s1.dispose();
s2.dispose();
}
17:51:21.610 [main] INFO wangxw.flux.Callback2FluxTest - doOnSubscribe
17:51:21.614 [main] INFO wangxw.flux.Callback2FluxTest - Establishing connection
17:51:21.616 [main] INFO wangxw.flux.Callback2FluxTest - Disconnecting
复制代码
直到真正有第一个Subscriber的时候,连接才会建立。但是,更重要的在于,第二个Subscriber不会初始化新的连接,它甚至不会接触到原始的Flux。publish(). refCount()会将底层的Flux 串联包装起来,并拦截所有的订阅。
操作符
defer
延迟创建
@Test
public void deferTest() throws InterruptedException {
Flux flux1 = Flux.just(PrintUtil.println(new Date()));
Flux flux2 = Flux.defer(() -> Flux.just(PrintUtil.println(new Date())));
flux1.subscribe(x -> log.info("s1: " + x));
flux2.subscribe(x -> log.info("s2: " + x));
TimeUnit.SECONDS.sleep(3);
flux1.subscribe(x -> log.info("s3: " + x));
flux2.subscribe(x -> log.info("s4: " + x));
}
15:25:24.629 [main] INFO wangxw.operator.OperatorTest - s1: 2021-09-06 15:25:024
15:25:24.630 [main] INFO wangxw.operator.OperatorTest - s2: 2021-09-06 15:25:024
15:25:27.633 [main] INFO wangxw.operator.OperatorTest - s3: 2021-09-06 15:25:024
15:25:27.634 [main] INFO wangxw.operator.OperatorTest - s4: 2021-09-06 15:25:027
复制代码
delayElements
@Test
public void delayTest() throws InterruptedException {
Flux.just("1", "2").delayElements(Duration.ofSeconds(2))
.subscribe(log::info);
TimeUnit.SECONDS.sleep(5);
}
16:02:40.482 [parallel-1] INFO wangxw.operator.OperatorTest - 1
16:02:42.483 [parallel-2] INFO wangxw.operator.OperatorTest - 2
复制代码
延迟发布,在运行这个程序的时候,即便进行了订阅,应用程序也会立即终止而不展现任何的结果,这是因为事件的发布是在后台异步运行的,所以需要在最后添加任意一个sleep()。
map
@Test
public void mapTest() throws InterruptedException {
Flux.just(1, 2, 3, 4)
.map(i -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
return i * 2 + "";
})
.log()
.subscribe(log::info);
}
16:07:24.005 [main] INFO reactor.Flux.MapFuseable.1 - | onSubscribe([Fuseable] FluxMapFuseable.MapFuseableSubscriber)
16:07:24.007 [main] INFO reactor.Flux.MapFuseable.1 - | request(unbounded)
16:07:25.007 [main] INFO reactor.Flux.MapFuseable.1 - | onNext(2)
16:07:25.008 [main] INFO wangxw.operator.OperatorTest - 2
16:07:26.009 [main] INFO reactor.Flux.MapFuseable.1 - | onNext(4)
16:07:26.009 [main] INFO wangxw.operator.OperatorTest - 4
16:07:27.009 [main] INFO reactor.Flux.MapFuseable.1 - | onNext(6)
16:07:27.009 [main] INFO wangxw.operator.OperatorTest - 6
16:07:28.009 [main] INFO reactor.Flux.MapFuseable.1 - | onNext(8)
16:07:28.009 [main] INFO wangxw.operator.OperatorTest - 8
16:07:28.012 [main] INFO reactor.Flux.MapFuseable.1 - | onComplete()
复制代码
map 是同步执行的,会阻塞客户端线程。
flatMap
flatMap()是Rx中最重要的操作符之一。乍看上去,它类似于map(),但是它对每个元素的转换都会返回另外一个(内嵌的)Flux 。鉴于Flux 可以代表另外一个异步操作,我们很快就意识到flatMap()可以为上游的每个事件执行异步计算(fork执行)并将结果加入进来。
flatMap 本身并不是异步的,但是内嵌的Flux可以执行异步操作。
@Test
public void flatMapTest() throws InterruptedException {
Function<Integer, Publisher> mapper = i -> Flux.just(i * 2 + "").delayElements(Duration.ofSeconds(1));
Flux.just(1, 2, 3, 4)
.flatMap(mapper)
.subscribe(log::info);
TimeUnit.SECONDS.sleep(10);
}
17:23:42.566 [parallel-2] INFO wangxw.operator.OperatorTest - 4
17:23:42.566 [parallel-1] INFO wangxw.operator.OperatorTest - 2
17:23:42.568 [parallel-3] INFO wangxw.operator.OperatorTest - 6
17:23:42.572 [parallel-4] INFO wangxw.operator.OperatorTest - 8
复制代码
从本质上来讲,flatMap()接收一个随时间(事件)出现的值的主(master)序列(Flux ),然后将每个事件分别替换为独立的子序列。这些子序列彼此之间是不相关的,并且与生成它们的主序列中的事件也是不相关的。更确切地说,此时拥有的不再是单个主序列,而是一组Flux ,其中每个都是独立运行的,并且随着时间的推移出现和消失。因此,flatMap()并不能对子事件抵达下游操作符/订阅者的顺序给出任何的保证。
@Test
public void flatMap3Test() throws InterruptedException {
Flux.just(DayOfWeek.SUNDAY, DayOfWeek.MONDAY)
.flatMap(this::loadRecordFor)
.subscribe(log::info);
TimeUnit.SECONDS.sleep(5);
}
private Flux loadRecordFor(DayOfWeek dow) {
switch (dow) {
case SUNDAY:
return Flux.interval(Duration.ofMillis(90))
.take(5)
.map(i -> "Sun" + i);
case MONDAY:
return Flux.interval(Duration.ofMillis(65))
.take(5)
.map(i -> "Mon" + i);
default:
return Flux.empty();
}
}
17:31:38.893 [parallel-2] INFO wangxw.operator.OperatorTest - Mon0
17:31:38.918 [parallel-1] INFO wangxw.operator.OperatorTest - Sun0
17:31:38.957 [parallel-2] INFO wangxw.operator.OperatorTest - Mon1
17:31:39.007 [parallel-1] INFO wangxw.operator.OperatorTest - Sun1
17:31:39.023 [parallel-2] INFO wangxw.operator.OperatorTest - Mon2
17:31:39.087 [parallel-2] INFO wangxw.operator.OperatorTest - Mon3
17:31:39.096 [parallel-1] INFO wangxw.operator.OperatorTest - Sun2
17:31:39.152 [parallel-2] INFO wangxw.operator.OperatorTest - Mon4
17:31:39.188 [parallel-1] INFO wangxw.operator.OperatorTest - Sun3
17:31:39.277 [parallel-1] INFO wangxw.operator.OperatorTest - Sun4
复制代码
控制flatMap 的并发性
假设你有大量用户的一个列表,它们被包装在Flux 中。每个User有一个loadProfile()方法,该方法会通过HTTP请求返回一个Flux 实例。我们的目标是尽快获取所有用户概况(profile), flatMap()就是为了实现该目标而设计的,可以对上游的值进行并发计算,如下所示:
@Test
public void flatMap4Test() {
List users = new ArrayList<>();
Flux.fromIterable(users)
.flatMap(User::loadProfile);
}
static class User {
public Flux loadProfile() {
// 发送HTTP请求 异步执行
return Flux.empty();
}
}
static class Profile {
}
复制代码
乍看上去这种方式非常不错。Flux是从一个使用from()操作符的固定List生成的。因此,订阅它的时候,会将所有的用户立即释放出来。对于每个新User,flatMap()都会调用loadProfile()并返回Flux 。然后,flatMap()透明地订阅每个新的Flux,将所有的Profile事件转发至下游。订阅内部Flux 就相当于发起新的HTTP连接。因此,假设我们有10000个用户,那就会突然发起10000个并发的HTTP请求。如果所有的这些请求都访问相同的服务器,预计得到的情况无外乎如下几种:
- 拒绝连接。
- 长时间等待和超时。
- 服务器停机。
- 遇到限速或者被加入黑名单。
- 整体的延迟增加。
- 客户端的问题,包括太多打开(open)状态的Socket、线程,以及过多的内存消耗。
增加并发会在一定的程度上得到回报,但如果你尝试运行太多并发操作,最终将会导致大量的上下文切换、过高的内存和CPU占用,以及整体性能的下降。
Flux.fromIterable(users)
.flatMap(User::loadProfile, 10);
复制代码
flatMap()有一个非常简单的重载形式,能够限制内部流的并发订阅总数。参数concurrency 限制了内部Flux 的订阅数量。在实践中,flatMap()接收前10个User时,它会为每个User调用loadProfile(),但是来自上游的第11个User出现时,flatMap()不会再调用loadProfile()。相反,它会等正在运行的内部流完成。因此,concurrency 参数限制了flatMap()生成的后台任务的数量。
concatMap(f)在语义上是与flatMap(f, 1)(也就是concurrency 值为1的flatMap())等价的。
利用flatMap计算笛卡尔积
根据两个流中的所有值生成笛卡儿积。例如,可能有两个Flux ,一个代表棋盘的行(rank,从1到8),另一个代表棋盘的列(file,从a到h)。应该能够找到棋盘上所有64个可能的方格。Flux 将会精确地发布64个事件:针对a它会生成a1、a2...a8,然后是b1、b2等,直到最后达到h7和h8。这是flatMap()另一个非常有意思的样例,每列(file)都会生成该列对应的所有可能的方格。
@Test
public void cartesianTest() {
Flux oneToEight = Flux.range(1, 8);
Flux ranks = oneToEight.map(Objects::toString);
Flux files = oneToEight.map(x -> 'a' + x - 1)
.map(ascii -> (char) ascii.intValue())
.map(ch -> Character.toString(ch));
Flux squares = files
.flatMap(file -> ranks.map(rank -> file + rank));
squares.subscribe(log::info);
}
复制代码
concatMap
concatMap() 可以保持下游事件的顺序,使其与上游事件的顺序完全契合。
@Test
public void flatMap3Test() throws InterruptedException {
Flux.just(DayOfWeek.SUNDAY, DayOfWeek.MONDAY)
.concatMap(this::loadRecordFor)
.subscribe(log::info);
TimeUnit.SECONDS.sleep(5);
}
private Flux loadRecordFor(DayOfWeek dow) {
switch (dow) {
case SUNDAY:
return Flux.interval(Duration.ofMillis(90))
.take(5)
.map(i -> "Sun" + i);
case MONDAY:
return Flux.interval(Duration.ofMillis(65))
.take(5)
.map(i -> "Mon" + i);
default:
return Flux.empty();
}
}
17:27:15.161 [parallel-1] INFO wangxw.operator.OperatorTest - Sun0
17:27:15.250 [parallel-1] INFO wangxw.operator.OperatorTest - Sun1
17:27:15.340 [parallel-1] INFO wangxw.operator.OperatorTest - Sun2
17:27:15.432 [parallel-1] INFO wangxw.operator.OperatorTest - Sun3
17:27:15.520 [parallel-1] INFO wangxw.operator.OperatorTest - Sun4
17:27:15.587 [parallel-2] INFO wangxw.operator.OperatorTest - Mon0
17:27:15.652 [parallel-2] INFO wangxw.operator.OperatorTest - Mon1
17:27:15.716 [parallel-2] INFO wangxw.operator.OperatorTest - Mon2
17:27:15.783 [parallel-2] INFO wangxw.operator.OperatorTest - Mon3
17:27:15.848 [parallel-2] INFO wangxw.operator.OperatorTest - Mon4
复制代码
第一个事件(Sunday)从上游出现的时候,concatMap()会订阅loadRecordsFor()产生的Flux ,并将产生的所有事件传递到下游。这个内部流完成时,concatMap()会等待下一个上游事件(Monday)并重复以上过程。concatMap()不会涉及任何的并发性,但是它保证了上游事件的顺序,避免出现重叠。
flatMap()内部使用了merge()操作符,同时订阅所有的子Flux ,对它们不做任何的区分。这也是下游事件互相交叉的原因。但是,concatMap()可以在技术上使用concat()操作符。concat()只会先订阅第一个底层的Flux ,只有第一个完成之后,才会订阅第二个。
merge
按照顺序合并流
@Test
public void mergeTest() throws InterruptedException {
Flux flux1 = Flux.interval(Duration.ofMillis(300)).map(x -> "p1: " + x);
Flux flux2 = Flux.interval(Duration.ofMillis(500)).map(x -> "p2: " + x);
Flux mergeFlux = Flux.merge(flux1, flux2);
mergeFlux.subscribe(log::info);
TimeUnit.SECONDS.sleep(2);
}
19:03:12.339 [parallel-1] INFO wangxw.operator.OperatorTest - p1: 0
19:03:12.541 [parallel-2] INFO wangxw.operator.OperatorTest - p2: 0
19:03:12.641 [parallel-1] INFO wangxw.operator.OperatorTest - p1: 1
19:03:12.939 [parallel-1] INFO wangxw.operator.OperatorTest - p1: 2
19:03:13.040 [parallel-2] INFO wangxw.operator.OperatorTest - p2: 1
19:03:13.240 [parallel-1] INFO wangxw.operator.OperatorTest - p1: 3
19:03:13.540 [parallel-1] INFO wangxw.operator.OperatorTest - p1: 4
19:03:13.542 [parallel-2] INFO wangxw.operator.OperatorTest - p2: 2
19:03:13.841 [parallel-1] INFO wangxw.operator.OperatorTest - p1: 5
19:03:14.040 [parallel-2] INFO wangxw.operator.OperatorTest - p2: 3
复制代码
需要注意,任何底层Flux 出现的错误都会立即传递到Observer。可以使用merge()的mergeDelayError()变种形式来推迟错误,这样直到其他的流都完成,错误通知才会发布。mergeDelayError()甚至能够保证收集所有的异常,而不仅仅是第一个,并将它们封装到
rx.exceptions.CompositeException。
zip
压缩(zipping)指的是将两个(或更多)流组合起来的操作,在这个过程中,某个流中的每个事件必须要与其他流对应的事件进行成对组合。下游事件是通过组合每个流中的第一个事件,然后再组合第二个事件,以此类推生成的。
concat
按顺序订阅,等待第一个流完成,再依次订阅下一个。
@Test
public void concatTest () throws InterruptedException {
Flux flux1 = Flux.just("1","2","3").delayElements(Duration.ofSeconds(1));
Flux flux2 = Flux.just("4","5","6");
Flux flux = Flux.concat(flux1, flux2);
flux.subscribe(log::info);
TimeUnit.SECONDS.sleep(3);
}
16:43:21.315 [parallel-1] INFO wangxw.operator.OperatorTest - 1
16:43:22.317 [parallel-2] INFO wangxw.operator.OperatorTest - 2
16:43:23.318 [parallel-3] INFO wangxw.operator.OperatorTest - 3
16:43:23.318 [parallel-3] INFO wangxw.operator.OperatorTest - 4
16:43:23.318 [parallel-3] INFO wangxw.operator.OperatorTest - 5
16:43:23.319 [parallel-3] INFO wangxw.operator.OperatorTest - 6
复制代码
错误处理操作符
在 try-catch 块中处理异常最常用的几种方法:
- 捕获并返回一个静态默认值。
- 捕获并动态计算回退值。
- 捕获并执行一个回退方法。
- 捕获,包装为一个BusinessException,然后重新抛出。
- 捕获,记录一个特定的错误日志,然后重新抛出。
- 使用 finally 块清理资源或 Java 7 “try-with-resource” 构造。
静态回退值(Static Fallback Value)
在 Reactor 中与“捕获并返回静态默认值”的等价的是onErrorReturn. 以下示例显示了如何使用它:
try {
return doSomethingDangerous(10);
}
catch (Throwable error) {
return "RECOVERED";
}
public String doSomethingDangerous(int i) {
if (i == 10) {
throw new BusinessException();
}
return i + "";
}
复制代码
在 Reactor 中等价的是:
Flux flux = Flux.just(10)
.map(this::doSomethingDangerous)
.onErrorReturn("RECOVERED");
flux.subscribe(log::info, e -> log.error("error", e));
00:07:57.556 [main] INFO wangxw.operator.ErrorHandleOperatorTest - RECOVERED
复制代码
还可以执行选择执对异常执行一个Predicate来决定是否恢复,如下面的示例:
Flux.just(10)
.map(this::doSomethingDangerous)
.onErrorReturn(e -> e.getMessage().equals("boom10"), "recovered10");
复制代码
仅当异常消息为“boom10”时,执行回退。
回退方法(Fallback Method)
在Reactor中与“捕获并执行一个回退方法”等价的是 onErrorResume ,例如从外部但不可靠的服务中获取数据,当外部服务异常时则从缓存中获取数据作为回退值,示例如下:
String v1;
try {
v1 = callExternalService("key1");
}
catch (Throwable error) {
v1 = getFromCache("key1");
}
String v2;
try {
v2 = callExternalService("key2");
}
catch (Throwable error) {
v2 = getFromCache("key2");
}
复制代码
在 Reactor 中等价的是:
Flux.just("key1", "key2")
.flatMap(k -> callExternalService(k) // 每个键都调用外部服务
.onErrorResume(e -> getFromCache(k)) // 外部服务执行失败执行回退
);
复制代码
和 onErrorReturn 一样,onErrorResume 也有一些变体,可以让你根据异常的类型或Predicate来筛选返回的异常。它接受一个Function的事实并且允许根据遇到的错误选择切换到不同的回退方法。下面的例子展示了如何做到这一点:
Flux.just("timeout1", "unknown", "key2")
.flatMap(k -> callExternalService(k)
.onErrorResume(error -> { // 发生错误是,动态选择如何继续
if (error instanceof TimeoutException)
return getFromCache(k); // 如果超时,则从访问本地缓存
else if (error instanceof UnknownKeyException)
return registerNewEntry(k, "DEFAULT"); // 如果 key 不存在则创建一个新的条目
else
return Flux.error(error); // 在其他情况下重新抛
})
);
复制代码
动态回退值(Dynamic Fallback Value)
命令式示例如下:
try {
Value v = erroringMethod();
return MyWrapper.fromValue(v);
}
catch (Throwable error) {
return MyWrapper.fromError(error);
}
复制代码
Reactor 代码如下:
erroringFlux.onErrorResume(error -> Mono.just(
MyWrapper.fromError(error)
));
复制代码
捕获并重新抛出(Catch and Rethrow)
命令式示例如下:
try {
return callExternalService(k);
}
catch (Throwable error) {
throw new BusinessException("oops, SLA exceeded", error);
}
复制代码
在“fallback method”示例中,flatMap中的最后一行提示我们如何以反应方式实现相同的目标,如下所示:
Flux.just("timeout1")
.flatMap(k -> callExternalService(k))
.onErrorResume(original -> Flux.error(
new BusinessException("oops, SLA exceeded", original))
);
复制代码
但是,有一种更直接的方法,即可以通过onErrorMap实现相同的效果:
Flux.just("timeout1")
.flatMap(k -> callExternalService(k))
.onErrorMap(original -> new BusinessException("oops, SLA exceeded", original));
复制代码
记录日志(Log or React on the Side)
对于希望错误继续传播但在不修改序列的情况下对其作出反应(例如,记录错误)的情况,可以使用doError操作符。这相当于“捕获,记录一个特定的错误日志,然后重新抛出”模式,如下例所示:
try {
return callExternalService(k);
}
catch (RuntimeException error) {
//make a record of the error
log("uh oh, falling back, service failed for key " + k);
throw error;
}
复制代码
doOnError运算符以及所有以doOn为前缀的操作符具有一定的“副作用(side-effect)”。它们允许在不修改序列的情况下查看序列的事件。
与前面显示的命令式示例一样,以下示例仍然传播错误,但确保我们至少记录了外部服务发生故障:
LongAdder failureStat = new LongAdder();
Flux flux =
Flux.just("unknown")
.flatMap(k -> callExternalService(k)
.doOnError(e -> {
failureStat.increment();
log("uh oh, falling back, service failed for key " + k); // 记录日志
})
);
复制代码
使用 Resources 和 Finally 块 (Using Resources and the Finally Block)
- 使用 finally 块清理资源
Stats stats = new Stats();
stats.startTimer();
try {
doSomethingDangerous();
}
finally {
stats.stopTimerAndRecordTiming();
}
复制代码
- 使用 try-with-resource 语法
try (SomeAutoCloseable disposableInstance = new SomeAutoCloseable()) {
return disposableInstance.toString();
}
复制代码
两者都有各自的 Reactor 等价语法:doFinally 和 using。
doFinally 是当序列终止(使用 onComplete 或 onError )或取消时希望执行的副作用。它给你一个提示,什么样的终止触发了副作用。以下示例显示了如何使用 doFinally:
Stats stats = new Stats();
LongAdder statsCancel = new LongAdder();
Flux flux =
Flux.just("foo", "bar")
.doOnSubscribe(s -> stats.startTimer())
.doFinally(type -> {
stats.stopTimerAndRecordTiming();
if (type == SignalType.CANCEL)
statsCancel.increment();
})
.take(1);
复制代码
重试 (Retrying)
Flux.interval(Duration.ofMillis(250))
.map(input -> {
if (input < 3) return "tick " + input;
throw new RuntimeException("boom");
})
.retry(1)
.elapsed()
.subscribe(System.out::println, System.err::println);
[259,tick 0]
[251,tick 1]
[250,tick 2]
[504,tick 0]
[250,tick 1]
[250,tick 2]
java.lang.RuntimeException: boom
复制代码
从前面的例子中可以看出,retry(1)只是重新订阅了interval 一次,从 0 重新开始计时。第二次,由于异常仍然发生,它放弃重试并向下游传播错误。
还有一个更高级的版本的重试 retryWhen,它使用一个“伴生(companion) ”的Flux来判断某个特定的失败是否应该重试。这个伴生的Flux是由操作符创建的,但由用户进行装饰,以便自定义重试条件。
伴生的 Flux 是一个 Flux,它传递了重试的策略/函数,是retryWhen唯一的参数。用户定义该函数并使其返回一个新的Publisher>。Retry类是一个抽象类,我们可以使用**Retry.from(Function)**来生成伴生的Flux。
retryWhen重试周期
- 每次发生错误(有可能会重试)时,一个 RetrySignal 就会被被发布到伴生的Flux中,该Flux已经由您的函数修饰。
- 如果伴生的Flux 发出一个值,就会发生重试。
- 如果伴生的Flux 完成(completes)那么错误会被吞掉,重试周期停止,结果序列也将完成。
- 如果伴生的Flux 产生错误(error),重试周期停止,结果序列也将产生错误(error)。
使用 retryWhen 模拟 retry(3) 的方法,伴生的Flux将吞掉错误,这两种情况之间的区别很重要。
Flux.error( new RuntimeException("boom"))
.doOnError(e -> System.err.println("on error"))
.retryWhen(Retry.from(companion ->
companion.take(3)))
.subscribe(System.out::println, System.err::println);
on error
on error
on error
on error
复制代码
实际上,上面的这个示例将会产生一个空的Flux,但是它成功的完成了。而retry(3)将会以一个错误终止,所以他们的结果并不完全相同。
Flux.error( new RuntimeException("boom"))
.doOnError(e -> System.err.println("on error"))
.retry(3)
.subscribe(System.out::println, System.err::println);
on error
on error
on error
on error
java.lang.RuntimeException: boom
复制代码
指数退避重试
将反应式编程应用于已有的程序
从阻塞式到反应式
public class PersonDao {
/**
* 阻塞式的
* @return
*/
public List listPeople() {
return query("select * from people");
}
/**
* 反应式的
* @return
*/
public Flux rxListPeople() {
return Flux.fromIterable(query("select * from people"));
}
private List query(String sql) {
List people = new ArrayList<>();
for (int i = 0; i < 3; i++) {
Person person = new Person();
person.setId(i);
people.add(person);
}
return people;
}
}
复制代码
我们对已有的阻塞式 API 变更到了 反应式 API 。根据系统规模的不同,和原有系统的兼容性可能是一个大问题。接下来我们通过 buffer 和 **blockLast ** 将反应式代码和阻塞式代码组合起来。
@Test
public void listPeopleTest() {
// 没有任何副作用
Flux peopleFlux = personDao.rxListPeople();
Flux<List> listFlux = peopleFlux.buffer()
.log();
List people = listFlux.blockLast(Duration.ofSeconds(3));
assert people != null;
people.forEach(person -> log.info(person.toString()));
}
11:21:50.598 [main] INFO reactor.Flux.Buffer.1 - onSubscribe(FluxBuffer.BufferExactSubscriber)
11:21:50.601 [main] INFO reactor.Flux.Buffer.1 - request(unbounded)
11:21:50.601 [main] INFO reactor.Flux.Buffer.1 - onNext([Person(id=0), Person(id=1), Person(id=2)])
11:21:50.603 [main] INFO reactor.Flux.Buffer.1 - cancel()
11:21:50.608 [main] INFO reactor.Flux.Buffer.1 - onComplete()
11:21:50.616 [main] INFO wangxw.flux.Block2FluxTest - Person(id=0)
11:21:50.616 [main] INFO wangxw.flux.Block2FluxTest - Person(id=1)
11:21:50.616 [main] INFO wangxw.flux.Block2FluxTest - Person(id=2)
复制代码
blockLast 阻塞等待onComplete 回调完成,你可能认为以上代码只是封装和拆封 Flux,而没有特别明确的目的。但是,这只是第一步。下一个转换将会引入一些延迟执行的功能。仅仅存在 Flux 并不意味着会有后台任务或副作用,这与Future不同,Future几乎总是意味着有某些并发操作在执行(只有任务提交了才能返回一个Future)。
拥抱延迟执行
public Flux rxListPeople() {
return Flux.defer(() ->
Flux.fromIterable(query("select * from people")));
}
复制代码
在订阅之前什么都不会发生。
从命令式并发到声明式并发
在企业级应用程序中,显式的并发并不常见。大多数情况下,每个请求都会由单个线程处理。同一个线程要完成如下工作。
- 接收 TCP/IP 连接
- 解析 HTTP 请求
- 调用 Controller 或 Servlet
- 阻塞对数据库的调用
- 处理结果
- 编码响应(如 JSON 格式)
- 将响应发送至客户端
如果后端要发起多个独立的请求,比如访问数据库,那么这种分层的模型会影响用户的延迟,因为它们是序列化执行的(当然可以很容易的并发执行)。除此之外,扩展性也会受到影响。例如,在Tomcat的执行器(executor)中,默认有200个负责处理请求的线程,这意味着处理的并发连接不能超过200个。如果流量突然暴增,传入的连接将会排队,服务器就会出现更高的延迟。但是,这种情况不会持续下去,Tomcat最终会开始拒绝传入的流量。
**传统的架构,在一个线程中执行请求处理的各个步骤也有一些益处,比如能够提升缓存的本地化以及减少同步的损耗。**令人遗憾的是,**在典型的应用程序中,因为整体的延迟是每层延迟的总和,所以一个有故障的组件可能会对整体的延迟产生负面影响。**此外,有时许多步骤是相互独立的,可以并发执行。例如,调用多个外部API或执行多个独立的SQL查询。例如,以下是一个没有任何并发功能的程序。
@Slf4j
public class TicketService {
/**
* 查询航班
*
* @param flightNo
* @return
*/
public Flight lookupFlight(String flightNo) {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
return new Flight(flightNo);
}
/**
* 查询乘客
*
* @param id
* @return
*/
public Passenger findPassenger(Long id) {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
return new Passenger(id);
}
/**
* 根据航班和乘客订票
*
* @param flight
* @param passenger
* @return
*/
public Ticket bookTicket(Flight flight, Passenger passenger) {
return new Ticket(flight, passenger);
}
/**
* 发送邮件
*
* @param ticket
* @return
*/
public boolean sendEmail(Ticket ticket) throws IOException {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("send email {}", ticket);
return true;
}
}
复制代码
客户端代码如下所示。
@Test
public void blockBookTicket() throws IOException {
Flight flight = ticketService.lookupFlight("LOT 783");
Passenger passenger = ticketService.findPassenger(1L);
Ticket ticket = ticketService.bookTicket(flight, passenger);
ticketService.sendEmail(ticket);
}
复制代码
这是非常典型的阻塞式代码,与众多应用程序中的代码都很类似。但是,如果从延迟的角度来看,上述的代码片段可以分为4个步骤。前两个步骤相互独立,只有第三个步骤(bookTicket())需要 lookupFlight() 和findPassenger() 的返回值。这里显然有机会利用并发的优势,但是,开发人员很少采用这种方式,因为这需要比较复杂的线程池、Future以及回调。但是,如果API已经兼容Rx了,你可以简单地将遗留的阻塞式代码包装到 Flux 中, 如下所示。
public Mono rxLookupFlight(String flightNo) {
return Mono.defer(() ->
Mono.just(lookupFlight(flightNo)));
}
public Mono rxFindPassenger(Long id) {
return Mono.defer(() ->
Mono.just(findPassenger(id)));
}
复制代码
从语义上讲,rx-方法其实以相同的方式完成了相同的任务,换言之,它们默认都是阻塞的。从客户端的角度看,除了API更加冗长之外,我们其实没有得到任何好处。
@Test
public void rxBookTicket() throws IOException {
// 只是占位,不产生任何的副作用
Mono flight = ticketService.rxLookupFlight("LOT 783");
Mono passenger = ticketService.rxFindPassenger(1L);
Mono ticket = flight.zipWith(passenger,
(f, p) -> ticketService.bookTicket(f, p));
ticket.subscribe(ticketService::sendEmail);
}
复制代码
无论是传统的阻塞程序,还是使用 Rx 的程序,它们的运行方式完全相同。不过上述代码的执行方式是延迟执行的,我们得到了两个 Flight 和 Passenger 类型的两个占位符,但是还没有产生任何的副作用。此时,并没有执行任何的数据库查询或Web服务调用。
在上述代码中你需要关注 subscribe() 位于何处。不过在通常情况下,你的业务逻辑只是一直组合Flux,并将它们返回给某个框架或者脚手架层。实际的订阅是由Web框架或某些胶水代码在幕后完成的。自行调用 subscribe() 也算不上糟糕的实践,但是将订阅推迟得越远越好。
为了理解执行的流程,从下往上观察是一种非常有帮助的方法。我们订阅了ticket,因此Rx必须透明地订阅flight和passenger。此时,真正的业务逻辑才会执行。因为两个Flux 都是cold类型的,并且没有涉及并发,所以对flight的订阅会在调用线程中触发lookupFlight()阻塞方法。当lookupFlight()完成的时候,RxJava就可以订阅passenger了。此时,它已经通过同步的flight接收到Flight实例。rxFindPassenger()会以阻塞的方式调用findPassenger()并接收一个Passenger。经过这个连接点之后,数据会往下游流动。Flight和Passenger实例通过提供的lambda表达式(bookTicket)被结合起来,传递给ticket.subscribe()。
这里看上去有不少工作要做,并且运行方式本质上和开始的阻塞式代码并没有区别。但是,现在我们不需要修改任何逻辑就能声明式地应用并发了。
如果业务方法返回Future(或者CompletableFuture,没有本质区别),其实系统已经为我们做出了两个决策。
- 底层对 lookupFlight() 的调用已经开始,这里没有任何延迟执行的空间。我们不会在这个方法上阻塞,但是工作已经启动。
- 我们对并发没有任何控制权。方法的具体实现决定了Future任务是在线程池调用,还是为每个请求建立一个新的线程。
Reactor 给了用户更多的控制权。实际上Flux 一般都已经是异步的了,但是在个别情况下,还是需要为已有的Flux 添加并发功能。在遇到同步Flux 时,可以自由决定使用何种线程机制的是API的消费者,而不是API的实现者。上述功能都是通过 subscribeOn() 操作符实现的,如下所示。
Mono flight = ticketService.rxLookupFlight("LOT 783")
.subscribeOn(Schedulers.boundedElastic());
Mono passenger = ticketService.rxFindPassenger(1L)
.subscribeOn(Schedulers.boundedElastic())
.timeout(Duration.ofSeconds(3)); // 还可以声明一个超时
复制代码
如果 API 是Reactor驱动的,我们可以在订阅之前的任何地方插入 subscribeOn() 操作符,并提供一个所谓的Scheduler实例,不用花费太大的力气就能让两个方法并发执行。
但是bookTicket( )依然有点美中不足,它返回的是Ticket,这毫无疑问是阻塞式的。尽管订票的执行过程可能会非常迅速,但是将其按照 Reactor 的方式进行声明也是值得的,这样会让API更易于演化。
public Mono rxBookTicket(Flight flight, Passenger passenger) {
return Mono.defer(() ->
Mono.just(bookTicket(flight, passenger)));
}
复制代码
但是,现在zipWith()返回的是一个看上去很诡异的Mono
Mono<Mono> ticket = flight
.zipWith(passenger, (f, p) -> ticketService.rxBookTicket(f, p));
复制代码
我们可以使用flatMap,并为其传递一个恒等式函数,如下:
Mono ticket = flight
.zipWith(passenger, (f, p) -> ticketService.rxBookTicket(f, p))
.flatMap(abs -> abs);
复制代码
我们难免会认为subscribeOn()是在 Reactor 中实现并发的恰当工具。这个操作符的确能够实现这一点,但尽量还是不要使用它(以及后文描述的publishOn() )。在现实中,Flux 来源于异步源,所以根本就没有必要进行自定义的调度。这里使用 subscribeOn() 只是为了展现如何升级已有的应用程序,从而有选择性地使用反应式原则。但是,在实践中,Scheduler和subscribeOn()应该是最后的“武器”。
使用 flatMap()
在上述样例中,我们必须通过电子邮件发送一个 Ticket 列表,在这里我们需要注意以下三点:
- 这个列表可能很长。
- 发送一封邮件需要几毫秒,甚至几秒。
- 发送失败的时候,应用程序需要平稳运行,但是最后要报告哪些 ticket 没有投递成功。
最后一项需求迅速排除了简单的tickets.forEach(this::sendEmail)方式,因为这种方式会立即抛出异常,并且不会继续投递ticket。所以你只能使用迭代器,大致代码如下:
List faitures = new ArrayList<>();
for (Ticket ticket : tickets) {
try {
ticketService.sendEmail(ticket);
} catch (Exception e) {
log.warn("Failed to send {}", ticket, e);
faitures.add(ticket);
}
}
复制代码
但是,前面两个需求并没有得到解决。我们不需要在一个线程中串行的发送邮件,按照传统的方式我们可以使用ExecutorService 将每个电子邮件提交为一个独立的任务,代码如下:
List<Pair<Ticket, Future>> tasks = tickets.stream()
.map(ticket -> Pair.of(ticket, ticketService.sendEmailAsync(ticket)))
.collect(Collectors.toList());
List failures = tasks.stream().flatMap(pair -> {
try {
Future future = pair.getRight();
future.get(1, TimeUnit.SECONDS); // 1s 的发送时间
return Stream.empty();
} catch (Exception e) {
Ticket ticket = pair.getLeft();
log.warn("Failed to send {}", ticket, e);
return Stream.of(ticket);
}
}).collect(Collectors.toList());
//-----------------------------------------------------------
ExecutorService pool = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
public Future sendEmailAsync(Ticket ticket) {
return pool.submit(() -> sendEmail(ticket));
}
复制代码
首先,我们遍历tickets,并将它们提交到一个线程池中。准确地说,我们调用sendEmailAsync()辅助方法,将对sendEmail()的调用提交到一个线程池中,执行过程会包装到一个Callable中。更准确地说,Callable的实例被先放到线程池前面的一个无界(默认情况下)队列中。如果任务提交的速度太快,它们将无法及时得到处理,进而在线程池的无界队列中积压。这里缺乏一种减缓提交速度的机制,这也是反应式流和回压致力于解决的问题。
为了在发送邮件失败时能够重试,所以必须跟踪哪个Future负责哪个Ticket,这里以Pair来表示。第二个循环遍历所有的Future,并试图通过阻塞(get())和等待完成的方式来解除对它们的引用。如果get()成功返回,将会跳过这个Ticket。但是,如果出现异常,将会返回与该任务关联的Ticket实例,这样就能知道它失败了,稍后再报告它。Stream.flatMap()允许返回零个或一个元素(实际上可以是任意数量),而Stream.map()通常需要一个元素。
你可能想问,为何需要两个循环而不是如下的一个循环呢?
tickets.stream()
.map(ticket -> Pair.of(ticket, ticketService.sendEmailAsync(ticket)))
.flatMap(pair -> {
// ...
}).collect(Collectors.toList());
复制代码
如果你不理解Java 8的Stream是如何运行的,就很难发现这里有一个非常有意思的bug。因为流与 Flux 类似,它们都是延迟执行的,所以只有在请求终端操作(terminal operation)的时候(例如collect(toList())),才会针对底层集合中的每个元素依次执行操作。这意味着启动后台任务的map()操作并没有针对所有ticket立即执行,而是每次只执行一个元素,交替使用flatMap()。除此之外,我们实际上启动了一个Future,阻塞等待,然后启动第二个Future,阻塞等待,以此类推。
上述代码涉及很多工作,并且出现错误的可能性非常高,更不要提什么可读性和可维护性了。那么 Reactor 能在这个场景中发挥什么作用那? 首先,我们将发送邮件的 API 更新为使用 Reactor,如下所示:
public Mono rxSendEmail(Ticket ticket){
// 将阻塞的源包装为反应式
return Mono.fromCallable(() -> sendEmail(ticket));
}
复制代码
这里还没有涉及并发,它只是将sendEmail()包装进了一个 Mono 中。然后可以像前面一样遍历所有的tickets,它并不涉及任何并发,如下所示:
List failures = Flux.fromIterable(this.tickets)
.flatMap(ticket ->
ticketService.rxSendEmail(ticket)
// .flatMap(response -> Mono.empty())
.ignoreElement()
.ofType(Ticket.class)
.doOnError(e -> log.warn("Failed to send {}", ticket, e))
.onErrorReturn(ticket)
.buffer()
.blockLast(); // 收集发送失败的tickets
复制代码
在以上的样例中,很容易看到内层的flatMap()忽略了response并返回了一个空的流。在这样的场景中,flatMap()就有点大材小用了,更有效的方式是ignoreElements()。ignoreElements()会忽略上游发布的值,只转发onCompleted()或onError()通知。因为我们忽略实际的响应,只处理错误,所以这里的ignoreElements()能够运行得非常好。
我们感兴趣的内容都在外层flatMap()中。如果只是使用flatMap(this::rxSendEmail),代码也可以运行,只不过rxSendEmail引发的任何故障都会终结整个流。但是,我们想要“捕获”所有发布出来的错误,将其收集起来供后续使用。我们使用了与Stream.flatMap()类似的技巧:如果response能够成功发布,就将其转换为一个空的 Flux。它的基本含义就是丢弃成功的ticket。但是,如果遇到故障,样例会返回引发故障的ticket。额外的doOnError()回调允许将异常以日志的形式记录下来。当然,也可以将日志记录添加到onErrorReturn()操作符中,但是我们发现这种关注点分离的方式更符合函数式的风格。
需要注意,如果将外层的flatMap()替换为concatMap(),我们将会遇到与前文提及的JDK中的Stream类似的bug。flatMap(或merge)会立即订阅所有的内部流。与之相反,concatMap(或concat)则会依次订阅每个内部Flux。并且只要没有人真正订阅Flux,它就不会开展任何工作。
到目前为止,一个带有try-catch的for循环被替换成了更难阅读、更复杂的 Flux。但是,为了将序列化代码转换为多线程计算,我们只需要再加一个操作符,**通过将外层 flatMap 内嵌的 Flux 使用 subscribeOn() 这个操作符使 Mono 采用异步运行的方式。**如下所示。
List failures = Flux.fromIterable(this.tickets)
.flatMap(ticket ->
ticketService.rxSendEmail(ticket)
// .flatMap(response -> Mono.empty())
.ignoreElement()
.ofType(Ticket.class)
.doOnError(e -> log.warn("Failed to send {}", ticket, e))
.onErrorReturn(ticket)
.subscribeOn(Schedulers.boundedElastic())) // 内嵌的flux执行异步操作
.buffer()
.blockLast(); // 收集发送失败的tickets
复制代码
它没有太多的侵入性,你甚至可能很难发现它的存在。额外的 subscribeOn() 操作符会让每个单独的rxSendMail()都在一个特定的Scheduler中执行,这是 Reactor 的优势之一。
在线程方面,Flux 默认同步执行,但是它能够实现无缝甚至透明的多线程功能。当然,这并不意味着我们可以在任意位置安全地注入Scheduler。只不过,它的API更加简洁,抽象层级也更高。我们只需要记住 Flux 默认是同步的。但是,我们可以很轻松地改变这种行为,将并发功能用到往常我们认为不可能出现的地方。这对于现存的遗留应用程序很有价值,借助这种功能,可以轻松地对其进行优化。
subscribeOn() 放在外部还是内部也是值得讨论的,此外应该尽可能推迟对Flux的订阅,一般这会发生在外部世界的Web框架附近。这会大大改变你的思维方式,因为整个业务逻辑都是延迟执行的,直到有人真正想要看到结果的时候才会运行。