Hystrix 1.5 滑动窗口实现原理总结

总览

Netflix Hystrix 通过类似滑动窗口的数据结构来统计调用的指标数据。Hystrix 1.5 将滑动窗口设计成了数据流(reactive stream, RxJava 中的 Observable)的形式。通过消费数据流的形式利用滑动窗口,并对数据流进行变换后进行后续的操作,可以让开发者更加灵活地去使用。由于 Hystrix 里大量使用了 RxJava,再加上滑动窗口本质就是不断变换的数据流,滑动窗口中每个桶的数据都来自于源源不断的事件,因此滑动窗口非常适合用观察者模式和响应式编程思想的 RxJava 实现。使用 RxJava 实现有一大好处:可以通过 RxJava 的一系列操作符来实现滑动窗口,从而可以依赖 RxJava 的线程模型来保证数据写入和聚合的线程安全,将这一系列的机制交给 RxJava。所有的操作都是在 RxJava 的后台线程上进行的,RxJava 会保证操作的有序性和线程安全(参见 The Observable Contract)。

这里我们就以 Hystrix 熔断器依赖的记录调用情况统计的 HealthCountsStream 为例来看一下 Hystrix 1.5 是如何利用 RxJava 将滑动窗口抽象并实现成 reactive stream 的,以及如何去消费对应的数据流。

滑动窗口的实现都位于 com.netflix.hystrix.metric.consumer 包下,这里只挑 BucketedRollingCounterStream 这条线的实现来分析。首先先看一下类的继承结构:

Class hierarchy of BucketedRollingCounterStream

最顶层的 BucketedCounterStream 抽象类提供了基本的桶计数器实现,按配置的时间间隔将所有事件聚合成桶;BucketedRollingCounterStream 抽象类在其基础上实现滑动窗口,并聚合成指标数据;而最底下一层的类则是各种具体的实现,比如 HealthCountsStream 最终会聚合成健康检查数据(HystrixCommandMetrics.HealthCounts,统计调用成功和失败的计数),供 HystrixCircuitBreaker 使用。

BucketedCounterStream

总览

BucketedCounterStream 抽象类提供了基本的桶计数器实现。用户在使用 Hystrix 的时候一般都要配两个值:timeInMillisecondsnumBuckets,前者代表滑动窗口的长度(时间间隔),后者代表滑动窗口中桶的个数,那么每个桶对应的窗口长度就是 bucketSizeInMs = timeInMilliseconds / numBuckets(记为一个单元窗口周期)。BucketedCounterStream 每隔一个单元窗口周期(bucketSizeInMs)就把这段时间内的所有调用事件聚合到一个桶内。我们来看一下它的实现,首先来看一下它的泛型定义:

1
2
3
public abstract class BucketedCounterStream<Event extends HystrixEvent, Bucket, Output> {
// ...
}

BucketedCounterStream 的泛型里接受三个类型参数,其中第一个 Event 类型代表 Hystrix 中的调用事件,如命令开始执行、命令执行完成等。这种事件驱动的设计也非常符合 RxJava 的思想,每个调用者都向订阅者发布事件,订阅者将事件聚合成调用指标;第二个 Bucket 类型代表桶的类型,第三个 Output 类型代表数据聚合的最终输出类型。

BucketedCounterStream 核心代码在构造函数里(为了可读性起见,将所有可以用 lambda expression 的地方都转换成了 lambda expression,下同):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
protected final int numBuckets;
protected final Observable<Bucket> bucketedStream;
private final Func1<Observable<Event>, Observable<Bucket>> reduceBucketToSummary;
protected BucketedCounterStream(final HystrixEventStream<Event> inputEventStream, final int numBuckets, final int bucketSizeInMs,
final Func2<Bucket, Event, Bucket> appendRawEventToBucket) {
this.numBuckets = numBuckets;
this.reduceBucketToSummary = eventBucket -> eventBucket.reduce(getEmptyBucketSummary(), appendRawEventToBucket);
final List<Bucket> emptyEventCountsToStart = new ArrayList<>();
for (int i = 0; i < numBuckets; i++) {
emptyEventCountsToStart.add(getEmptyBucketSummary());
}
this.bucketedStream = Observable.defer(() -> {
return inputEventStream
.observe()
.window(bucketSizeInMs, TimeUnit.MILLISECONDS) //bucket it by the counter window so we can emit to the next operator in time chunks, not on every OnNext
.flatMap(reduceBucketToSummary) //for a given bucket, turn it into a long array containing counts of event types
.startWith(emptyEventCountsToStart); //start it with empty arrays to make consumer logic as generic as possible (windows are always full)
});
}

其中 bucketedStream 即为本次得到的数据流(类型为 RxJava 中的 Observable,即观察者模式中的 Publisher,会源源不断地产生事件/数据),里面最核心的逻辑就是如何将一个一个的事件按一段时间聚合成一个桶。我们可以看到 bucketedStream 是经事件源 inputEventStream 变换而成的,事件源的类型为 HystrixEventStream<Event>,它代表事件流接口:

1
2
3
4
public interface HystrixEventStream<E extends HystrixEvent> {
Observable<E> observe();
}

其中 observe 方法返回这个事件流对应的发布者 Observable,订阅者可以对事件进行变换并消费。

事件流的写入

Hystrix 中执行函数以命令模式封装成了一个一个命令(Command),每个命令执行时都会触发某个事件,其中命令执行完成事件(HystrixCommandCompletion)是 Hystrix 中最核心的事件,它可以代表某个命令执行成功、超时、异常等等的各种的状态,与服务调用的熔断息息相关。熔断器的计数依赖于 HystrixCommandCompletion 事件,因此这里我们只关注这个事件对应的事件流,其余类型的事件流原理类似。

那么这个事件流中的事件是从哪里发布的呢?我们来看一下相关的具体实现 - HystrixCommandCompletionStream(仅核心代码):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class HystrixCommandCompletionStream implements HystrixEventStream<HystrixCommandCompletion> {
private final HystrixCommandKey commandKey;
private final Subject<HystrixCommandCompletion, HystrixCommandCompletion> writeOnlySubject;
private final Observable<HystrixCommandCompletion> readOnlyStream;
private static final ConcurrentMap<String, HystrixCommandCompletionStream> streams = new ConcurrentHashMap<String, HystrixCommandCompletionStream>();
public static HystrixCommandCompletionStream getInstance(HystrixCommandKey commandKey) {
// 此段代码略,大致逻辑为:若对应的 CommandKey 的事件流已创建就从缓存中取出,否则就新创建并缓存起来,保证每个 CommandKey 只有一个实例
}
HystrixCommandCompletionStream(final HystrixCommandKey commandKey) {
this.commandKey = commandKey;
this.writeOnlySubject = new SerializedSubject<HystrixCommandCompletion, HystrixCommandCompletion>(PublishSubject.<HystrixCommandCompletion>create());
this.readOnlyStream = writeOnlySubject.share();
}
public static void reset() {
streams.clear();
}
public void write(HystrixCommandCompletion event) {
writeOnlySubject.onNext(event);
}
@Override
public Observable<HystrixCommandCompletion> observe() {
return readOnlyStream;
}
}

从代码里我们可以看到 write 方法里通过向某个 Subject 发布事件来实现了发布的逻辑,那么 Subject 又是什么呢?简单来说,Subject 就像是一个桥梁,既可以作为发布者 Observable,又可以作为订阅者 Observer。它可以作为发布者和订阅者之间的一个“代理”,提供额外的功能(如流量控制、缓存等)。这里的 writeOnlySubject 是经过 SerializedSubject 封装的 PublishSubjectPublishSubject 可以看做 hot observable。为了保证调用的顺序(根据 The Observable Contract,每个事件的产生需要满足顺序上的偏序关系,即使是在不同线程产生),需要用 SerializedSubject 封装一层来保证事件真正地串行地产生。这里还有一个问题,就是不同的发布者调用 write 方法发布事件时,线程上下文可能都不同,那么如何保证其线程安全呢?Hystrix 1.5 通过使用 ThreadLocal 来保证每个线程都有一份 Subject 的实例,确保事件发布的线程安全。相关代码位于 HystrixThreadEventStream 内(已略去其它事件的代码):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
public class HystrixThreadEventStream {
private final long threadId;
private final String threadName;
private final Subject<HystrixCommandExecutionStarted, HystrixCommandExecutionStarted> writeOnlyCommandStartSubject;
private final Subject<HystrixCommandCompletion, HystrixCommandCompletion> writeOnlyCommandCompletionSubject;
private final Subject<HystrixCollapserEvent, HystrixCollapserEvent> writeOnlyCollapserSubject;
private static final ThreadLocal<HystrixThreadEventStream> threadLocalStreams = new ThreadLocal<HystrixThreadEventStream>() {
@Override
protected HystrixThreadEventStream initialValue() {
return new HystrixThreadEventStream(Thread.currentThread());
}
};
private static final Action1<HystrixCommandCompletion> writeCommandCompletionsToShardedStreams = commandCompletion -> {
HystrixCommandCompletionStream commandStream = HystrixCommandCompletionStream.getInstance(commandCompletion.getCommandKey());
commandStream.write(commandCompletion);
if (commandCompletion.isExecutedInThread() || commandCompletion.isResponseThreadPoolRejected()) {
HystrixThreadPoolCompletionStream threadPoolStream = HystrixThreadPoolCompletionStream.getInstance(commandCompletion.getThreadPoolKey());
threadPoolStream.write(commandCompletion);
}
};
/* package */ HystrixThreadEventStream(Thread thread) {
this.threadId = thread.getId();
this.threadName = thread.getName();
writeOnlyCommandCompletionSubject = PublishSubject.create();
writeOnlyCommandCompletionSubject
.onBackpressureBuffer()
.doOnNext(writeCommandCompletionsToShardedStreams)
.unsafeSubscribe(Subscribers.empty());
}
public static HystrixThreadEventStream getInstance() {
return threadLocalStreams.get();
}
public void shutdown() {
writeOnlyCommandStartSubject.onCompleted();
writeOnlyCommandCompletionSubject.onCompleted();
writeOnlyCollapserSubject.onCompleted();
}
// 执行完毕/异常/超时都会调用此方法
public void executionDone(ExecutionResult executionResult, HystrixCommandKey commandKey, HystrixThreadPoolKey threadPoolKey) {
HystrixCommandCompletion event = HystrixCommandCompletion.from(executionResult, commandKey, threadPoolKey);
writeOnlyCommandCompletionSubject.onNext(event);
}
}

这里 Hystrix 通过 ThreadLocal 为每个不同的线程都创建了不同的 HystrixThreadEventStream,里面的 Subject 都是 write-only, thread-safe 的。Hystrix 在这里额外加了一层 writeOnlyCommandCompletionSubject,提供额外的流量控制机制(onBackpressureBuffer),消费者太慢时这里会积压。其中会调用 HystrixCommandCompletionStreamwrite 方法产生对应的事件。

executionDone 方法最后会经 HystrixCommandMetrics 类的 markCommandDone 方法进行调用。HystrixCommandMetrics 是 Hystrix 中另一个重要的类,从中可以获取各种指标数据的流的实例。最后 Hystrix 会在对应命令执行完毕后,调用 markCommandDone 进行数据记录,并发布对应的事件。相关代码位于 AbstractCommand 类内:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private void handleCommandEnd(boolean commandExecutionStarted) {
Reference<TimerListener> tl = timeoutTimer.get();
if (tl != null) {
tl.clear();
}
long userThreadLatency = System.currentTimeMillis() - commandStartTimestamp;
executionResult = executionResult.markUserThreadCompletion((int) userThreadLatency);
if (executionResultAtTimeOfCancellation == null) {
metrics.markCommandDone(executionResult, commandKey, threadPoolKey, commandExecutionStarted);
} else {
metrics.markCommandDone(executionResultAtTimeOfCancellation, commandKey, threadPoolKey, commandExecutionStarted);
}
if (endCurrentThreadExecutingCommand != null) {
endCurrentThreadExecutingCommand.call();
}
}

AbstractCommand 类是 Hystrix 命令模式执行模型的实现,整合了资源隔离、熔断器等各种高可用机制,是整个 Hystrix 的核心。

共享的事件流

上面我们探究了事件流的发布原理,以及如何保证写的线程安全。那么事件流写入到 writeOnlySubject 以后,如何被订阅者消费呢?如何保证多个订阅者都可以对事件流进行消费,并且序列一致呢?我们回到之前的 observe 方法,observe 方法返回的是一个 readOnlyStream

1
2
3
4
@Override
public Observable<HystrixCommandCompletion> observe() {
return readOnlyStream;
}

readOnlyStreamwriteOnlySubject 的只读版本,它是通过 share 操作符产生的:

1
this.readOnlyStream = writeOnlySubject.share();

Hystrix 通过 RxJava 的 share 操作符产生一种特殊的 Observable:当有一个订阅者去消费事件流时它就开始产生事件,可以有多个订阅者去订阅,同一时刻收到的事件是一致的;直到最后一个订阅者取消订阅以后,事件流才停止产生事件。其底层实现非常有意思:

1
2
3
public final Observable<T> share() {
return publish().refCount();
}

在执行 publish 的时候,Observable 会被变换成为一个 ConnectableObservable。这种 ConnectableObservable 只会在进行连接操作(connect)以后才会产生数据(连接后行为类似于 hot observable)。而 share 操作底层的 refCount 操作符就帮我们做了这样的操作:refCount 底层维护着一个引用计数器,代表绑定的订阅者数目。当第一个订阅者去消费事件流的时候,引用计数大于 0,refCount 底层会自动进行 connect,从而触发事件流产生事件;当最后一个订阅者取消订阅以后,引用计数归零,refCount 底层就会自动进行 disconnect,事件流停止产生事件。也就是说,这样的一个可以被多个订阅者共享的事件流,底层是基于引用计数法来管理事件的产生的,和智能指针的思想类似。

Reactive stream of refCount

事件流聚合为桶

上面我们研究完了事件流是如何产生的,接下来就回归到事件流聚合为桶的逻辑:

1
2
3
4
5
6
7
this.bucketedStream = Observable.defer(() -> { // defer 的意思是 lazy 创建
return inputEventStream
.observe()
.window(bucketSizeInMs, TimeUnit.MILLISECONDS) // 按单元窗口长度来将某个时间段内的调用事件聚集起来
.flatMap(reduceBucketToSummary) // 将每个单元窗口内聚集起来的事件集合聚合成桶
.startWith(emptyEventCountsToStart); // 为了保证窗口的完整性,开始的时候先产生一串空的桶
});

其中的核心是 window 操作符,它可以按单元窗口长度来将某个时间段内的调用事件聚集起来,此时数据流里每个对象都是一个集合:Observable<Event>,所以需要将其聚集成桶类型以将其扁平化。Hystrix 通过 RxJava 的 reduce 操作符进行“归纳”操作,将一串事件归纳成一个桶:

1
this.reduceBucketToSummary = eventBucket -> eventBucket.reduce(getEmptyBucketSummary(), appendRawEventToBucket);

其中我们需要提供桶的初值(即空桶),并要提供聚合函数来进行聚合,类型为 Bucket -> Event -> Bucket(代表对于每个 Event,都将其聚合到 Bucket 中,并返回聚合后的 Bucket)。不同的实现对应的 Bucket 和规约函数不同,比如熔断器依赖的 HealthCountsStream 就以 long[] 来作为每个桶。

注:此处的 window(timespan, unit) 操作符属于计算型操作符,默认会在 Schedulers.computation() 调度器下执行(CPU 密集型),其底层本质是线程数为 CPU 核数的线程池。RxJava 会确保其线程安全。

BucketedRollingCounterStream

BucketedRollingCounterStream 按照滑动窗口的大小对每个单元窗口产生的桶进行聚合,这也是 Hystrix 1.5 中滑动窗口的抽象实现。其核心实现仍然位于构造函数内:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public abstract class BucketedRollingCounterStream<Event extends HystrixEvent, Bucket, Output> extends BucketedCounterStream<Event, Bucket, Output> {
private Observable<Output> sourceStream;
private final AtomicBoolean isSourceCurrentlySubscribed = new AtomicBoolean(false);
protected BucketedRollingCounterStream(HystrixEventStream<Event> stream, final int numBuckets, int bucketSizeInMs,
final Func2<Bucket, Event, Bucket> appendRawEventToBucket,
final Func2<Output, Bucket, Output> reduceBucket) {
super(stream, numBuckets, bucketSizeInMs, appendRawEventToBucket);
Func1<Observable<Bucket>, Observable<Output>> reduceWindowToSummary = window -> window.scan(getEmptyOutputValue(), reduceBucket).skip(numBuckets);
this.sourceStream = bucketedStream // 数据流,每个对象代表单元窗口产生的桶
.window(numBuckets, 1) // 按照滑动窗口桶的个数进行桶的聚集
.flatMap(reduceWindowToSummary) // 将一系列的桶聚集成最后的数据对象
.doOnSubscribe(() -> isSourceCurrentlySubscribed.set(true))
.doOnUnsubscribe(() -> isSourceCurrentlySubscribed.set(false))
.share() // 不同的订阅者看到的数据是一致的
.onBackpressureDrop(); // 流量控制,当消费者消费速度过慢时就丢弃数据,不进行积压
}
@Override
public Observable<Output> observe() {
return sourceStream;
}
/* package-private */ boolean isSourceCurrentlySubscribed() {
return isSourceCurrentlySubscribed.get();
}
}

构造函数后两个参数参数分别代表两个函数:将事件流聚合成桶的函数(appendRawEventToBucket) 以及 将桶聚合成输出对象的函数(reduceBucket)。

我们看到 BucketedRollingCounterStream 实现了 observe 方法,返回了一个 Observable<Output> 类型的发布者 sourceStream,供订阅者去消费。这里的 sourceStream 应该就是滑动窗口的终极形态了,那么它是如何变换得到的呢?这里面的核心还是 windowflatMap 算子。这里的 window 算子和之前的版本不同,它可以将数据流中的一定数量的数据聚集成一个集合,它的第二个参数 skip=1 的意思就是按照步长为 1 在数据流中滑动,不断聚集对象,这即为滑动窗口的真正实现。到这里每个窗口都已经形成了,下一步就是要对窗口进行聚合了。注意这里聚合操作没有用 reduce,而是用了 scan + skip(numBuckets) 的组合:

1
Func1<Observable<Bucket>, Observable<Output>> reduceWindowToSummary = window -> window.scan(getEmptyOutputValue(), reduceBucket).skip(numBuckets);

这里每个集合的大小都是 numBuckets,看起来用 reducescan + skip(numBuckets) 没有什么区别,但是注意当数据流终结时,最后面的窗口大小都不满 numBuckets,这时候就需要把这些不完整的窗口给过滤掉来确保数据不缺失。这个地方也是开发的时候容易忽略的地方,很值得思考。

聚合完毕以后,基本的滑动窗口数据就OK了,为了支持多订阅者,还要进行 share;并且利用 onBackpressureDrop 操作符实现流量控制,此处当消费者跟不上的时候就直接丢掉数据,不进行积压。

Rolling status

HealthCountsStream

前面滑动窗口的抽象实现都已经分析完了,现在我们就来看一下其中的一个具体实现 - HealthCountsStream,它提供实时的健康检查数据(HystrixCommandMetrics.HealthCounts,统计调用成功和失败的计数)。

之前我们提到 BucketedRollingCounterStream 里面有三个类型参数和两个重要函数参数。HealthCountsStream 对应的三个类型参数分别为:

  • Event: HystrixCommandCompletion,代表命令执行完成。可以从中获取执行结果,并从中提取所有产生的事件(HystrixEventType
  • Bucket: 桶的类型为 long[],里面统计了各种事件的个数。其中 index 为事件类型枚举对应的索引(ordinal),值为对应事件的个数
  • Output: HystrixCommandMetrics.HealthCounts,里面统计了总的执行次数、失败次数以及失败百分比,供熔断器使用

滑动窗口里用于将事件聚合成桶的函数实现:

1
2
3
4
5
6
7
8
9
10
11
12
public static final Func2<long[], HystrixCommandCompletion, long[]> appendEventToBucket = (initialCountArray, execution) -> {
ExecutionResult.EventCounts eventCounts = execution.getEventCounts();
for (HystrixEventType eventType: ALL_EVENT_TYPES) {
switch (eventType) {
case EXCEPTION_THROWN: break; //this is just a sum of other anyway - don't do the work here
default:
initialCountArray[eventType.ordinal()] += eventCounts.getCount(eventType);
break;
}
}
return initialCountArray;
};

滑动窗口里用于将每个窗口聚合成最终的统计数据的的函数实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
private static final Func2<HystrixCommandMetrics.HealthCounts, long[], HystrixCommandMetrics.HealthCounts> healthCheckAccumulator = HystrixCommandMetrics.HealthCounts::plus;
// 具体的实现,位于 HystrixCommandMetrics.HealthCounts 类内
public HealthCounts plus(long[] eventTypeCounts) {
long updatedTotalCount = totalCount; // 之前的请求总数
long updatedErrorCount = errorCount; // 之前的失败个数
long successCount = eventTypeCounts[HystrixEventType.SUCCESS.ordinal()];
long failureCount = eventTypeCounts[HystrixEventType.FAILURE.ordinal()];
long timeoutCount = eventTypeCounts[HystrixEventType.TIMEOUT.ordinal()];
long threadPoolRejectedCount = eventTypeCounts[HystrixEventType.THREAD_POOL_REJECTED.ordinal()];
long semaphoreRejectedCount = eventTypeCounts[HystrixEventType.SEMAPHORE_REJECTED.ordinal()];
// 加上所有事件的总数
updatedTotalCount += (successCount + failureCount + timeoutCount + threadPoolRejectedCount + semaphoreRejectedCount);
// 加上失败事件的总数(包括请求失败、超时、线程池满、信号量满)
updatedErrorCount += (failureCount + timeoutCount + threadPoolRejectedCount + semaphoreRejectedCount);
return new HealthCounts(updatedTotalCount, updatedErrorCount);
}

Hystrix 熔断器里会实时地去消费每个窗口产生的健康统计数据,并根据指标来决定熔断器的状态:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
/* package */class HystrixCircuitBreakerImpl implements HystrixCircuitBreaker {
private final HystrixCommandProperties properties;
private final HystrixCommandMetrics metrics;
enum Status {
CLOSED, OPEN, HALF_OPEN;
}
private final AtomicReference<Status> status = new AtomicReference<Status>(Status.CLOSED);
private final AtomicLong circuitOpened = new AtomicLong(-1);
private final AtomicReference<Subscription> activeSubscription = new AtomicReference<Subscription>(null);
protected HystrixCircuitBreakerImpl(HystrixCommandKey key, HystrixCommandGroupKey commandGroup, final HystrixCommandProperties properties, HystrixCommandMetrics metrics) {
this.properties = properties;
this.metrics = metrics;
//On a timer, this will set the circuit between OPEN/CLOSED as command executions occur
Subscription s = subscribeToStream();
activeSubscription.set(s);
}
private Subscription subscribeToStream() {
/*
* This stream will recalculate the OPEN/CLOSED status on every onNext from the health stream
*/
return metrics.getHealthCountsStream()
.observe()
.subscribe(new Subscriber<HealthCounts>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(HealthCounts hc) {
// check if we are past the statisticalWindowVolumeThreshold
if (hc.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) {
// we are not past the minimum volume threshold for the stat window,
// so no change to circuit status.
// if it was CLOSED, it stays CLOSED
// if it was half-open, we need to wait for a successful command execution
// if it was open, we need to wait for sleep window to elapse
} else {
if (hc.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) {
//we are not past the minimum error threshold for the stat window,
// so no change to circuit status.
// if it was CLOSED, it stays CLOSED
// if it was half-open, we need to wait for a successful command execution
// if it was open, we need to wait for sleep window to elapse
} else {
// our failure rate is too high, we need to set the state to OPEN
if (status.compareAndSet(Status.CLOSED, Status.OPEN)) {
circuitOpened.set(System.currentTimeMillis());
}
}
}
}
});
}
}

总结

Hystrix 1.5 使用 RxJava 1.x 来实现滑动窗口,将滑动窗口抽象成响应式数据流的形式,既适合 Hystrix 事件驱动的特点,又易于实现和使用。滑动窗口的实现的要点就是每个桶的聚合以及滑动窗口的形成,Hystrix 巧妙地运用了 RxJava 中的 window 操作符来将单位窗口时间内的事件,以及将一个窗口大小内的桶聚集到一起,并通过 reduce 等折叠操作将事件集合聚集为桶,将滑动窗口内的桶聚集成指标数据,非常巧妙。同时,Hystrix 利用 ThreadLocal 作为一个线程安全的“代理”,可以确保多个发布者写的线程安全;通过 RxJava 的 share 操作符可以确保多个订阅者从某个共享的 Observable 中观察的序列一致。

最后用一张图来总结 Hystrix Metrics 事件驱动的流程:

Event driven flow of Hystrix


References

call/cc 总结 | Scheme

Continuation

Continuation 也是一个老生常谈的东西了,我们来回顾一下。首先我们看一下 TSPL4 中定义的表达式求值需要做的事:

During the evaluation of a Scheme expression, the implementation must keep track of two things: (1) what to evaluate and (2) what to do with the value.

Continuation 即为其中的(2),即表达式被求值以后,接下来要对表达式做的计算R5RS 中 continuation 的定义为:

The continuation represents an entire (default) future for the computation.

比如 (+ (* 2 3) (+ 1 7)) 表达式中,(* 2 3)的 continuation 为:保存 (* 2 3) 计算出的值 6,然后计算 (+ 1 7) 的值,最后将两表达式的值相加,结束;(+ 1 7) 的 continuation 为:保存 (+ 1 7) 的值 8,将其与前面计算出的 6 相加,结束。

Scheme 中的 continuation 是 first-class 的,也就是说它可以被当做参数进行传递和返回;并且 Scheme 中可以将 continuation 视为一个 procedure,也就是说可以调用 continuation 执行后续的运算。

call/cc

每个表达式在求值的时候,都会有一个对应的 current continuation,它在等着当前表达式求值完毕然后把值传递给它。那么如何捕捉 current continuation 呢?这就要用到 Scheme 中强大的 call/cc 了。call/cc 的全名是 call-with-current-continuation ,它可以捕捉当前环境下的 current continuation 并利用它做各种各样的事情,如改变控制流,实现非本地退出(non-local exit)、协程(coroutine)、多任务(multi-tasking)等,非常方便。注意这里的 continuation 将当前 context 一并打包保存起来了,而不只是保存程序运行的位置。下面我们来举几个例子说明一下 call/cc 的用法。

current continuation

我们先来看个最简单的例子 —— 用它来捕捉 current continuation 并作为 procedure 调用。call/cc 接受一个函数,该函数接受一个参数,此参数即为 current continuation。以之前 (+ (* 2 3) (+ 1 7)) 表达式中 (* 2 3) 的 continuation 为例:

1
2
3
4
5
(define cc #f)
(+ (call/cc (lambda (return)
(set! cc return)
(* 2 3)))
(+ 1 7))

我们将 (* 2 3) 的 current continuation (用(+ ? (+ 1 7))表示) 绑定给 cc 变量。现在 cc 就对应了一个 continuation ,它相当于过程 (define (cc x) (+ (x) (+ 1 7))),等待一个值然后进行后续的运算:

1
2
3
4
5
6
> cc
#<continuation>
> (cc 10)
18
> (cc (* 2 3))
14

这个例子很好理解,我们下面引入 call/cc 的本质 —— 控制流变换。在 Scheme 中,假设 call/cc 捕捉到的 current continuation 为 cc(位于 lambda 中),如果 cc 作为过程 直接或间接地被调用(即给它传值),call/cc 会立即返回,返回值即为传入 cc 的值。即一旦 current continuation 被调用,控制流会跳到 call/cc 处。因此,利用 call/cc,我们可以摆脱顺序执行的限制,在程序中跳来跳去,非常灵活。下面我们举几个 non-local exit 的例子来说明。

Non-local exit

Scheme 中没有 breakreturn 关键字,因此在循环中如果想 break 并提前返回的话就得借助 call/cc。比如下面的例子寻找传入的 list 中是否包含 5

1
2
3
4
5
6
7
8
9
10
11
12
(define (do-with element return)
(if (= element 5)
(return 'find-five)
(void)))
(define (check-lst lst)
(call/cc (lambda (return)
(for-each (lambda (element)
(do-with element return)
(printf "~a~%" element))
lst)
'not-found)))

测试:

1
2
3
4
5
6
7
8
9
> (check-lst '(0 2 4))
0
2
4
'not-found
> (check-lst '(0 3 5 1))
0
3
'find-five

check-lst 过程会遍历列表中的元素,每次都会将 current continuation 传给 do-with 过程并进行调用,一旦do-with 遇到 5,我们就将结果传给 current continuation (即 return),此时控制流会马上跳回 check-lst 过程中的 call/cc 处,这时候就已经终止遍历了(跳出了循环)。call/cc 的返回值为 'find-five,所以最后会在控制台上打印出 'find-five

我们再来看一个经典的 generator 的例子,它非常像 Python 和 ES 6 中的 yield,每次调用的时候都会返回 list 中的一个元素:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
(define (generate-one-element-at-a-time lst)
;; Hand the next item from a-list to "return" or an end-of-list marker
(define (control-state return)
(for-each
(lambda (element)
(set! return (call/cc
(lambda (resume-here)
;; Grab the current continuation
(set! control-state resume-here)
(return element)))))
lst)
(return 'you-fell-off-the-end))
;; This is the actual generator, producing one item from a-list at a time
(define (generator)
(call/cc control-state))
;; Return the generator
generator)
(define generate-digit
(generate-one-element-at-a-time '(0 1 2)))

调用:

1
2
3
4
5
6
7
8
> (generate-digit)
0
> (generate-digit)
1
> (generate-digit)
2
> (generate-digit)
'you-fell-off-the-end

注意到这个例子里有两个 call/cc,大家刚看到的时候可能会有点晕,其实这两个 call/cc 各司其职,互不干扰。第一个 call/cc 负责保存遍历的状态(从此处恢复),而 generator 中的 call/cc 才是真正生成值的地方(非本地退出)。其中一个需要注意的地方就是 control-state,它在第一次调用的时候还是个 procedure,在第一次调用的过程中它就被重新绑定成一个 continuation,之后再调用 generator 生成器的时候,控制流就可以跳到之前遍历的位置继续执行下面的过程,从而达到生成器的效果。

阴阳谜题

continuation 环境嵌套。后面有时间专开一篇分析~

1
2
3
4
5
(let* ((yin
((lambda (cc) (display #\@) cc) (call-with-current-continuation (lambda (c) c))))
(yang
((lambda (cc) (display #\*) cc) (call-with-current-continuation (lambda (c) c)))))
(yin yang))

call/cc与数理逻辑

这里简单提一下 call/cc 与类型系统和数理逻辑的联系。call/cc 的类型是 ((P → Q) → P) → P,通过 Curry-Howard 同构,它可以对应到经典逻辑中的 Peirce’s law

$$((P \to Q) \to P) \to P$$

Peirce’s law 代表排中律 $P \land \lnot P$,这条逻辑无法在 λ演算所对应的直觉逻辑中表示(直觉逻辑中双重否定不成立),因此 call/cc 无法用 λ表达式定义。通常我们用扩展后的 $\lambda \mu \ calculus$ 来定义 call/cc,$\lambda \mu \ calculus$ 经 Curry-Howard 同构可以得到经典逻辑。


References

分布式计算系统中的 GC 问题 | Yak (OSDI 2016) 学习笔记

最近一直在关注 OSDI 2016,发现了这篇关于分布式计算系统内存管理的论文:Yak: A High-Performance Big-Data-Friendly. Garbage Collector,感觉比较有趣,拿来总结总结~

Yak 是一个 JVM 平台上的针对大数据场景(分布式计算框架)设计、优化的 Garbage Collector。

背景

在传统的基于分代模型的垃圾回收算法中,小对象首先被划分到 Young Generation(新生代);如果对象经历过一定阈值的 GC 还会存活后,它就会被晋升至 Old Generation(老年代);大对象可以直接进入老年代。

对于分布式计算框架而言,这样的模型有一定的弊端:没有考虑分布式计算情景下对象的生命周期。分布式计算中控制过程(Control Path)与数据处理过程(Data Path)界限明显,如果全都用统一的 GC 模型的话,会导致频繁请求 GC 数据,扫描全堆,最后实际回收的很少,导致 Full GC (STW)。因此,像 Apache Spark 在 1.5 版本以后已经放弃使用 JVM 的 GC 进行内存管理,而是直接利用 unsafe 包进行内存管理。这样十分麻烦还容易出错。

Yak 就是为了解决这样的问题而诞生的。既然分布式计算过程中控制过程与数据处理过程界限明显,Yak 针对这两种过程中的数据划分了两种不同的空间(space):

  • Control Space (CS)
  • Data Space (DS)

对于控制过程(比如任务的调度、日志记录等),其内存布局与传统的一致,GC 还是采用分代模型,分 YoungGen/OldGen/Metaspace。控制过程产生的对象小、生命周期短暂,符合分代假设。

而对于数据处理过程,其中的对象通常都是很大的、在计算周期中一直需要访问的,因此 Yak 提出了 Epoch Region,数据对象的生命周期依赖于每个 epoch。每个 epoch 的 start 与 end 需要用户来设置(但是很简单)。

时域抽象

Epoch hypothesis: many data-path objects have the same life span and can be reclaimed together at the end of an epoch.

时域抽象(Epoch Region): 抽象成 semilattice(半格),用于描述 nested epoches 之间的偏序关系。见论文 Figure 5。(Order Theory在这里非常有用)

如何正确地回收某个特定的 Region

如果有的对象生命周期超出此 epoch,如何将其迁移至“安全地带”?

  • 思考点1:标记 escaping objects
  • 思考点2:决定 escaping objects 的迁移终点并且执行复制

对于标记的过程,可以以 cross-region/space references 为根节点来遍历对象图并且标记其中的 escaping objects(传递闭包)

对于决定其 destination 的过程,需要计算出对象 O 的引用 region 的上确界(via semilattice)。

如果对应的 region 具有继承关系,则应选择最上面的(上确界)。如果是不同线程执行的,那么对应的上确界则为 CS。

TODO: 待详细总结

Distributed System | Spark RDD 论文总结

本篇文章是对Spark RDD论文的总结,中间会穿插一些Spark的内部实现总结,对应Spark版本为2.0。

RDD

Motivation

传统的分布式计算框架(如MapReduce)在执行计算任务时,中间结果通常会存于磁盘中,这样带来的IO消耗是非常大的,尤其是对于各种机器学习算法,它们需要复用上次计算的结果进行迭代,如果每次结果都存到磁盘上再从磁盘读取,耗时会很大。因此Spark这篇论文提出了一种新的分布式数据抽象 —— RDD

设计思想及特点

Resilient Distributed Dataset(RDD)是Apache Spark中数据的核心抽象,它是一种只读的、分区的数据记录集合。

RDD的特点:

  • Lazy evaluation,只在需要的时候才进行计算
  • RDD里面的数据是分区的,每一块数据都可能分布在集群内不同的节点上;支持并行计算
  • Resilient: 借助RDD lineage graph,Spark可以重新执行之前失败的计算任务而不用整体上重新计算,保证了容错性而且非常灵活,实现了fault-tolerance

那么如何操作、处理数据呢?Spark提供了一组函数式编程风格的API,可以很方便地对RDD进行操作、变换,就像操作集合一样。比如:

1
2
3
4
5
6
7
val rdd = sc.parallelize(1 to 100)
val result = rdd.map(_ + 10)
.filter(_ > 15)
.map(x => (x, 1))
.reduceByKey(_+_)
.collect

并且开发者可以根据需要自己编写相应的RDD以及RDD之间的操作,非常方便。可以这么理解,RDD就相当于抽象的数据表示,而operation就相当于一套DSL用于对RDD进行变换或者求值。

RDD的表示

Spark中的RDD主要包含五部分信息:

  • partitions(): partition集合
  • dependencies(): 当前RDD的dependency集合
  • iterator(split, context): 对每个partition进行计算或读取操作的函数
  • partitioner(): 分区方式,如HashPartitionerRangePartitioner
  • preferredLocations(split): 访问某个partition最快的节点

所有的RDD都继承抽象类RDD。几种常见的操作:

  • sc#textFile: 生成HadoopRDD,代表可以从HDFS中读取数据的RDD
  • sc#parallelize: 生成ParallelCollectionRDD,代表从Scala集合中生成的RDD
  • map, flatMap, filter: 生成MapPartitionsRDD,其partition与parent RDD一致,同时会对parent RDD中iterator函数返回的数据进行对应的操作(lazy)
  • union: 生成UnionRDDPartitionerAwareUnionRDD
  • reduceByKey, groupByKey: 生成ShuffledRDD,需要进行shuffle操作
  • cogroup, join: 生成CoGroupedRDD

Operations

Spark里面对RDD的操作分为两种:transformationaction

  • transformation是lazy的,仅仅会保存计算步骤并返回一个新的RDD,而不会立刻执行计算操作
  • action会依次执行计算操作并且得到结果

这些transformation和action在FP中应该是很常见的,如map, flatMap, filter, reduce, count, sum

对单个数据操作的transformation函数都在RDD抽象类内,而对tuple操作的transformation都在PairRDDFunctions包装类中。RDD可以通过implicit函数在符合类型要求的时候自动转换为PairRDDFunctions类,从而可以进行reduceByKey之类的操作。对应的implicit函数:

1
2
3
4
implicit def rddToPairRDDFunctions[K, V](rdd: RDD[(K, V)])
(implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null): PairRDDFunctions[K, V] = {
new PairRDDFunctions(rdd)
}

Dependency

上面我们提到,RDD只会在需要的时候计算结果,调用那些transformation方法以后,对应的transformation信息只是被简单地存储起来,直到调用某个action才会真正地去执行计算。Spark中RDD之间是有联系的,RDD之间会形成依赖关系,也就是形成lineage graph(依赖图)。Dependency大致分两种:narrow dependency和wide dependency。

  • Narrow dependency(NarrowDependency): Parent RDD中的每个partition最多被child RDD中的一个partition使用,即一对一的关系。比如map, flatMap, filter等transformation都是narrow dependency
  • Wide dependency(ShuffleDependency):Parent RDD中的每个partition会被child RDD中的多个partition使用,即一对多的关系。比如join生成的RDD一般是wide dependency(不同的partitioner)

论文中的图例很直观地表示了RDD间的依赖关系:

Spark RDD Dependency

这样划分dependency的原因:

  1. Narrow dependency可以方便地以流水线的形式执行计算,即从头到尾一串chain下来。而wide dependency必须要等所有的parent RDD的结果都准备好以后再执行计算
  2. Narrow dependency失败以后,Spark只需要重新计算失败的parent RDD即可;而对于wide dependency来说,一失败可能导致某些分区丢失,必须整体重新进行计算

Shuffle

Spark中的shuffle操作与MapReduce中类似,在计算wide dependency对应的RDD的时候(即ShuffleMapStage)会触发。

首先来回顾一下为什么要进行shuffle操作。以reduceByKey操作为例,Spark要按照key把这些具有相同key的tuple聚集到一块然后进行计算操作。然而这些tuple可能在不同的partition中,甚至在不同的集群节点中,要想计算必须先把它们聚集起来。因此,Spark用一组map task来将每个分区写入到临时文件中,然后下一个stage端(reduce task)会根据编号获取临时文件,然后将partition中的tuple按照key聚集起来并且进行相应的操作。这里面还包括着排序操作(可能在map side也可能在reduce side进行)。

Shuffle是Spark的主要性能瓶颈之一(涉及磁盘IO,数据序列化和网络IO),其优化一直是个难题。

  • Shuffle write(map task): SortShuffleWriter#write
  • Shuffle read(reduce task): ShuffleRDD#compute

Persistence

Checkpointing

Checkpoint的目的是保存那些计算耗时较长的RDD数据(long lineage chains),执行Checkpoint的时候会新提交一个Job,因此最好先persistcheckpoint

Cache/Persist

cachepersist用于缓存一些经常使用的RDD结果(但是不能太大)。

  • persist方法的主要作用是改变StorageLevel以在compute的时候通过BlockManager进行相应的持久化操作
  • cache方法相当于设置存储级别为MEMORY_ONLY

Job Scheduling

简单来说,Spark会将提交的计算划分为不同的stages,形成一个有向无环图(DAG)。Spark的调度器会按照DAG的次序依次进行计算每个stage,最终得到计算结果。执行计算的几个重要的类或接口如下:

  • DAGScheduler
  • ActiveJob
  • Stage
  • Task
  • TaskScheduler
  • SchedulerBackend

这里面最为重要的就是 DAGScheduler 了,它会将逻辑执行计划(即RDD lineage)转化为物理执行计划(stage/task)。之前我们提到过,当开发者对某个RDD执行action的时候,Spark才会执行真正的计算过程。当开发者执行action的时候,SparkContext会将当前的逻辑执行计划传给DAGSchedulerDAGScheduler会根据给定的逻辑执行计划生成一个Job(对应ActiveJob类)并提交。每执行一个acton都会生成一个ActiveJob

提交Job的过程中,DAGScheduler会进行stage的划分。Spark里是按照shuffle操作来划分stage的,也就是说stage之间都是wide dependency,每个stage之内的dependency都是narrow dependency。这样划分的好处是尽可能地把多个narrow dependency的RDD放到同一个stage之内以便于进行pipeline计算,而wide dependency中child RDD必须等待所有的parent RDD计算完成并且shuffle以后才能接着计算,因此这样划分stage是最合适的。

划分好的stages会形成一个DAG,DAGScheduler会根据DAG中的顺序先提交parent stages(如果存在的话),再提交当前stage,以此类推,最先提交的是没有parent stage的stage。从执行角度来讲,一个stage的parent stages执行完以后,该stage才可以被执行。最后一个stage是产生最终结果的stage,对应ResultStage,而其余的stage都是ShuffleMapStage。下面是论文中stage划分的一个图例,非常直观:

DAG of stages

提交stage的时候,Spark会根据stage的类型生成一组对应类型的Task(ResultTaskShuffleMapTask),然后将这些Task包装成TaskSet提交到TaskScheduler中。一个Task对应某个RDD中的某一个partition,即一个Task只负责某个partition的计算:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
val tasks: Seq[Task[_]] = try {
stage match {
case stage: ShuffleMapStage =>
partitionsToCompute.map { id =>
val locs = taskIdToLocations(id)
val part = stage.rdd.partitions(id)
new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,
taskBinary, part, locs, stage.latestInfo.taskMetrics, properties)
}
case stage: ResultStage =>
val job = stage.activeJob.get
partitionsToCompute.map { id =>
val p: Int = stage.partitions(id)
val part = stage.rdd.partitions(p)
val locs = taskIdToLocations(id)
new ResultTask(stage.id, stage.latestInfo.attemptId,
taskBinary, part, locs, id, properties, stage.latestInfo.taskMetrics)
}
}
} catch {
// 此处代码略...
}

TaskScheduler会向执行任务的后端(SchedulerBackend,可以是Local, Mesos, Hadoop YARN或者其它集群管理组件)发送ReviveOffers消息,对应的执行后端接收到消息以后会将Task封装成TaskRunner(Runnable接口的实例),然后提交到底层的Executor中,并行执行计算任务。

Executor中的线程池定义如下:

1
private val threadPool = ThreadUtils.newDaemonCachedThreadPool("Executor task launch worker")
1
2
3
4
def newDaemonCachedThreadPool(prefix: String): ThreadPoolExecutor = {
val threadFactory = namedThreadFactory(prefix)
Executors.newCachedThreadPool(threadFactory).asInstanceOf[ThreadPoolExecutor]
}

可以看到底层执行task的线程池实际上是JUC中的CachedThreadPool,按需创建新线程,同时会复用线程池中已经建好的线程。

最后用一幅图总结一下Job, Stage和Task的关系(图来自 Mastering Apache Spark 2.0):

Stage, Job and Task in Spark

整个Spark Context执行task的步骤图:

Memory Management

Spark中RDD的存储方式有两种:in memory和on disk,默认是in memory的。进行分布式计算的时候通常会读入大量的数据,并且通常还需要重用这些数据,如果简单地把内存管理交给GC的话,很容易导致回收失败从而cause full GC,影响性能。

Spark 1.5开始不再通过GC管理内存。Spark 1.5实现了一个内存管理器用于手动管理内存(Project Tungsten),底层通过Unsafe类来直接分配和回收内存。

另外,分布式计算系统的GC方面还可以参考OSDI 2016的一篇论文: Yak: A High-Performance Big-Data-Friendly Garbage Collector

PageRank实例

下面在Spark中跑一个PageRank来观察一下生成的Stage DAG。PageRank的公式比较简单:

$$PageRank (p_i) = \frac{1-d}{N} + d \sum_{p_j \in M(p_i)} \frac{PageRank (p_j)}{L(p_j)} $$

这里我们选择damping factor=0.85,初始的rank值为1.0;PageRank算法可以用马尔科夫矩阵进行优化,但是这里迭代次数较小,可以直接进行迭代计算。对应代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
val iters = 10
val data = sc.textFile("data.txt")
val links = data.map { s =>
val parts = s.split("\\s+")
(parts(0), parts(1))
}.distinct().groupByKey().cache()
var ranks = links.mapValues(v => 1.0)
for (i <- 1 to iters) {
val contribs = links.join(ranks).values.flatMap { case (urls, rank) =>
val size = urls.size
urls.map(url => (url, rank / size))
}
ranks = contribs.reduceByKey(_ + _).mapValues(0.15 + 0.85 * _)
}
val output = ranks.collect()

对应的Stage DAG:

DAG of stages in PageRank Algorithm

其中Stage 3中的RDD dependencies如下:

One stage in PageRank Algorithm


References

Vert.x 技术内幕 | 解道 Vert.x 线程模型

线程模型概述

Vert.x 的线程模型设计的非常巧妙。总的来说,Vert.x 中主要有两种线程:Event Loop 线程Worker 线程。其中,Event Loop 线程结合了 Netty 的 EventLoop,用于处理事件。每一个 EventLoop 都与唯一的线程相绑定,这个线程就叫 Event Loop 线程。Event Loop 线程不能被阻塞,否则事件将无法被处理。

Worker 线程用于执行阻塞任务,这样既可以执行阻塞任务而又不阻塞 Event Loop 线程。

如果像 Node.js 一样只有单个 Event Loop 的话就不能充分利用多核 CPU 的性能了。为了充分利用多核 CPU 的性能,Vert.x 中提供了一组 Event Loop 线程。每个 Event Loop 线程都可以处理事件。为了保证线程安全,防止资源争用,Vert.x 保证了某一个 Handler 总是被同一个 Event Loop 线程执行,这样不仅可以保证线程安全,而且还可以在底层对锁进行优化提升性能。所以,只要开发者遵循 Vert.x 的线程模型,开发者就不需要再担心线程安全的问题,这是非常方便的。

本篇文章将底层的角度来解析 Vert.x 的线程模型。对应的 Vert.x 版本为 3.3.3

Event Loop 线程

首先回顾一下 Event Loop 线程,它会不断地轮询获取事件,并将获取到的事件分发到对应的事件处理器中进行处理:

Vert.x Event Loop

Vert.x 线程模型中最重要的一点就是:永远不要阻塞 Event Loop 线程。因为一旦处理事件的线程被阻塞了,事件就会一直积压着不能被处理,整个应用也就不能正常工作了。

Vert.x 中内置一种用于检测 Event Loop 是否阻塞的线程:vertx-blocked-thread-checker。一旦 Event Loop 处理某个事件的时间超过一定阈值(默认为 2000 ms)就会警告,如果阻塞的时间过长就会抛出异常。Block Checker 的实现原理比较简单,底层借助了 JUC 的 TimerTask,定时计算每个 Event Loop 线程的处理事件消耗的时间,如果超时就进行相应的警告。

Vert.x Thread

Vert.x 中的 Event Loop 线程及 Worker 线程都用 VertxThread 类表示,并通过 VertxThreadFactory 线程工厂来创建。VertxThreadFactory 创建 Vert.x 线程的过程非常简单:

1
2
3
4
5
6
7
8
9
10
11
12
@Override
public Thread newThread(Runnable runnable) {
VertxThread t = new VertxThread(runnable, prefix + threadCount.getAndIncrement(), worker, maxExecTime);
if (checker != null) {
checker.registerThread(t);
}
addToMap(t);
t.setDaemon(false);
return t;
}

除了创建 VertxThread 线程之外,VertxThreadFactory 还会将此线程注册至 Block Checker 线程中以监视线程的阻塞情况,并且将此线程添加至内部的 weakMap 中。这个 weakMap 作用只有一个,就是在注销对应的 Verticle 的时候可以将每个 VertxThread 中的 Context 实例清除(unset)。为了保证资源不被一直占用,这里使用了 WeakHashMap 来存储每一个 VertxThread。当里面的 VertxThread 的引用不被其他实例持有的时候,它就会被标记为可清除的对象,等待 GC。

至于 VertxThread,它其实就是在普通线程的基础上存储了额外的数据(如对应的 Vert.x Context,最大执行时长,当前执行时间,是否为 Worker 线程等),这里就不多讲了。

Vert.x Context

Vert.x 底层中一个重要的概念就是 Context,每个 Context 都会绑定着一个 Event Loop 线程(而一个 Event Loop 线程可以对应多个 Context)。我们可以把 Context 看作是控制一系列的 Handler 的执行作用域及顺序的上下文对象。

每当 Vert.x 底层将事件分发至 Handler 的时候,Vert.x 都会给此 Handler 绑定一个 Context 用于处理任务:

  • 如果当前线程是 Vert.x 线程(VertxThread),那么 Vert.x 就会复用此线程上绑定的 Context;如果没有对应的 Context 就创建新的
  • 如果当前线程是普通线程,就创建新的 Context

Vert.x 中存在三种 Context,与之前的线程种类相对应:

  • EventLoopContext
  • WorkerContext
  • MultiThreadedWorkerContext

Event Loop Context

每个 Event Loop Context 都会对应着唯一的一个 EventLoop,即一个 Event Loop Context 只会在同一个 Event Loop 线程上执行任务。在创建 Context 的时候,Vert.x 会自动根据轮询策略选择对应的 EventLoop:

1
2
3
4
5
6
7
8
9
10
11
protected ContextImpl(VertxInternal vertx, WorkerPool internalBlockingPool, WorkerPool workerPool, String deploymentID, JsonObject config,
ClassLoader tccl) {
// ...
EventLoopGroup group = vertx.getEventLoopGroup();
if (group != null) {
this.eventLoop = group.next();
} else {
this.eventLoop = null;
}
// ...
}

在 Netty 中,EventLoopGroup 代表一组 EventLoop,而从中获取 EventLoop 的方法则是 next 方法。EventLoopGroupEventLoop 的数量由 CPU 核数所确定。Vert.x 这里使用了 Netty NIO 对应的 NioEventLoop

1
2
eventLoopGroup = new NioEventLoopGroup(options.getEventLoopPoolSize(), eventLoopThreadFactory);
eventLoopGroup.setIoRatio(NETTY_IO_RATIO);

对应的轮询算法:

1
2
3
4
5
6
7
8
@Override
public EventExecutorChooser newChooser(EventExecutor[] executors) {
if (isPowerOfTwo(executors.length)) {
return new PowerOfTowEventExecutorChooser(executors);
} else {
return new GenericEventExecutorChooser(executors);
}
}

可以看到,正常情况下 Netty 会用轮询策略选择 EventLoop。特别地,如果 EventLoop 的个数是 2 的倍数的话,选择的会快一些:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private static final class GenericEventExecutorChooser implements EventExecutorChooser {
// ...
@Override
public EventExecutor next() {
return executors[Math.abs(idx.getAndIncrement() % executors.length)];
}
}
private static final class PowerOfTowEventExecutorChooser implements EventExecutorChooser {
// ...
@Override
public EventExecutor next() {
return executors[idx.getAndIncrement() & executors.length - 1];
}
}

我们可以在 Embedded 模式下测试一下 Event Loop 线程的分配:

1
2
3
4
5
6
7
System.out.println(Thread.currentThread());
Vertx vertx = Vertx.vertx();
for (int i = 0; i < 20; i++) {
int index = i;
vertx.setTimer(1, t -> {
System.out.println(index + ":" + Thread.currentThread());
});

运行结果(不同机器运行顺序、Event Loop线程数可能不同):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
Thread[main,5,main]
0:Thread[vert.x-eventloop-thread-0,5,main]
1:Thread[vert.x-eventloop-thread-1,5,main]
2:Thread[vert.x-eventloop-thread-2,5,main]
3:Thread[vert.x-eventloop-thread-3,5,main]
5:Thread[vert.x-eventloop-thread-5,5,main]
6:Thread[vert.x-eventloop-thread-6,5,main]
8:Thread[vert.x-eventloop-thread-8,5,main]
7:Thread[vert.x-eventloop-thread-7,5,main]
10:Thread[vert.x-eventloop-thread-10,5,main]
9:Thread[vert.x-eventloop-thread-9,5,main]
4:Thread[vert.x-eventloop-thread-4,5,main]
11:Thread[vert.x-eventloop-thread-11,5,main]
12:Thread[vert.x-eventloop-thread-12,5,main]
13:Thread[vert.x-eventloop-thread-13,5,main]
14:Thread[vert.x-eventloop-thread-14,5,main]
16:Thread[vert.x-eventloop-thread-0,5,main]
17:Thread[vert.x-eventloop-thread-1,5,main]
15:Thread[vert.x-eventloop-thread-15,5,main]
18:Thread[vert.x-eventloop-thread-2,5,main]
19:Thread[vert.x-eventloop-thread-3,5,main]

可以看到尽管每个 Context 对应唯一的 Event Loop 线程,而每个 Event Loop 线程却可能对应多个 Context

Event Loop Context 会在对应的 EventLoop 中执行 Handler 进行事件的处理(IO 事件,非阻塞)。Vert.x 会保证同一个 Handler 会一直在同一个 Event Loop 线程中执行,这样可以简化线程模型,让开发者在写 Handler 的时候不需要考虑并发的问题,非常方便。

我们来看一下 Handler 是如何在 EventLoop 上执行的。EventLoopContext 中实现了 executeAsync 方法用于包装 Handler 中事件处理的逻辑并将其提交至对应的 EventLoop 中进行执行:

1
2
3
4
public void executeAsync(Handler<Void> task) {
// No metrics, we are on the event loop.
nettyEventLoop().execute(wrapTask(null, task, true, null));
}

这里 Vert.x 使用了 wrapTask 方法将 Handler 封装成了一个 Runnable 用于向 EventLoop 中提交。代码比较直观,大致就是检查当前线程是否为 Vert.x 线程,然后记录事件处理开始的时间,给当前的 Vert.x 线程设置 Context,并且调用 Handler 里面的事件处理方法。具体请参考源码,这里就不贴出来了。

那么把封装好的 task 提交到 EventLoop 以后,EventLoop 是怎么处理的呢?这就需要更多的 Netty 相关的知识了。根据Netty 的模型,Event Loop 线程需要处理 IO 事件,普通事件(即我们的 Handler)以及定时事件(比如 Vert.x 的 setTimer)。Vert.x 会提供一个 NETTY_IO_RATIO 给Netty代表 EventLoop 处理 IO 事件时间占用的百分比(默认为 50,即 IO事件时间占用:非IO事件时间占用 = 1:1)。当 EventLoop 启动的时候,它会不断轮询 IO 事件及其它事件并进行处理:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
@Override
protected void run() {
for (;;) {
try {
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.SELECT:
select(wakenUp.getAndSet(false));
if (wakenUp.get()) {
selector.wakeup();
}
default:
// fallthrough
}
cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
processSelectedKeys();
runAllTasks();
} else {
final long ioStartTime = System.nanoTime();
processSelectedKeys();
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
break;
}
}
} catch (Throwable t) {
// process the error
// ...
}
}
}

这里面 Netty 会调用 processSelectedKeys 方法进行 IO 事件的处理,并且会计算出处理 IO 时间所用的事件然后计算出给非 IO 事件处理分配的时间,然后调用 runAllTasks 方法执行所有的非 IO 任务(这里面就有我们的各个 Handler)。

runAllTasks 会按顺序从内部的任务队列中取出任务(Runnable)然后进行安全执行。而我们刚才调用的 NioEventLoopexecute 方法其实就是将包装好的 Handler 置入 NioEventLoop 内部的任务队列中等待执行。

Worker Context

顾名思义,Worker Context 用于跑阻塞任务。与 Event Loop Context 相似,每一个 Handler 都只会跑在固定的 Worker 线程下。

Vert.x 还提供一种 Multi-threaded worker context 可以在多个 Worker 线程下并发执行任务,这样就会出现并发问题,需要开发者自行解决并发问题。因此一般情况下我们用不到 Multi-threaded worker context。

Verticle

我们再来讨论一下 Verticle 中的 Context。在部署 Verticle 的时候,Vert.x 会根据配置来创建 Context 并绑定到 Verticle 上,此后此 Verticle 上所有绑定的 Handler 都会在此 Context 上执行。相关实现位于 doDeploy 方法,这里摘取核心部分:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
for (Verticle verticle: verticles) {
WorkerExecutorImpl workerExec = poolName != null ? vertx.createSharedWorkerExecutor(poolName, options.getWorkerPoolSize()) : null;
WorkerPool pool = workerExec != null ? workerExec.getPool() : null;
// 根据配置创建Context
ContextImpl context = options.isWorker() ? vertx.createWorkerContext(options.isMultiThreaded(), deploymentID, pool, conf, tccl) :
vertx.createEventLoopContext(deploymentID, pool, conf, tccl);
if (workerExec != null) {
context.addCloseHook(workerExec);
}
context.setDeployment(deployment);
deployment.addVerticle(new VerticleHolder(verticle, context));
// 此Verticle上的Handler都会在创建的context作用域内执行
context.runOnContext(v -> {
try {
verticle.init(vertx, context);
Future<Void> startFuture = Future.future();
// 大家熟悉的start方法的执行点
verticle.start(startFuture);
startFuture.setHandler(ar -> {
if (ar.succeeded()) {
if (parent != null) {
parent.addChild(deployment);
deployment.child = true;
}
vertx.metricsSPI().verticleDeployed(verticle);
deployments.put(deploymentID, deployment);
if (deployCount.incrementAndGet() == verticles.length) {
reportSuccess(deploymentID, callingContext, completionHandler);
}
} else if (!failureReported.get()) {
reportFailure(ar.cause(), callingContext, completionHandler);
}
});
} catch (Throwable t) {
reportFailure(t, callingContext, completionHandler);
}
});
}

通过这样一种方式,Vert.x保证了Verticle的线程安全 —— 即某个Verticle上的所有Handler都会在同一个Vert.x线程上执行,这样也保证了Verticle内部成员的安全(没有race condition问题)。比如下面Verticle中处理IO及事件的处理都一直是在同一个Vert.x线程下执行的,每次打印出的线程名称应该是一样的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class TcpClientVerticle extends AbstractVerticle {
int i = 0;
@Override
public void start() throws Exception {
vertx.createNetClient().connect(6666, "localhost", ar -> {
if (ar.succeeded()) {
NetSocket socket = ar.result();
System.out.println(Thread.currentThread().getName());
socket.handler(buffer -> {
i++;
System.out.println(Thread.currentThread().getName());
System.out.println("Net client receiving: " + buffer.toString("UTF-8"));
});
socket.write("+1s\n");
} else {
ar.cause().printStackTrace();
}
});
}
}

线程池

Event Loop 线程池

之前我们已经提到过,Event Loop 线程池的类型为 Netty 中的NioEventLoopGroup,里面的线程通过 Vert.x 自己的线程工厂VertxThreadFactory进行创建:

1
2
3
eventLoopThreadFactory = new VertxThreadFactory("vert.x-eventloop-thread-", checker, false, options.getMaxEventLoopExecuteTime());
eventLoopGroup = new NioEventLoopGroup(options.getEventLoopPoolSize(), eventLoopThreadFactory);
eventLoopGroup.setIoRatio(NETTY_IO_RATIO);

其中 Event Loop 线程的数目可以在配置中指定。

Worker 线程池

在之前讲 executeBlocking 底层实现的文章中我们已经提到过 Worker 线程池,它其实就是一种 Fixed Thread Pool:

1
2
3
4
5
ExecutorService workerExec = Executors.newFixedThreadPool(options.getWorkerPoolSize(),
new VertxThreadFactory("vert.x-worker-thread-", checker, true, options.getMaxWorkerExecuteTime()));
PoolMetrics workerPoolMetrics = isMetricsEnabled() ? metrics.createMetrics(workerExec, "worker", "vert.x-worker-thread", options.getWorkerPoolSize()) : null;
workerPool = new WorkerPool(workerExec, workerPoolMetrics);

Worker线程同样由VertxThreadFactory构造,类型为VertxThread,用于执行阻塞任务。我们同样可以在配置中指定其数目。

内部阻塞线程池

1
2
3
4
ExecutorService internalBlockingExec = Executors.newFixedThreadPool(options.getInternalBlockingPoolSize(),
new VertxThreadFactory("vert.x-internal-blocking-", checker, true, options.getMaxWorkerExecuteTime()));
PoolMetrics internalBlockingPoolMetrics = isMetricsEnabled() ? metrics.createMetrics(internalBlockingExec, "worker", "vert.x-internal-blocking", options.getInternalBlockingPoolSize()) : null;
internalBlockingPool = new WorkerPool(internalBlockingExec, internalBlockingPoolMetrics);

Internal Blocking Pool可能设计用于内部使用,在executeBlocking(Action<T> action, Handler<AsyncResult<T>> resultHandler)这个版本的方法中就使用了它。

Acceptor Event Loop 线程池

大家可能会发现VertxImpl类中还有一个acceptorEventLoopGroup。顾名思义,它是Netty中的Acceptor线程池,负责处理客户端的连接请求:

1
2
acceptorEventLoopGroup = new NioEventLoopGroup(1, acceptorEventLoopThreadFactory);
acceptorEventLoopGroup.setIoRatio(100);

由于系统只有一个服务端端口需要监听,因此这里只需要一个线程。

Vert.x中的HttpServer就利用了acceptorEventLoopGroup处理客户端的连接请求,具体的实现后边会另起一篇介绍。

Vert.x 技术内幕 | executeBlocking 实现原理

引入

大家都知道,Vert.x中的executeBlocking方法用于执行阻塞任务,并且有两种模式:有序执行和无序执行。下面我们来看两段代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
vertx.setPeriodic(1000, t -> {
vertx.executeBlocking(future -> {
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName());
future.complete();
}, r -> {});
});
vertx.setPeriodic(1000, t -> {
vertx.executeBlocking(future -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName());
future.complete();
}, r -> {});
});

我们思考一下,每段代码每次执行的时候使用的线程相同么?正常情况下大家都知道executeBlocking底层使用了Worker线程池,因此貌似两种情况没什么区别,都是轮询Worker线程池,每次可能用不同的Worker线程。但是我们测一下:

第一段代码:

1
2
3
4
5
vert.x-worker-thread-0
vert.x-worker-thread-1
vert.x-worker-thread-2
vert.x-worker-thread-3
vert.x-worker-thread-4

第二段代码:

1
2
3
4
5
vert.x-worker-thread-0
vert.x-worker-thread-0
vert.x-worker-thread-0
vert.x-worker-thread-0
vert.x-worker-thread-0

额。。。两段代码每次执行的线程居然有差异?第二次为什么每次都用相同的Worker线程?其实,大家可能忽略了一点:executeBlocking方法默认顺序执行提交的阻塞任务。今天我们就来探究一下executeBlocking内部的实现。

Worker线程池

我们来回顾一下Vert.x底层的Worker线程池,它在创建VertxImpl实例的时候进行初始化:

1
2
3
4
5
ExecutorService workerExec = Executors.newFixedThreadPool(options.getWorkerPoolSize(),
new VertxThreadFactory("vert.x-worker-thread-", checker, true, options.getMaxWorkerExecuteTime()));
PoolMetrics workerPoolMetrics = isMetricsEnabled() ? metrics.createMetrics(workerExec, "worker", "vert.x-worker-thread", options.getWorkerPoolSize()) : null;
workerPool = new WorkerPool(workerExec, workerPoolMetrics);

可以看到底层的Worker线程池本质上是一种FixedThreadPool,里面的线程由VertxThreadFactory控制生成,对应的线程类型为VertxThread。Vert.x内部用WorkerPool类对线程池以及线程池相关的Metrics类进行了封装。

阻塞任务在哪里执行?

有了Worker线程池的基础,我们来看一下Vertx实例中的executeBlocking方法,它的过程很简单:获取当前的Vert.x Context(没有就创建),然后委托调用Context里的executeBlocking方法:

1
2
3
4
5
6
7
8
9
10
11
12
@Override
public <T> void executeBlocking(Handler<Future<T>> blockingCodeHandler, boolean ordered,
Handler<AsyncResult<T>> asyncResultHandler) {
ContextImpl context = getOrCreateContext();
context.executeBlocking(blockingCodeHandler, ordered, asyncResultHandler);
}
@Override
public <T> void executeBlocking(Handler<Future<T>> blockingCodeHandler,
Handler<AsyncResult<T>> asyncResultHandler) {
executeBlocking(blockingCodeHandler, true, asyncResultHandler);
}

在此方法中可以看到,ordered标志位默认为true,即默认按提交的次序执行阻塞任务。

我们再来看一下ContextImpl类中的executeBlocking方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
@Override
public <T> void executeBlocking(Handler<Future<T>> blockingCodeHandler, boolean ordered, Handler<AsyncResult<T>> resultHandler) {
executeBlocking(null, blockingCodeHandler, resultHandler, ordered ? workerExec : workerPool.executor(), workerPool.metrics());
}
<T> void executeBlocking(Action<T> action, Handler<Future<T>> blockingCodeHandler,
Handler<AsyncResult<T>> resultHandler,
Executor exec, PoolMetrics metrics) {
Object queueMetric = metrics != null ? metrics.submitted() : null;
try {
exec.execute(() -> {
Object execMetric = null;
if (metrics != null) {
execMetric = metrics.begin(queueMetric);
}
Future<T> res = Future.future();
try {
if (blockingCodeHandler != null) {
ContextImpl.setContext(this);
blockingCodeHandler.handle(res);
} else {
T result = action.perform();
res.complete(result);
}
} catch (Throwable e) {
res.fail(e);
}
if (metrics != null) {
metrics.end(execMetric, res.succeeded());
}
if (resultHandler != null) {
runOnContext(v -> res.setHandler(resultHandler));
}
});
} catch (RejectedExecutionException ignore) {
// Pool is already shut down
if (metrics != null) {
metrics.rejected(queueMetric);
}
}
}

它调用了另一个具体版本的executeBlocking方法,其中第四个参数即为要执行阻塞任务的线程池。如果要有序执行(ordered为true),底层就使用context实例里的workerExec线程池;如果无序执行,就调用workerPoolexecutor方法获取另一种线程池。

看到这里,我们大致已经想到了,有序执行和无序执行两种模式使用不同的线程池,因此底层实现肯定有差异。我们来看一下前面提到的两个线程池,它们都是ContextImpl类的成员变量:

1
2
protected final WorkerPool workerPool;
protected final Executor workerExec;

在通过Vertx实例创建Context的时候,这几个变量会被初始化,其来源就是之前我们看过的VertxImpl实例中的Worker线程池。看一下ContextImpl类的构造函数就一目了然了:

1
2
this.workerPool = workerPool;
this.workerExec = workerPool.createOrderedExecutor();

嗯。。。有序执行对应的线程池通过workerPoolcreateOrderedExecutor方法获得,而无序执行对应的线程池通过workerPoolexecutor方法获得。因此,WorkerPool类是一个关键点,我们稍后就看一下其实现。

注意Vert.x规定,blockingCodeHandler中的逻辑(即阻塞任务)在Worker线程内执行,而resultHandler内的逻辑(结果处理)需要在Vert.x Conext中执行,因此前面需要预先设置当前使用的Worker线程的Contextthis以便后面调用runOnContext方法执行结果处理逻辑。

下面就来看一下有序执行和无序执行这两种线程池的具体区别。

无序执行

我们看一下WorkerPool类的源码中获取无序执行线程池的逻辑:

1
2
3
ExecutorService executor() {
return pool;
}

可以看到executor方法直接返回了内部的pool线程池,而pool线程池其实就是VertxImpl中的workerExec线程池:

1
workerPool = new WorkerPool(workerExec, workerPoolMetrics);

OK!如果大家熟悉并发的话,大家应该对无序执行对应的线程池 —— Worker线程池的行为非常清楚了。它属于一种FixedThreadPool,底层通过阻塞队列LinkedBlockingQueue实现。底层通过轮询算法获取Worker线程执行任务。

有序执行

下面是时候看有序执行对应的逻辑了:

1
2
3
4
5
6
7
8
9
10
11
12
13
private final OrderedExecutorFactory orderedFact;
private final ExecutorService pool;
private final PoolMetrics metrics;
public WorkerPool(ExecutorService pool, PoolMetrics metrics) {
this.orderedFact = new OrderedExecutorFactory(pool);
this.pool = pool;
this.metrics = metrics;
}
Executor createOrderedExecutor() {
return orderedFact.getExecutor();
}

可以看到有序执行对应的线程池是通过OrderedExecutorFactory创建的。其实,OrderedExecutorFactory类会生成真正的有序执行线程池OrderedExecutor,它其实是对Worker线程池pool的一个简单包装,仅仅添加了有序执行相关的逻辑,最后还是委托Worker线程池进行任务处理。

那么OrderedExecutor是如何实现顺序执行的呢?OrderedExecutor内部维护着一个任务队列。每当调用executeBlocking方法执行阻塞过程的时候,Vert.x会将阻塞过程包装成Runnable然后置入OrderedExecutor中的任务队列中;同时如果OrderedExecutor没有开始执行任务,就委托内部的Worker线程池执行任务:

1
2
3
4
5
6
7
8
9
public void execute(Runnable command) {
synchronized (tasks) {
tasks.add(command);
if (!running) {
running = true;
parent.execute(runner);
}
}
}

从代码中可以看出,最后委托Worker线程池执行的线程其实是又包装了一层的runner线程。runner的逻辑不难想:不断地从任务队列中取出队首的Runnable然后调用其run方法执行(相当于执行了此任务,只不过在runner对应的线程中);如果没有任务了就结束本线程。

这里就出现了一种情况:大批量提交阻塞任务的时候,线程池的状态running一直为true,此时所有的任务都积压到任务队列中,而执行所有任务的线程只有一个 —— runner对应的线程。这种情况其实很好想,因为要保证有序执行,就只能让它一个接一个地在同个线程中执行。如果在不同线程中依次执行则不好调度,如果直接并行执行则不能保证有序性。

所以,根据OrderedExecutor线程池的内部实现,只要提交任务的间隔时间小于任务执行的时间,底层其实就仅执行了一次runner,也就是说所有提交的阻塞任务都只在一个线程下跑(running标志位控制)。

这样就可以很好地解释我们一开始提出的问题了。当sleep(200), setPeriodic(1000)的时候,提交任务的间隔时间大于任务执行的时间,这样每次的runner就可以在下一个任务提交之前执行完,因此每次所用的线程会不同(轮询策略);而sleep(2000), setPeriodic(1000)的时候,提交任务的间隔时间小于任务执行的时间,底层最后都归结到一个runner中执行了,因此所有过程都是在同一个Worker线程执行的(很好想,保证有序就要串行执行)。

当然,如果不想有序执行,可以用void executeBlocking(Handler<Future<T>> blockingCodeHandler, boolean ordered, Handler<AsyncResult<T>> asyncResultHandler)这个版本的executeBlocking方法,并将ordered标志位设为false。根据上面的源码,底层会直接使用Worker线程池而不是OrderedExecutor线程池,这样就不会有上面OrderedExecutor的情况了。

Vert.x 技术内幕 | 异步 RPC 实现原理

经常有一些开发者在 group 中问到,如何利用 Vert.x 进行 RPC 通信。其实,Vert.x 提供了一个组件 —— Vert.x Service Proxy,专门用于进行异步 RPC 通信(通过 Event Bus)。Vert.x Service Proxy 会自动生成代理类进行消息的包装与解码、发送与接收以及超时处理,可以为我们省掉不少代码。之前我在 Vert.x Blueprint 中已经详细讲解了 Vert.x Service Proxy 的使用,大家可以参考 Vert.x Kue 文档 中的相关部分。本篇文章中我们将探索一下通过 Vert.x Service Proxy 生成的代理类进行异步RPC的原理,对应的 Vert.x 版本为 3.3.2

传统的 RPC 想必大家都不陌生,但是传统的 RPC 有个缺陷:传统的 RPC 都是阻塞型的,当调用者远程调用服务时需要阻塞着等待调用结果,这与 Vert.x 的异步开发模式相违背;而且,传统的 RPC 未对容错而设计。

因此,Vert.x 提供了 Service Proxy 用于进行异步 RPC,其底层依托 Clustered Event Bus 进行通信。我们只需要按照规范编写我们的服务接口(一般称为 Event Bus 服务),并加上 @ProxyGen 注解,Vert.x 就会自动为我们生成相应的代理类在底层处理 RPC。有了 Service Proxy,我们只需给异步方法提供一个回调函数 Handler<AsyncResult<T>>,在调用结果发送过来的时候会自动调用绑定的回调函数进行相关的处理,这样就与 Vert.x 的异步开发模式相符了。由于 AsyncResult 本身就是为容错而设计的(两个状态),因此这里的 RPC 也具有了容错性。

原理简介

假设有一个 Event Bus 服务接口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@ProxyGen
@VertxGen
public interface SomeService {
String SERVICE_ADDRESS = "service.example";
static SomeService createService(Vertx vertx, JsonObject config) {
return new SomeServiceImpl(vertx, config);
}
static SomeService createProxy(Vertx vertx) {
return ProxyHelper.createProxy(SomeService.class, vertx, SERVICE_ADDRESS);
}
@Fluent
SomeService process(String id, Handler<AsyncResult<JsonObject>> resultHandler);
}

这里定义了一个异步方法 process,其异步调用返回的结果是 AsyncResult<JsonObject> 类型的。由于异步 RPC 底层通过 Clustered Event Bus 进行通信,我们需要给器指定一个通信地址 SERVICE_ADDRESS@Fluent 注解代表此方法返回自身,便于进行组合。我们同时还提供了两个辅助方法:createService 方法用于创建服务实例,而 createProxy 方法则通过 ProxyHelper 辅助类创建服务代理实例。

假设服务提供端A注册了一个 SomeService 类型的服务代理,服务调用端B需要通过异步RPC调用服务的 process 方法,此时调用端 B 可以利用 ProxyHelper 获取服务实例并进行服务调用。B 中获取的服务其实是一个 服务代理类,而真正的服务实例在 A 处。何为服务代理?服务代理可以帮助我们向服务提供端发送调用请求,并且响应调用结果。那么如何发送调用请求呢?相信大家能想到,是调用端B将调用参数和方法名称等必要信息包装成集群消息(ClusteredMessage),然后通过 send 方法将请求通过 Clustered Event Bus 发送至服务提供端A处(需要提供此服务的通信地址)。A 在注册服务的时候会创建一个 MessageConsumer 监听此服务的地址来响应调用请求。当接收到调用请求的时候,A 会在本地调用方法,并将结果回复至调用端。所以异步RPC本质上其实是一个基于 代理模式Request/Response 消息模式。

用时序图来描述一下上述过程:

Sequence Diagram of Async RPC

引入

以之前的 SomeService 接口为例,我们可以在集群中的一个节点上注册服务实例:

1
2
SomeService service = SomeService.createService(vertx, config);
ProxyHelper.registerService(SomeService.class, vertx, service, SomeService.SERVICE_ADDRESS);

然后在另一个节点上获取此服务实例的代理,并进行服务调用。调用的时候看起来就像在本地调用(LPC)一样,其实是进行了 RPC 通信:

1
2
3
4
5
6
SomeService proxyService = SomeService.createProxy(vertx);
// invoke the service
proxyService.process("fuck", ar -> {
// process the result...
});

其实,这里获取到的 proxyService 实例的真正类型是 Vert.x 自动生成的服务代理类 SomeServiceVertxEBProxy 类,里面封装了通过 Event Bus 进行通信的逻辑。我们首先来讲一下 Service Proxy 生成代理类的命名规范。

代理类命名规范

Vert.x Service Proxy 在生成代理类时遵循一定的规范。假设有一 Event Bus 服务接口SomeService,Vert.x 会自动为其生成代理类以及代理处理器:

  • 代理类的命名规范为 接口名 + VertxEBProxy。比如SomeService接口对应的代理类名称为SomeServiceVertxEBProxy
  • 代理类会继承原始的服务接口并实现所有方法的代理逻辑
  • 代理处理器的命名规范为 接口名 + VertxProxyHandler。比如SomeService接口对应的代理处理器名称为SomeServiceVertxProxyHandler
  • 代理处理器会继承ProxyHandler抽象类

ProxyHelper辅助类中注册服务以及创建代理都是遵循了这个规范。

在Event Bus上注册服务

我们通过ProxyHelper辅助类中的registerService方法来向Event Bus上注册Event Bus服务,来看其具体实现:

1
2
3
4
5
6
7
8
9
10
public static <T> MessageConsumer<JsonObject> registerService(Class<T> clazz, Vertx vertx, T service, String address,
boolean topLevel,
long timeoutSeconds) {
String handlerClassName = clazz.getName() + "VertxProxyHandler";
Class<?> handlerClass = loadClass(handlerClassName, clazz);
Constructor constructor = getConstructor(handlerClass, Vertx.class, clazz, boolean.class, long.class);
Object instance = createInstance(constructor, vertx, service, topLevel, timeoutSeconds);
ProxyHandler handler = (ProxyHandler) instance;
return handler.registerHandler(address);
}

首先根据约定生成对应的代理Handler的名称,然后通过类加载器加载对应的Handler类,再通过反射来创建代理Handler的实例,最后调用handlerregisterHandler方法注册服务地址。

registerHandler方法的实现在Vert.x生成的各个代理处理器中。以之前的SomeService为例,我们来看一下其对应的代理处理器SomeServiceVertxProxyHandler实现。首先是注册并订阅地址的registerHandler方法:

1
2
3
4
5
public MessageConsumer<JsonObject> registerHandler(String address) {
MessageConsumer<JsonObject> consumer = vertx.eventBus().<JsonObject>consumer(address).handler(this);
this.setConsumer(consumer);
return consumer;
}

registerHandler方法的实现非常简单,就是通过consumer方法在address地址上绑定了SomeServiceVertxProxyHandler自身。那么SomeServiceVertxProxyHandler是如何处理来自服务调用端的服务调用请求,并将调用结果返回到请求端呢?在回答这个问题之前,我们先来看看代理端(调用端)是如何发送服务调用请求的,这就要看对应的服务代理类的实现了。

服务调用

我们来看一下服务调用端是如何发出服务调用请求的消息的。之前已经介绍过,服务调用端是通过Event Bus的send方法发送调用请求的,并且会提供一个replyHandler来等待方法调用的结果。调用的方法名称会存放在消息中名为action的header中。以之前SomeService的代理类SomeServiceVertxEBProxyprocess方法的请求为例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public SomeService process(String id, Handler<AsyncResult<JsonObject>> resultHandler) {
if (closed) {
resultHandler.handle(Future.failedFuture(new IllegalStateException("Proxy is closed")));
return this;
}
JsonObject _json = new JsonObject();
_json.put("id", id);
DeliveryOptions _deliveryOptions = (_options != null) ? new DeliveryOptions(_options) : new DeliveryOptions();
_deliveryOptions.addHeader("action", "process");
_vertx.eventBus().<JsonObject>send(_address, _json, _deliveryOptions, res -> {
if (res.failed()) {
resultHandler.handle(Future.failedFuture(res.cause()));
} else {
resultHandler.handle(Future.succeededFuture(res.result().body()));
}
});
return this;
}

可以看到代理类把此方法传入的参数都放到一个JsonObject中了,并将要调用的方法名称存放在消息中名为action的header中。代理方法通过send方法将包装好的消息发送至之前注册的服务地址处,并且绑定replyHandler等待调用结果,然后使用我们传入到process方法中的resultHandler对结果进行处理。是不是很简单呢?

服务提供端的调用逻辑

调用请求发出之后,我们的服务提供端就会收到调用请求消息,然后执行SomeServiceVertxProxyHandler中的处理逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public void handle(Message<JsonObject> msg) {
try {
JsonObject json = msg.body();
String action = msg.headers().get("action");
if (action == null) {
throw new IllegalStateException("action not specified");
}
accessed();
switch (action) {
case "process": {
service.process((java.lang.String)json.getValue("id"), createHandler(msg));
break;
}
default: {
throw new IllegalStateException("Invalid action: " + action);
}
}
} catch (Throwable t) {
msg.reply(new ServiceException(500, t.getMessage()));
throw t;
}
}

handle方法首先从消息header中获取方法名称,如果获取不到则调用失败;接着handle方法会调用accessed方法记录最后调用服务的时间戳,这是为了实现超时的逻辑,后面我们会讲。接着handle方法会根据方法名称分派对应的逻辑,在“真正”的服务实例上调用方法。注意异步RPC的过程本质是 Request/Response 模式,因此这里的异步结果处理函数resultHandler应该将调用结果发送回调用端。此resultHandler是通过createHandler方法生成的,逻辑很清晰:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
private <T> Handler<AsyncResult<T>> createHandler(Message msg) {
return res -> {
if (res.failed()) {
if (res.cause() instanceof ServiceException) {
msg.reply(res.cause());
} else {
msg.reply(new ServiceException(-1, res.cause().getMessage()));
}
} else {
if (res.result() != null && res.result().getClass().isEnum()) {
msg.reply(((Enum) res.result()).name());
} else {
msg.reply(res.result());
}
}
};
}

这样,一旦在服务提供端的调用过程完成时,调用结果就会被发送回调用端。这样调用端就可以调用结果执行真正的处理逻辑了。

超时处理

Vert.x 自动生成的代理处理器内都封装了一个简单的超时处理逻辑,它是通过定时器定时检查最后的调用时间实现的。逻辑比较简单,直接放上相关逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public SomeServiceVertxProxyHandler(Vertx vertx, SomeService service, boolean topLevel, long timeoutSeconds) {
// 前面代码略。。。
if (timeoutSeconds != -1 && !topLevel) {
long period = timeoutSeconds * 1000 / 2;
if (period > 10000) {
period = 10000;
}
this.timerID = vertx.setPeriodic(period, this::checkTimedOut);
} else {
this.timerID = -1;
}
accessed();
}
private void checkTimedOut(long id) {
long now = System.nanoTime();
if (now - lastAccessed > timeoutSeconds * 1000000000) {
close();
}
}

一旦超时,就自动调用close方法终止定时器,注销响应服务调用请求的consumer并关闭代理。

代码是如何生成的?

大家可能会很好奇,这些服务代理类是怎么生成出来的?其实,这都是 Vert.x Codegen 的功劳。Vert.x Codegen 的本质是一个 注解处理器(APT),它可以扫描源码中是否包含要处理的注解,检查规范后根据响应的模板生成对应的代码,这就是注解处理器的作用(注解处理器于 JDK 1.6 引入)。为了让 Codegen 正确地生成代码,我们需要配置编译参数来确保注解处理器能够正常的工作,具体的可以参考 Vert.x Codegen 的文档 (之前里面缺了 Gradle 相关的实例,我给补上了)。

Vert.x Codegen 使用 MVEL2 作为生成代码的模板,扩展名为 *.templ,比如代理类和代理处理器的模板就位于 vert-x3/vertx-service-proxy 中,配置文件类似于这样:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
{
"name": "Proxy",
"generators": [
{
"kind": "proxy",
"fileName": "ifaceFQCN + 'VertxEBProxy.java'",
"templateFileName": "serviceproxy/template/proxygen.templ"
},{
"kind": "proxy",
"fileName": "ifaceFQCN + 'VertxProxyHandler.java'",
"templateFileName": "serviceproxy/template/handlergen.templ"
}
]
}

具体的代码生成逻辑还要涉及 APT 及 MVEL2 的知识,这里就不展开讲了,有兴趣的朋友可以研究研究 Vert.x Codegen 的源码。

优点与缺点

Vert.x 提供的这种 Async RPC 有着许多优点:

  • 通过 Clustered Event Bus 传输消息,不需引入其它额外的组件;
  • 自动生成代理类及代理处理器,可以帮助我们做消息封装、传输、编码解码以及超时处理等问题,省掉不少冗余代码,让我们可以以 LPC 的方式进行 RPC 通信;
  • 多语言支持(Polyglot support)。这是 Vert.x 的一大亮点。只要加上 @VertxGen 注解并在编译期依赖中加上对应语言的依赖(如vertx-lang-ruby),Vert.x Codegen 就会自动处理注解并生成对应语言的服务代理(通过调用 Java 版本的服务代理实现)。这样 Async RPC 可以真正地做到不限 language。

当然 Vert.x 要求我们的服务接口必须是 基于回调的,这样写起来可能会不优雅。还好 @VertxGen 注解支持生成Rx版本的服务类,因此只要加上 vertx-rx-java 依赖,Codegen 就能生成对应的Rx风格的服务类(异步方法返回 Observable),这样我们就能以更 reactive 的风格来构建应用了,岂不美哉?

当然,为了考虑多语言支持的兼容性,Vert.x 在传递消息的时候依然使用了传统的 JSON,这样传输效率可能不如 Protobuf 高,但是不一定成为瓶颈(看业务情况;真正的瓶颈一般还是在 DB 上)。

Distributed System | RPC 模块设计与实现

RPC 是分布式系统中不可缺少的一部分。之前接触过几种 RPC 组件,这里就总结一下常见 RPC 模块的设计思想和实现。最后我们来设计一个可以方便进行 RPC 调用的 RPC 模块。

RPC 模块设计需要考虑的问题

RPC 模块将网络通信的过程封装成了方法调用的过程。从使用者的角度来看,在调用端进行RPC调用,就像进行本地函数调用一样;而在背后,RPC 模块会将先调用端的函数名称、参数等调用信息序列化,其中序列化的方式有很多种,比如 Java 原生序列化、JSON、Protobuf 等。接着 RPC 模块会将序列化后的消息通过某种协议(如 TCP, AMQP 等)发送到被调用端,被调用端在收到消息以后会对其解码,还原成调用信息,然后在本地进行方法调用,然后把调用结果发送回调用端,这样一次 RPC 调用过程就完成了。在这个过程中,我们要考虑到一些问题:

  • 设计成什么样的调用模型?
  • 调用信息通过什么样的方式序列化?通过哪种协议传输?性能如何?可靠性如何?
  • 分布式系统中最关注的问题:出现 failure 如何应对?如何容错?
  • 更为全面的服务治理功能(如服务发现、负载均衡等)

我们一点一点来思考。第一点是设计成什么样的调用模型。常见的几种模型:

  • 服务代理。即实现一个服务接口,被调用端实现此服务接口,实现对应的方法逻辑,并写好 RPC 调用信息接收部分;调用端通过 RPC 模块获取一个服务代理实例,这个服务代理实例继承了服务接口并封装了相应的远程调用逻辑(包括消息的编码、解码、传输等)。调用端通过这个服务代理实例进行 RPC 调用。像 Vert.x Service Proxy 和 gRPC 都是这种模型。这样的 RPC 模块需要具备生成服务代理类的功能
  • 直接调用,即设计特定的 API 用于 RPC 调用。比如 Golang 的 rpc 包,里面的 Client 就提供了一个 Call 方法用于任意RPC调用,调用者需要传入方法名称、参数以及返回值指针(异步模式下传入 callback handler)

我更倾向于选择服务代理这种模型,因为服务代理这种模型在进行 RPC 调用的时候非常方便,但是需要 RPC 模块生成服务代理类(静态或动态生成),实现起来可能会麻烦些;当然 Go 的 rpc 包封装的也比较好,调用也比较方便,考虑到 Go 的类型系统,这已经不错了。。。

RPC 调用耗时会包含通信耗时和本地调用耗时。当网络状况不好的时候,RPC 调用可能会很长时间才能得到结果。对传统的同步 RPC 模式来说,这期间会阻塞调用者的调用线程。当需要进行大量 RPC 调用的时候,这种阻塞就伤不起了。这时候,异步 RPC 模式就派上用场了。我们可以对传统 RPC 模式稍加改造,把服务接口设计成异步模式的,即每个方法都要绑定一个回调函数,或利用 Future-Promise 模型返回一个 Future,或者直接上 reactive 模式(未来的趋势)。设计成异步、响应式模式以后,整个架构的灵活性就能得到很大的提升。

第二点是调用信息的序列化/反序列化以及传输。序列化主要分为文本(如 JSON, XML 等)和二进制(如 Thrift, Protocol 等)两种,不同的序列化策略性能不同,因此我们应该尽量选择性能高,同时便于开发的序列化策略。在大型项目中我们常用 Protobuf,性能比较好,支持多语言,但是需要单独定义 .proto 文件(可以利用 protostuff);有的时候我们会选择 JSON,尽管效率不是很高但是方便,比如 Vert.x Service Proxy 就选择了 JSON 格式(底层依赖 Event Bus)。另一点就是传输协议的选择。通常情况下我们会选择 TCP 协议(以及各种基于 TCP 的应用层协议,如 HTTP/2)进行通信,当然用基于 AMQP 协议的消息队列也可以,两者都比较可靠。

这里还需提一点:如何高效地并发处理 request / response,这依赖于通信模块的实现。拿 Java 来说,基于 Netty NIO 或者 Java NIO / AIO 的 I/O 多路复用都可以很好地并发处理请求;而像 Go RPC 则是来一个 request 就创建一个 Goroutine 并在其中处理请求(Goroutine 作为轻量级用户态线程,创建性能消耗小)。

最后一点也是最重要的一点:实现容错,这也是分布式系统设计要考虑的一个核心。想象一下一次 RPC 调用过程中可能产生的各种 failure:

  • 网络拥塞
  • 丢包,通信异常
  • 服务提供端挂了,调用端得不到 response

一种简单的应对方式是不断地超时重传,即 at least once 模式。调用端设置一个超时定时器,若一定时间内没有收到response就继续发送调用请求,直到收到response或请求次数达到阈值。这种模式会发送重复请求,因此只适用于幂等性的操作,即执行多次结果相同的操作,比如读取操作。当然服务提供端也可以实现对应的逻辑来检查重复的请求。

更符合我们期望的容错方案是 at most once 模式。at most once 模式要求服务提供端检查重复请求,如果检查到当前请求是重复请求则返回之前的调用结果。服务提供端需要缓存之前的调用结果。这里面有几点需要考虑:

  • 如何实现重传和重复请求检测?是依靠协议(如TCP的超时重传)还是自己实现?

如果自己实现的话:

  • 如何检查重复请求?我们可以给每个请求生成一个独一无二的标识符(xid),并且在重传请求的时候使用相同的xid进行重传。用伪代码可以表示为:
1
2
3
4
5
6
7
if (seen(xid)) {
result = oldResult;
} else {
result = call(...);
oldResult = result;
setCurrentId(xid);
}
  • 如何保证xid是独一无二的?可以考虑使用UUID或者不同seed下的随机数。
  • 服务请求端需要在一个合适的时间丢弃掉保存的之前缓存的调用结果。
  • 当某个RPC调用过程还正在执行时,如何应对另外的重复请求?这种情况可以设置一个flag用于标识是否正在执行。
  • 如果服务调用端挂了并且重启怎么办?如果服务调用端将xid和调用结果缓存在内存中,那么保存的信息就丢失了。因此我们可以考虑将缓存信息定时写入硬盘,或者写入 replication server 中,当然这些情况就比较复杂了,涉及到高可用和一致性的问题。

由此可见,虽然 RPC 模块看似比较简单,但是设计的时候要考虑的问题还是非常多的。尤其是在保证性能的基础上又要保证可靠性,还要保证开发者的易用性,这就需要细致地思考了。

常见RPC模块实现

这里我来简单总结一下用过的常见的几个 RPC 模块的使用及实现思路。

Go RPC

Golang 的 rpc 包使用了 Go 自己的 gob 协议作为序列化协议(通过encoding/gob模块内的Encoder/Decoder进行编码和解码),而传输协议可以直接使用TCP(Dial方法)或者使用HTTP(DialHTTP)方法。开发者需要在服务端定义struct并且实现各种方法,然后将struct注册到服务端。需要进行RPC调用的时候,我们就可以在调用端通过Call方法(同步)或者Go方法(异步)进行调用。同步模式下调用结果即为reply指针所指的对象,而异步模式则会在调用结果准备就绪后通知绑定的channel并执行处理。

在rpc包的实现中(net/rpc/server.go),每个注册的服务类都被封装成了一个service结构体,而其中的每个方法则被封装成了一个methodType结构体:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
type methodType struct {
sync.Mutex // protects counters
method reflect.Method
ArgType reflect.Type
ReplyType reflect.Type
numCalls uint
}
type service struct {
name string // name of service
rcvr reflect.Value // receiver of methods for the service
typ reflect.Type // type of the receiver
method map[string]*methodType // registered methods
}

每个服务端都被封装成了一个Server结构体,其中的serviceMap存储着各个服务类的元数据:

1
2
3
4
5
6
7
8
type Server struct {
mu sync.RWMutex // protects the serviceMap
serviceMap map[string]*service
reqLock sync.Mutex // protects freeReq
freeReq *Request
respLock sync.Mutex // protects freeResp
freeResp *Response
}

RPC Server 处理调用请求的默认路径是 /_goRPC_。当请求到达时,Go就会调用Server结构体实现的ServeHTTP方法,经ServeConn方法传入gob codec预处理以后最终在ServeCodec方法内处理请求并进行调用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
func (server *Server) ServeCodec(codec ServerCodec) {
sending := new(sync.Mutex)
for {
service, mtype, req, argv, replyv, keepReading, err := server.readRequest(codec)
if err != nil {
if debugLog && err != io.EOF {
log.Println("rpc:", err)
}
if !keepReading {
break
}
// send a response if we actually managed to read a header.
if req != nil {
server.sendResponse(sending, req, invalidRequest, codec, err.Error())
server.freeRequest(req)
}
continue
}
go service.call(server, sending, mtype, req, argv, replyv, codec)
}
codec.Close()
}

如果成功读取请求数据,那么接下来 RPC Server 就会新建一个 Goroutine 用来在本地执行方法,并向调用端返回 response:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
func (s *service) call(server *Server, sending *sync.Mutex, mtype *methodType, req *Request, argv, replyv reflect.Value, codec ServerCodec) {
mtype.Lock()
mtype.numCalls++
mtype.Unlock()
function := mtype.method.Func
// Invoke the method, providing a new value for the reply.
returnValues := function.Call([]reflect.Value{s.rcvr, argv, replyv})
// The return value for the method is an error.
errInter := returnValues[0].Interface()
errmsg := ""
if errInter != nil {
errmsg = errInter.(error).Error()
}
server.sendResponse(sending, req, replyv.Interface(), codec, errmsg)
server.freeRequest(req)
}

在执行调用的过程中应该注意并发问题,防止资源争用,修改数据时需要对数据加锁;至于方法的执行就是利用了Go的反射机制。调用完以后,RPC Server 接着调用sendResponse方法发送response,其中写入response的时候同样需要加锁,防止资源争用。

gRPC

gRPC 是 Google 开源的一个通用的 RPC 框架,支持 C, Java 和 Go 等语言。既然是 Google 出品,序列化协议必然用 protobuf 啦(毕竟高效),传输协议使用 HTTP/2,更为通用,性能也还可以。开发时需要在 .proto 文件里定义数据类型以及服务接口,然后配上 protoc 的 gRPC 插件就能够自动生成各个语言的服务接口和代理类。

Vert.x Service Proxy

Vert.x Service Proxy 是 Vert.x 的一个异步 RPC 组件,支持通过各种 JVM 语言(Java, Scala, JS, JRuby, Groovy等)进行 RPC 调用。使用 Vert.x Service Proxy 时我们只需要按照异步开发模式编写服务接口,加上相应的注解,Vert.x Service Proxy 就会自动生成相应的服务代理类和服务调用处理类。Vert.x Service Proxy 底层借助 Event Bus 进行通信,调用时将调用消息包装成 JSON 数据然后通过 Event Bus 传输到服务端,得到结果后再返回给调用端。Vert.x 的一大特性就是异步、响应式编程,因此 Vert.x Service Proxy 的 RPC 模型为异步 RPC,用起来非常方便。几个异步过程可以通过各种组合子串成一串,妥妥的 reactive programming 的风格~

更多的关于 Vert.x Service Proxy 的实现原理的内容可以看这一篇:Vert.x 技术内幕 | 异步 RPC 实现原理

PS: 我经常吐槽 Vert.x Service Proxy 这个名字,因为光看名字很多人不知道它可以用来实现 RPC,导致了很多人以为 Vert.x 不能做 RPC,应该改名叫 Vert.x Async RPC 比较合适。。。当然它还有很大的改进空间,主要是被 Vert.x Event Bus 的性能和可靠性给拖累了。。。

Vert.x 技术内幕 | Event Bus 源码分析 (集群模式)

上篇文章中我们探索了Local模式下Event Bus的源码,在这篇文章中我们来探索一下Vert.x中的Clustered Event Bus是如何实现的。对应的Vert.x版本为3.3.2

集群模式介绍

我们先来简单地介绍一下集群模式下Event Bus的基本原理。

我们可以通过集群模式下的Event Bus在不同的服务器之间进行通信,其本质为TCP通信。Vert.x集群模式需要一个集群管理器(默认为HazelcastClusterManager)来管理集群的状态,存储元数据。当我们在某个节点A给集群模式的Event Bus绑定一个对应地址addressconsumer的时候,Event Bus会将此节点的ServerID(包含hostport信息)存储至集群管理器的共享Map中,key为绑定的地址address,value为绑定了此地址address的所有结点的ServerID集合(可以看作是具有负载均衡功能的Set)。集群中的所有节点都可以从集群管理器中获取Map记录。并且绑定consumer的同时节点A会建立一个NetServer接收数据。这样,我们再通过另一个结点B向此地址address发送消息的时候,B就会从集群管理器中取出此地址对应的ServerID集合,并根据是点对点发送还是发布,根据相应的策略创建NetClient执行消息分发逻辑。这样,对应的NetServer收到数据后会对其进行解码然后在本地进行消息的处理。

集群模式下我们还需要注意几个问题:

  • 某个节点挂了怎么办?
  • 如何确保结点的高可用性?

当某个节点挂掉的时候,其连接将会不可用,集群管理器就会将此节点的信息从集群中移除,并且传播到所有的节点删除对应缓存的信息,这样发消息的时候就不会发送到挂掉的无效节点处。至于高可用性,Vert.x提供了高可用管理器HAManager用于实现高可用性,在发生故障时能够快速failover,详情可见官方文档

好了,下面我们就来分析一下Clustered Event Bus的源码。集群模式下Event Bus的类型为ClusteredEventBus,它继承了单机模式的EventBusImpl类。其初始化过程与Local模式大同小异,因此这里就直接分析发送和接受消息相关的逻辑了。

绑定MessageConsumer

我们还是先来看consumer方法的逻辑。前面的调用逻辑都和Local模式下相同,可以参考之前的文章。不同之处在添加记录的地方。Cluster模式下Event Bus需要将当前机器的位置存储至Map中并且传播至集群内的所有节点,因此ClusteredEventBus重写了四个参数版本的addRegistration方法(之前在EventBusImpl类中这个版本的方法用处不大,这里用处就大了):

1
2
3
4
5
6
7
8
9
10
11
@Override
protected <T> void addRegistration(boolean newAddress, String address,
boolean replyHandler, boolean localOnly,
Handler<AsyncResult<Void>> completionHandler) {
if (newAddress && subs != null && !replyHandler && !localOnly) {
// Propagate the information
subs.add(address, serverID, completionHandler);
} else {
completionHandler.handle(Future.succeededFuture());
}
}

如果要绑定MessageConsumer对应的地址在本地中没有注册过,并且不是Event Bus自动生成的reply consumer,并且允许在集群范围内传播的话,Event Bus就会将当前机器的位置添加到集群内的记录subs中。subs的类型为AsyncMultiMap:

1
private AsyncMultiMap<String, ServerID> subs;

ClusteredEventBus启动时会对其进行初始化:

1
2
3
4
5
6
7
8
clusterManager.<String, ServerID>getAsyncMultiMap(SUBS_MAP_NAME, ar2 -> {
if (ar2.succeeded()) {
subs = ar2.result();
// 其他代码暂略。。。
} else {
// 代码略。。。
}
});

从名字就可以看出来,AsyncMultiMap允许一键多值,并且其变动可以在集群范围内传播。由于AsyncMultiMap集群范围内的,因此对其操作都是异步的。在这里我们可以简单地把它看作是一个Map<String, ChoosableIterable<ServerID>>类型的键值对,其中ChoosableIterable与之前见到过的Handlers类似,属于可以通过轮询算法获取某一元素的集合。subs的key为绑定的地址,value为绑定此地址的机器位置的集合。机器的位置用ServerID表示,里面包含了该机器的hostport。这样,每当我们向某个地址绑定一个MessageConsumer的时候,绑定consumer的ServerID就会被记录到集群中并与地址相对应,其它机器在向此地址发送(或发布)消息的时候,Event Bus就可以从集群中获取在此地址上绑定了consumer的所有ServerID,再根据相应的策略选出合适的ServerID建立TCP通信将数据发送至对应机器中,对应机器收到消息后解码并在本地对其进行处理。

这里面还需要注意一点:我们可以在EventBusOptions中指定ServerIDporthost,若不指定则port将随机分配(NetServer的特性)。

剩下的过程也就大同小异了。至于unregister方法,无非就是将底层的removeRegistration方法重写,从subs中删除对应的ServerID并传播至其它节点:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
@Override
protected <T> void removeRegistration(HandlerHolder lastHolder, String address,
Handler<AsyncResult<Void>> completionHandler) {
if (lastHolder != null && subs != null && !lastHolder.isLocalOnly()) {
removeSub(address, serverID, completionHandler);
} else {
callCompletionHandlerAsync(completionHandler);
}
}
private void removeSub(String subName, ServerID theServerID, Handler<AsyncResult<Void>> completionHandler) {
subs.remove(subName, theServerID, ar -> {
if (!ar.succeeded()) {
log.error("Failed to remove sub", ar.cause());
} else {
if (ar.result()) {
if (completionHandler != null) {
completionHandler.handle(Future.succeededFuture());
}
} else {
if (completionHandler != null) {
completionHandler.handle(Future.failedFuture("sub not found"));
}
}
}
});
}

消息的发送/发布

集群模式下的消息与本地模式下的消息不同。集群模式下的消息实体类型为ClusteredMessage,它继承了MessageImpl消息实体类,并且根据远程传输的特性实现了一种Wire Protocol用于远程传输消息,并负责消息的编码和解码。具体的实现就不展开说了,如果有兴趣的话可以阅读ClusteredMessage类中相关方法的实现。

我们上篇文章提到过,Event Bus底层通过createMessage方法创建消息。因此ClusteredEventBus里就对此方法进行了重写,当然改动就是把MessageImpl替换成了ClusteredMessage:

1
2
3
4
5
6
7
8
@Override
protected MessageImpl createMessage(boolean send, String address, MultiMap headers, Object body, String codecName) {
Objects.requireNonNull(address, "no null address accepted");
MessageCodec codec = codecManager.lookupCodec(body, codecName);
@SuppressWarnings("unchecked")
ClusteredMessage msg = new ClusteredMessage(serverID, address, null, headers, body, codec, send, this);
return msg;
}

接下来就是消息的发送逻辑了。ClusteredEventBus重写了sendOrPub方法,此方法存在于SendContextImpl类中的next方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
@Override
public void next() {
if (iter.hasNext()) {
Handler<SendContext> handler = iter.next();
try {
handler.handle(this);
} catch (Throwable t) {
log.error("Failure in interceptor", t);
}
} else {
sendOrPub(this);
}
}

我们来看一下ClusteredEventBus是如何进行集群内消息的分发的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@Override
protected <T> void sendOrPub(SendContextImpl<T> sendContext) {
String address = sendContext.message.address();
Handler<AsyncResult<ChoosableIterable<ServerID>>> resultHandler = asyncResult -> {
if (asyncResult.succeeded()) {
ChoosableIterable<ServerID> serverIDs = asyncResult.result();
if (serverIDs != null && !serverIDs.isEmpty()) {
sendToSubs(serverIDs, sendContext);
} else {
metrics.messageSent(address, !sendContext.message.send(), true, false);
deliverMessageLocally(sendContext);
}
} else {
log.error("Failed to send message", asyncResult.cause());
}
};
if (Vertx.currentContext() == null) {
// Guarantees the order when there is no current context
sendNoContext.runOnContext(v -> {
subs.get(address, resultHandler);
});
} else {
subs.get(address, resultHandler);
}
}

首先Event Bus需要从传入的sendContext中获取要发送至的地址。接着Event Bus需要从集群管理器中获取在此地址上绑定consumer的所有ServerID,这个过程是异步的,并且需要在Vert.x Context中执行。如果获取记录成功,我们会得到一个可通过轮询算法获取ServerID的集合(类型为ChoosableIterable<ServerID>)。如果集合为空,则代表集群内其它节点没有在此地址绑定consumer(或者由于一致性问题没有同步),Event Bus就将消息通过deliverMessageLocally方法在本地进行相应的分发。deliverMessageLocally方法的逻辑之前我们已经详细讲过了,这里就不再细说了;如果集合不为空,Event Bus就调用sendToSubs方法进行下一步操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
private <T> void sendToSubs(ChoosableIterable<ServerID> subs, SendContextImpl<T> sendContext) {
String address = sendContext.message.address();
if (sendContext.message.send()) {
// Choose one
ServerID sid = subs.choose();
if (!sid.equals(serverID)) { //We don't send to this node
metrics.messageSent(address, false, false, true);
sendRemote(sid, sendContext.message);
} else {
metrics.messageSent(address, false, true, false);
deliverMessageLocally(sendContext);
}
} else {
// Publish
boolean local = false;
boolean remote = false;
for (ServerID sid : subs) {
if (!sid.equals(serverID)) { //We don't send to this node
remote = true;
sendRemote(sid, sendContext.message);
} else {
local = true;
}
}
metrics.messageSent(address, true, local, remote);
if (local) {
deliverMessageLocally(sendContext);
}
}
}

这里就到了分sendpublish的时候了。如果发送消息的模式为点对点模式(send),Event Bus会从给的的集合中通过轮询算法获取一个ServerID。然后Event Bus会检查获取到的ServerID是否与本机ServerID相同,如果相同则代表在一个机子上,直接记录metrics信息并且调用deliverMessageLocally方法往本地发送消息即可;如果不相同,Event Bus就会调用sendRemote方法执行真正的远程消息发送逻辑。发布订阅模式的逻辑与其大同小异,只不过需要遍历一下ChoosableIterable<ServerID>集合,然后依次执行之前讲过的逻辑。注意如果要在本地发布消息只需要发一次。

真正的远程消息发送逻辑在sendRemote方法中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
private void sendRemote(ServerID theServerID, MessageImpl message) {
ConnectionHolder holder = connections.get(theServerID);
if (holder == null) {
holder = new ConnectionHolder(this, theServerID, options);
ConnectionHolder prevHolder = connections.putIfAbsent(theServerID, holder);
if (prevHolder != null) {
// Another one sneaked in
holder = prevHolder;
} else {
holder.connect();
}
}
holder.writeMessage((ClusteredMessage)message);
}

一开始我们就提到过,节点之间通过Event Bus进行通信的本质是TCP,因此这里需要创建一个NetClient作为TCP服务端,连接到之前获取的ServerID对应的节点然后将消息通过TCP协议发送至接收端。这里Vert.x用一个封装类ConnectionHolderNetClient进行了一些封装。

ClusteredEventBus中维持着一个connections哈希表对用于保存ServerID对应的连接ConnectionHolder。在sendRemote方法中,Event Bus首先会从connections中获取ServerID对应的连接。如果获取不到就创建连接并将其添加至connections记录中并调用对应ConnectionHolderconnect方法建立连接;最后调用writeMessage方法将消息编码后通过TCP发送至对应的接收端。

那么ConnectionHolder是如何实现的呢?我们来看一下其构造函数:

1
2
3
4
5
6
7
8
9
10
ConnectionHolder(ClusteredEventBus eventBus, ServerID serverID, EventBusOptions options) {
this.eventBus = eventBus;
this.serverID = serverID;
this.vertx = eventBus.vertx();
this.metrics = eventBus.getMetrics();
NetClientOptions clientOptions = new NetClientOptions(options.toJson());
ClusteredEventBus.setCertOptions(clientOptions, options.getKeyCertOptions());
ClusteredEventBus.setTrustOptions(clientOptions, options.getTrustOptions());
client = new NetClientImpl(eventBus.vertx(), clientOptions, false);
}

可以看到ConnectionHolder初始化的时候会创建一个NetClient作为TCP请求端,而请求的对象就是接收端的NetServer(后边会讲),客户端配置已经在EventBusOptions中事先配置好了。我们来看看connect方法是如何建立连接的:

1
2
3
4
5
6
7
8
9
10
11
12
synchronized void connect() {
if (connected) {
throw new IllegalStateException("Already connected");
}
client.connect(serverID.port, serverID.host, res -> {
if (res.succeeded()) {
connected(res.result());
} else {
close();
}
});
}

可以看到这里很简单地调用了NetClient#connect方法建立TCP连接,如果建立连接成功的话会得到一个NetSocket对象。Event Bus接着将其传至connected方法中进行处理:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
private synchronized void connected(NetSocket socket) {
this.socket = socket;
connected = true;
socket.exceptionHandler(t -> close());
socket.closeHandler(v -> close());
socket.handler(data -> {
// Got a pong back
vertx.cancelTimer(timeoutID);
schedulePing();
});
// Start a pinger
schedulePing();
for (ClusteredMessage message : pending) {
Buffer data = message.encodeToWire();
metrics.messageWritten(message.address(), data.length());
socket.write(data);
}
pending.clear();
}

首先Event Bus通过exceptionHandlercloseHandler方法给连接对应的NetSocket绑定异常回调和连接关闭回调,触发的时候都调用close方法关闭连接;为了保证不丢失连接,消息发送方每隔一段时间就需要对消息接收方发送一次心跳包(PING),如果消息接收方在一定时间内没有回复,那么就认为连接丢失,调用close方法关闭连接。心跳检测的逻辑在schedulePing方法中,比较清晰,这里就不详细说了。大家会发现ConnectionHolder里也有个消息队列(缓冲区)pending,并且这里会将队列中的消息依次通过TCP发送至接收端。为什么需要这样呢?其实,这要从创建TCP客户端说起。创建TCP客户端这个过程应该是异步的,需要消耗一定时间,而ConnectionHolder中封装的connect方法却是同步式的。前面我们刚刚看过,通过connect方法建立连接以后会接着调用writeMessage方法发送消息,而这时候客户端连接可能还没建立,因此需要这么个缓冲区先存着,等着连接建立了再一块发送出去(存疑:为什么不将connect方法直接设计成异步的?)。

至于发送消息的writeMessage方法,其逻辑一目了然:

1
2
3
4
5
6
7
8
9
10
11
12
synchronized void writeMessage(ClusteredMessage message) {
if (connected) {
Buffer data = message.encodeToWire();
metrics.messageWritten(message.address(), data.length());
socket.write(data);
} else {
if (pending == null) {
pending = new ArrayDeque<>();
}
pending.add(message);
}
}

如果连接已建立,Event Bus就会调用对应ClusteredMessageencodeToWire方法将其转化为字节流Buffer,然后记录metrics信息,最后通过socketwrite方法将消息写入到Socket中,这样消息就从发送端通过TCP发送到了接收端。如果连接未建立,就如之前讲的那样,先把消息存到消息队列中,等连接建立了再一块发出去。

这样,Clustered Event Bus下消息的发送逻辑就理清楚了。下面我们看一下接收端是如何接收消息并在本地进行消息的处理的。

消息的接收

一开始我们提到过,每个节点的Clustered Event Bus在启动时都会创建一个NetServer作为接收消息的TCP服务端。TCP Server的porthost可以在EventBusOptions中指定,如果不指定的话默认随机分配port,然后Event Bus会根据NetServer的配置来生成当前节点的ServerID

创建TCP Server的逻辑在start方法中,与接受消息有关的逻辑就是这一句:

1
server.connectHandler(getServerHandler());

我们知道,NetServerconnectHandler方法用于绑定对服务端Socket的处理函数,而这里绑定的处理函数是由getServerHandler方法生成的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
private Handler<NetSocket> getServerHandler() {
return socket -> {
RecordParser parser = RecordParser.newFixed(4, null);
Handler<Buffer> handler = new Handler<Buffer>() {
int size = -1;
public void handle(Buffer buff) {
if (size == -1) {
size = buff.getInt(0);
parser.fixedSizeMode(size);
} else {
ClusteredMessage received = new ClusteredMessage();
received.readFromWire(buff, codecManager);
metrics.messageRead(received.address(), buff.length());
parser.fixedSizeMode(4);
size = -1;
if (received.codec() == CodecManager.PING_MESSAGE_CODEC) {
// Just send back pong directly on connection
socket.write(PONG);
} else {
deliverMessageLocally(received);
}
}
}
};
parser.setOutput(handler);
socket.handler(parser);
};
}

逻辑非常清晰。这里Event Bus使用了RecordParser来获取发送过来的对应长度的Buffer,并将其绑定在NetServer的Socket上。真正的解析Buffer并处理的逻辑在内部的handler中。之前ClusteredMessage中的Wire Protocol规定Buffer的首部第一个int值为要发送Buffer的长度(逻辑见ClusteredMessage#encodeToWire方法),因此这里首先获取长度,然后给parser设定正确的fixed size,这样parser就可以截取正确长度的Buffer流了。下面Event Bus会创建一个空的ClusteredMessage,然后调用其readFromWire方法从Buffer中重建消息。当然这里还要记录消息已经读取的metrics信息。接着检测收到的消息实体类型是否为心跳检测包(PING),如果是的话就发送回ACK消息(PONG);如果不是心跳包,则代表是正常的消息,Event Bus就调用我们熟悉的deliverMessageLocally函数在本地进行分发处理,接下来的过程就和Local模式一样了。

Vert.x 技术内幕 | Event Bus 源码分析 (Local模式)

Event Bus是Vert.x的“神经系统”,是最为关键的几个部分之一。今天我们就来探索一下Event Bus的实现原理。本篇分析的是Local模式的Event Bus,对应的Vert.x版本为3.3.2

本文假定读者有一定的并发编程基础以及Vert.x使用基础,并且对Vert.x的线程模型以及back-pressure有所了解。

Local Event Bus的创建

一般情况下,我们通过VertxeventBus方法来创建或获取一个EventBus实例:

1
2
Vertx vertx = Vertx.vertx();
EventBus eventBus = vertx.eventBus();

eventBus方法定义于Vertx接口中,我们来看一下其位于VertxImpl类中的实现:

1
2
3
4
5
6
7
8
9
10
public EventBus eventBus() {
if (eventBus == null) {
// If reading from different thread possibility that it's been set but not visible - so provide
// memory barrier
synchronized (this) {
return eventBus;
}
}
return eventBus;
}

可以看到此方法返回VertxImpl实例中的eventBus成员,同时需要注意并发可见性问题。那么eventBus成员是何时初始化的呢?答案在VertxImpl类的构造函数中。这里截取相关逻辑:

1
2
3
4
5
6
if (options.isClustered()) {
// 集群模式相关逻辑
} else {
this.clusterManager = null;
createAndStartEventBus(options, resultHandler);
}

可以看到VertxImpl内部是通过createAndStartEventBus方法来初始化eventBus的。我们来看一下其逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
private void createAndStartEventBus(VertxOptions options, Handler<AsyncResult<Vertx>> resultHandler) {
if (options.isClustered()) {
eventBus = new ClusteredEventBus(this, options, clusterManager, haManager);
} else {
eventBus = new EventBusImpl(this);
}
eventBus.start(ar2 -> {
if (ar2.succeeded()) {
// If the metric provider wants to use the event bus, it cannot use it in its constructor as the event bus
// may not be initialized yet. We invokes the eventBusInitialized so it can starts using the event bus.
metrics.eventBusInitialized(eventBus);
if (resultHandler != null) {
resultHandler.handle(Future.succeededFuture(this));
}
} else {
log.error("Failed to start event bus", ar2.cause());
}
});
}

可以看到此方法通过eventBus = new EventBusImpl(this)eventBus进行了初始化(Local模式为EventBusImpl),并且调用eventBusstart方法对其进行一些额外的初始化工作。我们来看一下EventBusImpl类的start方法:

1
2
3
4
5
6
7
public synchronized void start(Handler<AsyncResult<Void>> completionHandler) {
if (started) {
throw new IllegalStateException("Already started");
}
started = true;
completionHandler.handle(Future.succeededFuture());
}

首先初始化过程需要防止race condition,因此方法为synchronized的。该方法仅仅将EventBusImpl类中的一个started标志位设为true来代表Event Bus已启动。注意started标志位为volatile的,这样可以保证其可见性,确保其它线程通过checkStarted方法读到的started结果总是最新的。设置完started标志位后,Vert.x会接着调用传入的completionHandler处理函数,也就是上面我们在createAndStartEventBus方法中看到的 —— 调用metrics成员的eventBusInitialized方法以便Metrics类可以在Event Bus初始化完毕后使用它(不过默认情况下此方法的逻辑为空)。

可以看到初始化过程还是比较简单的,我们接下来先来看看订阅消息 —— consumer方法的逻辑。

consume

我们来看一下consumer方法的逻辑,其原型位于EventBus接口中:

1
2
<T> MessageConsumer<T> consumer(String address);
<T> MessageConsumer<T> consumer(String address, Handler<Message<T>> handler);

其实现位于EventBusImpl类中:

1
2
3
4
5
6
7
@Override
public <T> MessageConsumer<T> consumer(String address, Handler<Message<T>> handler) {
Objects.requireNonNull(handler, "handler");
MessageConsumer<T> consumer = consumer(address);
consumer.handler(handler);
return consumer;
}

首先要确保传入的handler不为空,然后Vert.x会调用只接受一个address参数的consumer方法获取对应的MessageConsumer,最后给获取到的MessageConsumer绑定上传入的handler。我们首先来看一下另一个consumer方法的实现:

1
2
3
4
5
6
@Override
public <T> MessageConsumer<T> consumer(String address) {
checkStarted();
Objects.requireNonNull(address, "address");
return new HandlerRegistration<>(vertx, metrics, this, address, null, false, null, -1);
}

首先Vert.x会检查Event Bus是否已经启动,并且确保传入的地址不为空。然后Vert.x会传入一大堆参数创建一个新的HandlerRegistration类型的实例,并返回。可以推测HandlerRegistrationMessageConsumer接口的具体实现,它一定非常重要。所以我们来看一看HandlerRegistration类是个啥玩意。首先看一下HandlerRegistration的类体系结构:

可以看到HandlerRegistration类同时继承了MessageConsumer<T>以及Handler<Message<T>>接口,从其类名可以看出它相当于一个”Handler注册记录”,是非常重要的一个类。它有一堆的成员变量,构造函数对vertx, metrics, eventBus, address(发送地址), repliedAddress(回复地址), localOnly(是否在集群内传播), asyncResultHandler等几个成员变量进行初始化,并且检查超时时间timeout,如果设定了超时时间那么设定并保存超时计时器(仅用于reply handler中),如果计时器时间到,代表回复超时。因为有一些函数还没介绍,超时的逻辑我们后边再讲。

Note: 由于MessageConsumer接口继承了ReadStream接口,因此它支持back-pressure,其实现就在HandlerRegistration类中。我们将稍后解析back-pressure的实现。

现在回到consumer方法中来。创建了MessageConsumer实例后,我们接着调用它的handler方法绑定上对应的消息处理函数。handler方法的实现位于HandlerRegistration类中:

1
2
3
4
5
6
7
8
9
10
11
12
@Override
public synchronized MessageConsumer<T> handler(Handler<Message<T>> handler) {
this.handler = handler;
if (this.handler != null && !registered) {
registered = true;
eventBus.addRegistration(address, this, repliedAddress != null, localOnly);
} else if (this.handler == null && registered) {
// This will set registered to false
this.unregister();
}
return this;
}

首先,handler方法将此HandlerRegistration中的handler成员设置为传入的消息处理函数。HandlerRegistration类中有一个registered标志位代表是否已绑定消息处理函数。handler方法会检查传入的handler是否为空且是否已绑定消息处理函数。如果不为空且未绑定,Vert.x就会将registered标志位设为true并且调用eventBusaddRegistration方法将此consumer注册至Event Bus上;如果handler为空且已绑定消息处理函数,我们就调用unregister方法注销当前的consumer。我们稍后会分析unregister方法的实现。

前面提到过注册consumer的逻辑位于Event Bus的addRegistration方法中,因此我们来分析一下它的实现:

1
2
3
4
5
6
protected <T> void addRegistration(String address, HandlerRegistration<T> registration,
boolean replyHandler, boolean localOnly) {
Objects.requireNonNull(registration.getHandler(), "handler");
boolean newAddress = addLocalRegistration(address, registration, replyHandler, localOnly);
addRegistration(newAddress, address, replyHandler, localOnly, registration::setResult);
}

addRegistration方法接受四个参数:发送地址address、传入的consumer registration、代表是否为reply handler的标志位replyHandler以及代表是否在集群范围内传播的标志位localOnly。首先确保传入的HandlerRegistration不为空。然后Vert.x会调用addLocalRegistration方法将此consumer注册至Event Bus上:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
protected <T> boolean addLocalRegistration(String address, HandlerRegistration<T> registration,
boolean replyHandler, boolean localOnly) {
Objects.requireNonNull(address, "address");
Context context = Vertx.currentContext();
boolean hasContext = context != null;
if (!hasContext) {
// Embedded
context = vertx.getOrCreateContext();
}
registration.setHandlerContext(context);
boolean newAddress = false;
HandlerHolder holder = new HandlerHolder<>(metrics, registration, replyHandler, localOnly, context);
Handlers handlers = handlerMap.get(address);
if (handlers == null) {
handlers = new Handlers();
Handlers prevHandlers = handlerMap.putIfAbsent(address, handlers);
if (prevHandlers != null) {
handlers = prevHandlers;
}
newAddress = true;
}
handlers.list.add(holder);
if (hasContext) {
HandlerEntry entry = new HandlerEntry<>(address, registration);
context.addCloseHook(entry);
}
return newAddress;
}

首先该方法要确保地址address不为空,接着它会获取当前线程下对应的Vert.x Context,如果获取不到则表明当前不在Verticle中(即Embedded),需要调用vertx.getOrCreateContext()来获取Context;然后将获取到的Context赋值给registration内部的handlerContext(代表消息处理对应的Vert.x Context)。

下面就要将给定的registration注册至Event Bus上了。这里Vert.x用一个HandlerHolder类来包装registrationcontext。接着Vert.x会从存储消息处理Handler的哈希表handlerMap中获取给定地址对应的Handlers,哈希表的类型为ConcurrentMap<String, Handlers>,key为地址,value为对应的HandlerHolder集合。注意这里的Handlers类代表一些Handler的集合,它内部维护着一个列表list用于存储每个HandlerHolderHandlers类中只有一个choose函数,此函数根据轮询算法从HandlerHolder集合中选定一个HandlerHolder,这即是Event Bus发送消息时实现load-balancing的方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class Handlers {
private final AtomicInteger pos = new AtomicInteger(0);
public final List<HandlerHolder> list = new CopyOnWriteArrayList<>();
public HandlerHolder choose() {
while (true) {
int size = list.size();
if (size == 0) {
return null;
}
int p = pos.getAndIncrement();
if (p >= size - 1) {
pos.set(0);
}
try {
return list.get(p);
} catch (IndexOutOfBoundsException e) {
// Can happen
pos.set(0);
}
}
}
}

获取到对应的handlers以后,Vert.x首先需要检查其是否为空,如果为空代表此地址还没有注册消息处理Handler,Vert.x就会创建一个Handlers并且将其置入handlerMap中,将newAddress标志位设为true代表这是一个新注册的地址,然后将其赋值给handlers。接着我们向handlers中的HandlerHolder列表list中添加刚刚创建的HandlerHolder实例,这样就将registration注册至Event Bus中了。

前面判断当前线程是否在Vert.x Context的标志位hasContext还有一个用途:如果当前线程在Vert.x Context下(比如在Verticle中),Vert.x会通过addCloseHook方法给当前的context添加一个钩子函数用于注销当前绑定的registration。当对应的Verticle被undeploy的时候,此Verticle绑定的所有消息处理Handler都会被unregister。Hook的类型为HandlerEntry<T>,它继承了Closeable接口,对应的逻辑在close函数中实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class HandlerEntry<T> implements Closeable {
final String address;
final HandlerRegistration<T> handler;
public HandlerEntry(String address, HandlerRegistration<T> handler) {
this.address = address;
this.handler = handler;
}
// ...
// Called by context on undeploy
public void close(Handler<AsyncResult<Void>> completionHandler) {
handler.unregister(completionHandler);
completionHandler.handle(Future.succeededFuture());
}
}

可以看到close函数会将绑定的registration从Event Bus的handlerMap中移除并执行completionHandler中的逻辑,completionHandler可由用户指定。

那么在哪里调用这些绑定的hook呢?答案是在DeploymentManager类中的doUndeploy方法中,通过contextrunCloseHooks方法执行绑定的hook函数。相关代码如下(只截取相关逻辑):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public synchronized void doUndeploy(ContextImpl undeployingContext, Handler<AsyncResult<Void>> completionHandler) {
// 前面代码略
context.runCloseHooks(ar2 -> {
if (ar2.failed()) {
// Log error but we report success anyway
log.error("Failed to run close hook", ar2.cause());
}
if (ar.succeeded() && undeployCount.incrementAndGet() == numToUndeploy) {
reportSuccess(null, undeployingContext, completionHandler);
} else if (ar.failed() && !failureReported.get()) {
failureReported.set(true);
reportFailure(ar.cause(), undeployingContext, completionHandler);
}
});
// 后面代码略
}

再回到addRegistration方法中。刚才addLocalRegistration方法的返回值newAddress代表对应的地址是否为新注册的。接着我们调用另一个版本的addRegistration方法,传入了一大堆参数:

1
2
3
4
5
protected <T> void addRegistration(boolean newAddress, String address,
boolean replyHandler, boolean localOnly,
Handler<AsyncResult<Void>> completionHandler) {
completionHandler.handle(Future.succeededFuture());
}

好吧,传入的前几个参数没用到。。。最后一个参数completionHandler传入的是registration::setResult方法引用,也就是说这个方法调用了对应registrationsetResult方法。其实现位于HandlerRegistration类中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public synchronized void setResult(AsyncResult<Void> result) {
this.result = result;
if (completionHandler != null) {
if (result.succeeded()) {
metric = metrics.handlerRegistered(address, repliedAddress);
}
Handler<AsyncResult<Void>> callback = completionHandler;
vertx.runOnContext(v -> callback.handle(result));
} else if (result.failed()) {
log.error("Failed to propagate registration for handler " + handler + " and address " + address);
} else {
metric = metrics.handlerRegistered(address, repliedAddress);
}
}

首先先设置registration内部的result成员(正常情况下为Future.succeededFuture())。接着Vert.x会判断registration是否绑定了completionHandler(与之前的completionHandler不同,这里的completionHandlerMessageConsumer注册成功时调用的Handler),若绑定则记录Metrics信息(handlerRegistered)并在Vert.x Context内调用completionHandler的逻辑;若未绑定completionHandler则仅记录Metrics信息。

到此为止,consumer方法的逻辑就分析完了。在分析sendpublish方法的逻辑之前,我们先来看一下如何注销绑定的MessageConsumer

unregister

我们通过调用MessageConsumerunregister方法实现注销操作。Vert.x提供了两个版本的unregister方法:

1
2
3
void unregister();
void unregister(Handler<AsyncResult<Void>> completionHandler);

其中第二个版本的unregister方法会在注销操作完成时调用传入的completionHandler。比如在cluster范围内注销consumer需要消耗一定的时间在集群内传播,因此第二个版本的方法就会派上用场。我们来看一下其实现,它们最后都是调用了HandlerRegistration类的doUnregister方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
private void doUnregister(Handler<AsyncResult<Void>> completionHandler, boolean callEndHandler) {
if (timeoutID != -1) {
vertx.cancelTimer(timeoutID);
}
if (endHandler != null && callEndHandler) {
Handler<Void> theEndHandler = endHandler;
Handler<AsyncResult<Void>> handler = completionHandler;
completionHandler = ar -> {
theEndHandler.handle(null);
if (handler != null) {
handler.handle(ar);
}
};
}
if (registered) {
registered = false;
eventBus.removeRegistration(address, this, completionHandler);
} else {
callCompletionHandlerAsync(completionHandler);
}
registered = false;
}

如果设定了超时定时器(timeoutID合法),那么Vert.x会首先将定时器关闭。接着Vert.x会判断是否需要调用endHandler。那么endHandler又是什么呢?前面我们提到过MessageConsumer接口继承了ReadStream接口,而ReadStream接口定义了一个endHandler方法用于绑定一个endHandler,当stream中的数据读取完毕时会调用。而在Event Bus中,消息源源不断地从一处发送至另一处,因此只有在某个consumer
被unregister的时候,其对应的stream才可以叫“读取完毕”,因此Vert.x选择在doUnregister方法中调用endHandler

接着Vert.x会判断此consumer是否已注册消息处理函数Handler(通过检查registered标志位),若已注册则将对应的Handler从Event Bus中的handlerMap中移除并将registered设为false;若还未注册Handler且提供了注销结束时的回调completionHandler(注意不是HandlerRegistration类的成员变量completionHandler,而是之前第二个版本的unregister中传入的Handler,用同样的名字很容易混。。。),则通过callCompletionHandlerAsync方法调用回调函数。

从Event Bus中移除Handler的逻辑位于EventBusImpl类的removeRegistration方法中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
protected <T> void removeRegistration(String address, HandlerRegistration<T> handler, Handler<AsyncResult<Void>> completionHandler) {
HandlerHolder holder = removeLocalRegistration(address, handler);
removeRegistration(holder, address, completionHandler);
}
protected <T> void removeRegistration(HandlerHolder handlerHolder, String address,
Handler<AsyncResult<Void>> completionHandler) {
callCompletionHandlerAsync(completionHandler);
}
protected <T> HandlerHolder removeLocalRegistration(String address, HandlerRegistration<T> handler) {
Handlers handlers = handlerMap.get(address);
HandlerHolder lastHolder = null;
if (handlers != null) {
synchronized (handlers) {
int size = handlers.list.size();
// Requires a list traversal. This is tricky to optimise since we can't use a set since
// we need fast ordered traversal for the round robin
for (int i = 0; i < size; i++) {
HandlerHolder holder = handlers.list.get(i);
if (holder.getHandler() == handler) {
handlers.list.remove(i);
holder.setRemoved();
if (handlers.list.isEmpty()) {
handlerMap.remove(address);
lastHolder = holder;
}
holder.getContext().removeCloseHook(new HandlerEntry<>(address, holder.getHandler()));
break;
}
}
}
}
return lastHolder;
}

其真正的unregister逻辑位于removeLocalRegistration方法中。首先需要从handlerMap中获取地址对应的Handlers实例handlers,如果handlers不为空,为了防止并发问题,Vert.x需要对其加锁后再进行操作。Vert.x需要遍历handlers中的列表,遇到与传入的HandlerRegistration相匹配的HandlerHolder就将其从列表中移除,然后调用对应holdersetRemoved方法标记其为已注销并记录Metrics数据(handlerUnregistered)。如果移除此HandlerHolderhandlers没有任何注册的Handler了,就将该地址对应的Handlers实例从handlerMap中移除并保存刚刚移除的HandlerHolder。另外,由于已经将此consumer注销,在undeploy verticle的时候不需要再进行unregister,因此这里还要将之前注册到context的hook移除。

调用完removeLocalRegistration方法以后,Vert.x会调用另一个版本的removeRegistration方法,调用completionHandler(用户在第二个版本的unregister方法中传入的处理函数)对应的逻辑,其它的参数都没什么用。。。

这就是MessageConsumer注销的逻辑实现。下面就到了本文的另一重头戏了 —— 发送消息相关的函数sendpublish

send & publish

sendpublish的逻辑相近,只不过一个是发送至目标地址的某一消费者,一个是发布至目标地址的所有消费者。Vert.x使用一个标志位send来代表是否为点对点发送模式。

几个版本的sendpublish最终都归结于生成消息对象然后调用sendOrPubInternal方法执行逻辑,只不过send标志位不同:

1
2
3
4
5
6
7
8
9
10
11
@Override
public <T> EventBus send(String address, Object message, DeliveryOptions options, Handler<AsyncResult<Message<T>>> replyHandler) {
sendOrPubInternal(createMessage(true, address, options.getHeaders(), message, options.getCodecName()), options, replyHandler);
return this;
}
@Override
public EventBus publish(String address, Object message, DeliveryOptions options) {
sendOrPubInternal(createMessage(false, address, options.getHeaders(), message, options.getCodecName()), options, null);
return this;
}

两个方法中都是通过createMessage方法来生成对应的消息对象的:

1
2
3
4
5
6
7
protected MessageImpl createMessage(boolean send, String address, MultiMap headers, Object body, String codecName) {
Objects.requireNonNull(address, "no null address accepted");
MessageCodec codec = codecManager.lookupCodec(body, codecName);
@SuppressWarnings("unchecked")
MessageImpl msg = new MessageImpl(address, null, headers, body, codec, send, this);
return msg;
}

createMessage方法接受5个参数:send即上面提到的标志位,address为发送目标地址,headers为设置的header,body代表发送的对象,codecName代表对应的Codec(消息编码解码器)名称。createMessage方法首先会确保地址不为空,然后通过codecManager来获取对应的MessageCodec。如果没有提供Codec(即codecName为空),那么codecManager会根据发送对象body的类型来提供内置的Codec实现(具体逻辑请见此处)。准备好MessageCodec后,createMessage方法就会创建一个MessageImpl实例并且返回。

这里我们还需要了解一下MessageImpl的构造函数:

1
2
3
4
5
6
7
8
9
10
11
public MessageImpl(String address, String replyAddress, MultiMap headers, U sentBody,
MessageCodec<U, V> messageCodec,
boolean send, EventBusImpl bus) {
this.messageCodec = messageCodec; // Codec
this.address = address; // 发送目标地址
this.replyAddress = replyAddress; // 回复地址
this.headers = headers; // header
this.sentBody = sentBody; // 发送的对象
this.send = send; // 是否为点对点模式
this.bus = bus; // 相关的Event Bus实例
}

createMessage方法并没有设置回复地址replyAddress。如果用户指定了replyHandler的话,后边sendOrPubInternal方法会对此消息实体进行加工,设置replyAddress并生成回复逻辑对应的HandlerRegistration

我们看一下sendOrPubInternal方法的源码:

1
2
3
4
5
6
7
private <T> void sendOrPubInternal(MessageImpl message, DeliveryOptions options,
Handler<AsyncResult<Message<T>>> replyHandler) {
checkStarted();
HandlerRegistration<T> replyHandlerRegistration = createReplyHandlerRegistration(message, options, replyHandler);
SendContextImpl<T> sendContext = new SendContextImpl<>(message, options, replyHandlerRegistration);
sendContext.next();
}

它接受三个参数:要发送的消息message,发送配置选项options以及回复处理函数replyHandler。首先sendOrPubInternal方法要检查Event Bus是否已启动,接着如果绑定了回复处理函数,Vert.x就会调用createReplyHandlerRegistration方法给消息实体message包装上回复地址,并且生成对应的reply consumer。接着Vert.x创建了一个包装消息的SendContextImpl实例并调用了其next方法。

我们一步一步来解释。首先是createReplyHandlerRegistration方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
private <T> HandlerRegistration<T> createReplyHandlerRegistration(MessageImpl message, DeliveryOptions options, Handler<AsyncResult<Message<T>>> replyHandler) {
if (replyHandler != null) {
long timeout = options.getSendTimeout();
String replyAddress = generateReplyAddress();
message.setReplyAddress(replyAddress);
Handler<Message<T>> simpleReplyHandler = convertHandler(replyHandler);
HandlerRegistration<T> registration =
new HandlerRegistration<>(vertx, metrics, this, replyAddress, message.address, true, replyHandler, timeout);
registration.handler(simpleReplyHandler);
return registration;
} else {
return null;
}
}

createReplyHandlerRegistration方法首先检查传入的replyHandler是否为空(是否绑定了replyHandler,回复处理函数),如果为空则代表不需要处理回复,直接返回null;若replyHandler不为空,createReplyHandlerRegistration方法就会从配置中获取reply的最大超时时长(默认30s),然后调用generateReplyAddress方法生成对应的回复地址replyAddress

1
2
3
4
5
private final AtomicLong replySequence = new AtomicLong(0);
protected String generateReplyAddress() {
return Long.toString(replySequence.incrementAndGet());
}

生成回复地址的逻辑有点简单。。。。EventBusImpl实例中维护着一个AtomicLong类型的replySequence成员变量代表对应的回复地址。每次生成的时候都会使其自增,然后转化为String。也就是说生成的replyAddress都类似于”1”、”5”这样,而不是我们想象中的直接回复至sender的地址。。。

生成完毕以后,createReplyHandlerRegistration方法会将生成的replyAddress设定到消息对象message中。接着Vert.x会通过convertHandler方法对replyHandler进行包装处理并生成类型简化为Handler<Message<T>>simpleReplyHandler,它用于绑定至后面创建的reply consumer上。接着Vert.x会创建对应的reply consumer。关于reply操作的实现,我们后边会详细讲述。下面Vert.x就通过handler方法将生成的回复处理函数simpleReplyHandler绑定至创建好的reply consumer中,其底层实现我们之前已经分析过了,这里就不再赘述。最后此方法返回生成的registration,即对应的reply consumer。注意这个reply consumer是一次性的,也就是说Vert.x会在其接收到回复或超时的时候自动对其进行注销。

OK,现在回到sendOrPubInternal方法中来。下面Vert.x会创建一个SendContextImpl实例并调用其next方法。SendContextImpl类实现了SendContext接口,它相当于一个消息的封装体,并且可以与Event Bus中的interceptors(拦截器)结合使用。

SendContext接口定义了三个方法:

  • message: 获取当前SendContext包装的消息实体
  • next: 调用下一个消息拦截器
  • send: 代表消息的发送模式是否为点对点模式

在Event Bus中,消息拦截器本质上是一个Handler<SendContext>类型的处理函数。Event Bus内部存储着一个interceptors列表用于存储绑定的消息拦截器。我们可以通过addInterceptorremoveInterceptor方法进行消息拦截器的添加和删除操作。如果要进行链式拦截,则在每个拦截器中都应该调用对应SendContextnext方法,比如:

1
2
3
4
eventBus.addInterceptor(sc -> {
// 一些处理逻辑
sc.next(); // 调用下一个拦截器
});

我们来看一下SendContextImpl类中next方法的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
protected class SendContextImpl<T> implements SendContext<T> {
public final MessageImpl message;
public final DeliveryOptions options;
public final HandlerRegistration<T> handlerRegistration;
public final Iterator<Handler<SendContext>> iter;
public SendContextImpl(MessageImpl message, DeliveryOptions options, HandlerRegistration<T> handlerRegistration) {
this.message = message;
this.options = options;
this.handlerRegistration = handlerRegistration;
this.iter = interceptors.iterator();
}
@Override
public Message<T> message() {
return message;
}
@Override
public void next() {
if (iter.hasNext()) {
Handler<SendContext> handler = iter.next();
try {
handler.handle(this);
} catch (Throwable t) {
log.error("Failure in interceptor", t);
}
} else {
sendOrPub(this);
}
}
@Override
public boolean send() {
return message.send();
}
}

我们可以看到,SendContextImpl类中维护了一个拦截器列表对应的迭代器。每次调用next方法时,如果迭代器中存在拦截器,就将下个拦截器取出并进行相关调用。如果迭代器为空,则代表拦截器都已经调用完毕,Vert.x就会调用EventBusImpl类下的sendOrPub方法进行消息的发送操作。

sendOrPub方法仅仅在metrics模块中记录相关数据(messageSent),最后调用deliverMessageLocally(SendContextImpl<T>)方法执行消息的发送逻辑:

1
2
3
4
5
6
7
8
9
10
protected <T> void deliverMessageLocally(SendContextImpl<T> sendContext) {
if (!deliverMessageLocally(sendContext.message)) {
// no handlers
metrics.replyFailure(sendContext.message.address, ReplyFailure.NO_HANDLERS);
if (sendContext.handlerRegistration != null) {
sendContext.handlerRegistration.sendAsyncResultFailure(ReplyFailure.NO_HANDLERS, "No handlers for address "
+ sendContext.message.address);
}
}
}

这里面又套了一层。。。它最后其实是调用了deliverMessageLocally(MessageImpl)方法。此方法返回值代表发送消息的目标地址是否注册有MessageConsumer,如果没有(false)则记录错误并调用sendContext中保存的回复处理函数处理错误(如果绑定了replyHandler的话)。

deliverMessageLocally(MessageImpl)方法是真正区分sendpublish的地方,我们来看一下其实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
protected <T> boolean deliverMessageLocally(MessageImpl msg) {
msg.setBus(this);
Handlers handlers = handlerMap.get(msg.address());
if (handlers != null) {
if (msg.send()) {
//Choose one
HandlerHolder holder = handlers.choose();
metrics.messageReceived(msg.address(), !msg.send(), isMessageLocal(msg), holder != null ? 1 : 0);
if (holder != null) {
deliverToHandler(msg, holder);
}
} else {
// Publish
metrics.messageReceived(msg.address(), !msg.send(), isMessageLocal(msg), handlers.list.size());
for (HandlerHolder holder: handlers.list) {
deliverToHandler(msg, holder);
}
}
return true;
} else {
metrics.messageReceived(msg.address(), !msg.send(), isMessageLocal(msg), 0);
return false;
}
}

首先Vert.x需要从handlerMap中获取目标地址对应的处理函数集合handlers。接着,如果handlers不为空的话,Vert.x就会判断消息实体的send标志位。如果send标志位为true则代表以点对点模式发送,Vert.x就会通过handlerschoose方法(之前提到过),按照轮询算法来获取其中的某一个HandlerHolder。获取到HandlerHolder之后,Vert.x会通过deliverToHandler方法将消息分发至HandlerHolder中进行处理;如果send标志位为false则代表向所有消费者发布消息,Vert.x就会对handlers中的每一个HandlerHolder依次调用deliverToHandler方法,以便将消息分发至所有注册到此地址的Handler中进行处理。

消息处理的真正逻辑就在deliverToHandler方法中。我们来看一下它的源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
private <T> void deliverToHandler(MessageImpl msg, HandlerHolder<T> holder) {
// Each handler gets a fresh copy
@SuppressWarnings("unchecked")
Message<T> copied = msg.copyBeforeReceive();
if (metrics.isEnabled()) {
metrics.scheduleMessage(holder.getHandler().getMetric(), msg.isLocal());
}
holder.getContext().runOnContext((v) -> {
// Need to check handler is still there - the handler might have been removed after the message were sent but
// before it was received
try {
if (!holder.isRemoved()) {
holder.getHandler().handle(copied);
}
} finally {
if (holder.isReplyHandler()) {
holder.getHandler().unregister();
}
}
});
}

首先deliverToHandler方法会复制一份要发送的消息,然后deliverToHandler方法会调用metricsscheduleMessage方法记录对应的Metrics信息(计划对消息进行处理。此函数修复了Issue 1480)。接着deliverToHandler方法会从传入的HandlerHolder中获取对应的Vert.x Context,然后调用runOnContext方法以便可以让消息处理逻辑在Vert.x Context中执行。为防止对应的handler在处理之前被移除,这里还需要检查一下holderisRemoved属性。如果没有移除,那么就从holder中获取对应的handler并调用其handle方法执行消息的处理逻辑。注意这里获取的handler实际上是一个HandlerRegistration。前面提到过HandlerRegistration类同时实现了MessageConsumer接口和Handler接口,因此它兼具这两个接口所期望的功能。另外,之前我们提到过Vert.x会自动注销接收过回复的reply consumer,其逻辑就在这个finally块中。Vert.x会检查holder中的handler是否为reply handler(reply consumer),如果是的话就调用其unregister方法将其注销,来确保reply consumer为一次性的。

之前我们提到过MessageConsumer继承了ReadStream接口,因此HandlerRegistration需要实现flow control(back-pressure)的相关逻辑。那么如何实现呢?我们看到,HandlerRegistration类中有一个paused标志位代表是否还继续处理消息。ReadStream接口中定义了两个函数用于控制stream的通断:当处理速度小于读取速度(发生拥塞)的时候我们可以通过pause方法暂停消息的传递,将积压的消息暂存于内部的消息队列(缓冲区)pending中;当相对速度正常的时候,我们可以通过resume方法恢复消息的传递和处理。

我们看一下HandlerRegistrationhandle方法的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@Override
public void handle(Message<T> message) {
Handler<Message<T>> theHandler;
synchronized (this) {
if (paused) {
if (pending.size() < maxBufferedMessages) {
pending.add(message);
} else {
if (discardHandler != null) {
discardHandler.handle(message);
} else {
log.warn("Discarding message as more than " + maxBufferedMessages + " buffered in paused consumer");
}
}
return;
} else {
if (pending.size() > 0) {
pending.add(message);
message = pending.poll();
}
theHandler = handler;
}
}
deliver(theHandler, message);
}

果然。。。handle方法在处理消息的基础上实现了拥塞控制的功能。为了防止资源争用,需要对自身进行加锁;首先handle方法会判断当前的consumer是否为paused状态,如果为paused状态,handle方法会检查当前缓冲区大小是否已经超过给定的最大缓冲区大小maxBufferedMessages,如果没超过,就将收到的消息push到缓冲区中;如果大于或等于阈值,Vert.x就需要丢弃超出的那部分消息。如果当前的consumer为正常状态,则如果缓冲区不为空,就将收到的消息push到缓冲区中并从缓冲区中pull队列首端的消息,然后调用deliver方法执行真正的消息处理逻辑。注意这里是在锁之外执行deliver方法的,这是为了保证在multithreaded worker context下可以并发传递消息(见Bug 473714 )。由于multithreaded worker context允许在不同线程并发执行逻辑(见官方文档),如果将deliver方法置于synchronized块之内,其他线程必须等待当前锁被释放才能进行消息的传递逻辑,因而不能做到“delivery concurrently”。

deliver方法是真正执行“消息处理”逻辑的地方:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
private void deliver(Handler<Message<T>> theHandler, Message<T> message) {
checkNextTick();
boolean local = true;
if (message instanceof ClusteredMessage) {
// A bit hacky
ClusteredMessage cmsg = (ClusteredMessage)message;
if (cmsg.isFromWire()) {
local = false;
}
}
String creditsAddress = message.headers().get(MessageProducerImpl.CREDIT_ADDRESS_HEADER_NAME);
if (creditsAddress != null) {
eventBus.send(creditsAddress, 1);
}
try {
metrics.beginHandleMessage(metric, local);
theHandler.handle(message);
metrics.endHandleMessage(metric, null);
} catch (Exception e) {
log.error("Failed to handleMessage", e);
metrics.endHandleMessage(metric, e);
throw e;
}
}

首先Vert.x会调用checkNextTick方法来检查消息队列(缓冲区)中是否存在更多的消息等待被处理,如果有的话就取出队列首端的消息并调用deliver方法将其传递给handler进行处理。这里仍需要注意并发问题,相关实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private synchronized void checkNextTick() {
if (!pending.isEmpty()) {
handlerContext.runOnContext(v -> {
Message<T> message;
Handler<Message<T>> theHandler;
synchronized (HandlerRegistration.this) {
if (paused || (message = pending.poll()) == null) {
return;
}
theHandler = handler;
}
deliver(theHandler, message);
});
}
}

检查完消息队列以后,Vert.x会接着根据message判断消息是否仅在本地进行处理并给local标志位赋值,local标志位将在记录Metrics数据时用到。

接下来我们看到Vert.x从消息的headers中获取了一个地址creditsAddress,如果creditsAddress存在就向此地址发送一条消息,body为1。那么这个creditsAddress又是啥呢?其实,它与flow control有关,我们会在下面详细分析。发送完credit消息以后,接下来就到了调用handler处理消息的时刻了。在处理消息之前需要调用metricsbeginHandleMessage方法记录消息开始处理的metrics数据,在处理完消息以后需要调用endHandleMessage方法记录消息处理结束的metrics数据。

嗯。。。到此为止,消息的发送和处理过程就已经一目了然了。下面我们讲一讲之前代码中出现的creditsAddress到底是啥玩意~

MessageProducer

之前我们提到过,Vert.x定义了两个接口作为 flow control aware object 的规范:WriteStream以及ReadStream。对于ReadStream我们已经不再陌生了,MessageConsumer就继承了它;那么大家应该可以想象到,有MessageConsumer就必有MessageProducer。不错,Vert.x中的MessageProducer接口对应某个address上的消息生产者,同时它继承了WriteStream接口,因此MessageProducer的实现类MessageProducerImpl同样具有flow control的能力。我们可以把MessageProducer看做是一个具有flow control功能的增强版的EventBus。我们可以通过EventBus接口的publisher方法创建一个MessageProducer

MessageProducer有了初步了解之后,我们就可以解释前面deliver方法中的creditsAddress了。MessageProducer接口的实现类 —— MessageProducerImpl类的流量控制功能是基于credit的,其内部会维护一个credit值代表“发送消息的能力”,其默认值等于DEFAULT_WRITE_QUEUE_MAX_SIZE

1
2
private int maxSize = DEFAULT_WRITE_QUEUE_MAX_SIZE;
private int credits = DEFAULT_WRITE_QUEUE_MAX_SIZE;

在采用点对点模式发送消息的时候,MessageProducer底层会调用doSend方法进行消息的发送。发送依然利用Event Bus的send方法,只不过doSend方法中添加了flow control的相关逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
private synchronized <R> void doSend(T data, Handler<AsyncResult<Message<R>>> replyHandler) {
if (credits > 0) {
credits--;
if (replyHandler == null) {
bus.send(address, data, options);
} else {
bus.send(address, data, options, replyHandler);
}
} else {
pending.add(data);
}
}

MessageConsumer类似,MessageProducer内部同样保存着一个消息队列(缓冲区)用于暂存堆积的消息。当credits大于0的时候代表可以发送消息(没有出现拥塞),Vert.x就会调用Event Bus的send方法进行消息的发送,同时credits要减1;如果credits小于等于0,则代表此时消息发送的速度太快,出现了拥塞,需要暂缓发送,因此将要发送的对象暂存于缓冲区中。大家可能会问,credits值不断减小,那么恢复消息发送能力(增大credits)的逻辑在哪呢?这就要提到creditsAddress了。我们看一下MessageProducerImpl类的构造函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public MessageProducerImpl(Vertx vertx, String address, boolean send, DeliveryOptions options) {
this.vertx = vertx;
this.bus = vertx.eventBus();
this.address = address;
this.send = send;
this.options = options;
if (send) {
String creditAddress = UUID.randomUUID().toString() + "-credit";
creditConsumer = bus.consumer(creditAddress, msg -> {
doReceiveCredit(msg.body());
});
options.addHeader(CREDIT_ADDRESS_HEADER_NAME, creditAddress);
} else {
creditConsumer = null;
}
}

MessageProducerImpl的构造函数中生成了一个creditAddress,然后给该地址绑定了一个Handler,当收到消息时调用doReceiveCredit方法执行解除拥塞,恢复消息发送的逻辑。MessageProducerImpl会将此MessageConsumer保存,以便在关闭消息生产者流的时候将其注销。接着构造函数会往optionsheaders中添加一条记录,保存对应的creditAddress,这也就是上面我们在deliver函数中获取的creditAddress

1
2
3
4
5
// 位于HandlerRegistration类的deliver函数中
String creditsAddress = message.headers().get(MessageProducerImpl.CREDIT_ADDRESS_HEADER_NAME);
if (creditsAddress != null) {
eventBus.send(creditsAddress, 1);
}

这样,发送消息到creditsAddress的逻辑也就好理解了。由于deliver函数的逻辑就是处理消息,因此这里向creditsAddress发送一个 1 其实就是将对应的credits值加1。恢复消息发送的逻辑位于MessageProducerImpl类的doReceiveCredit方法中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
private synchronized void doReceiveCredit(int credit) {
credits += credit;
while (credits > 0) {
T data = pending.poll();
if (data == null) {
break;
} else {
credits--;
bus.send(address, data, options);
}
}
final Handler<Void> theDrainHandler = drainHandler;
if (theDrainHandler != null && credits >= maxSize / 2) {
this.drainHandler = null;
vertx.runOnContext(v -> theDrainHandler.handle(null));
}
}

逻辑一目了然。首先给credits加上发送过来的值(正常情况下为1),然后恢复发送能力,将缓冲区的数据依次取出、发送然后减小credits。同时如果MessageProducer绑定了drainHandler(消息流不拥塞的时候调用的逻辑,详见官方文档),并且MessageProducer发送的消息不再拥塞(credits >= maxSize / 2),那么就在Vert.x Context中执行drainHandler中的逻辑。

怎么样,体会到Vert.x中flow control的强大之处了吧!官方文档中MessageProducer的篇幅几乎没有,只在介绍WriteStream的时候提了提,因此这部分也可以作为MessageProducer的参考。

reply

最后就是消息的回复逻辑 —— reply方法了。reply方法的实现位于MessageImpl类中,最终调用的是reply(Object, DeliveryOptions, Handler<AsyncResult<Message<R>>>)这个版本:

1
2
3
4
5
6
@Override
public <R> void reply(Object message, DeliveryOptions options, Handler<AsyncResult<Message<R>>> replyHandler) {
if (replyAddress != null) {
sendReply(bus.createMessage(true, replyAddress, options.getHeaders(), message, options.getCodecName()), options, replyHandler);
}
}

这里reply方法同样调用EventBuscreateMessage方法创建要回复的消息实体,传入的replyAddress即为之前讲过的生成的非常简单的回复地址。然后再将消息实体、配置以及对应的replyHandler(如果有的话)传入sendReply方法进行消息的回复。最后其实是调用了Event Bus中的四个参数的sendReply方法,它的逻辑与之前讲过的sendOrPubInternal非常相似:

1
2
3
4
5
6
7
8
9
protected <T> void sendReply(MessageImpl replyMessage, MessageImpl replierMessage, DeliveryOptions options,
Handler<AsyncResult<Message<T>>> replyHandler) {
if (replyMessage.address() == null) {
throw new IllegalStateException("address not specified");
} else {
HandlerRegistration<T> replyHandlerRegistration = createReplyHandlerRegistration(replyMessage, options, replyHandler);
new ReplySendContextImpl<>(replyMessage, options, replyHandlerRegistration, replierMessage).next();
}
}

参数中replyMessage代表回复消息实体,replierMessage则代表回复者自身的消息实体(sender)。

如果地址为空则抛出异常;如果地址不为空,则先调用createReplyHandlerRegistration方法创建对应的replyHandlerRegistrationcreateReplyHandlerRegistration方法的实现之前已经讲过了。注意这里的createReplyHandlerRegistration其实对应的是此replier的回复,因为Vert.x中的 Request-Response 消息模型不限制相互回复(通信)的次数。当然如果没有指定此replier的回复的replyHandler,那么此处的replyHandlerRegistration就为空。最后sendReply方法会创建一个ReplySendContextImpl并调用其next方法。

ReplySendContextImpl类同样是SendContext接口的一个实现(继承了SendContextImpl类)。ReplySendContextImpl比起其父类就多保存了一个replierMessagenext方法的逻辑与父类逻辑非常相似,只不过将回复的逻辑替换成了另一个版本的sendReply方法:

1
2
3
4
5
6
7
8
9
@Override
public void next() {
if (iter.hasNext()) {
Handler<SendContext> handler = iter.next();
handler.handle(this);
} else {
sendReply(this, replierMessage);
}
}

然而。。。sendReply方法并没有用到传入的replierMessage,所以这里最终还是调用了sendOrPub方法(尼玛,封装的ReplySendContextImpl貌似并没有什么卵用,可能为以后的扩展考虑?)。。。之后的逻辑我们都已经分析过了。

这里再强调一点。当我们发送消息同时指定replyHandler的时候,其内部为reply创建的reply consumer(类型为HandlerRegistration)指定了timeout。这个定时器从HandlerRegistration创建的时候就开始计时了。我们回顾一下:

1
2
3
4
5
6
if (timeout != -1) {
timeoutID = vertx.setTimer(timeout, tid -> {
metrics.replyFailure(address, ReplyFailure.TIMEOUT);
sendAsyncResultFailure(ReplyFailure.TIMEOUT, "Timed out after waiting " + timeout + "(ms) for a reply. address: " + address);
});
}

计时器会在超时的时候记录错误并强制注销当前consumer。由于reply consumer是一次性的,当收到reply的时候,Vert.x会自动对reply consumer调用unregister方法对其进行注销(实现位于EventBusImpl#deliverToHandler方法中),而在注销逻辑中会关闭定时器(参见前面对doUnregister方法的解析);如果超时,那么计时器就会触发,Vert.x会调用sendAsyncResultFailure方法注销当前reply consumer并处理错误。

synchronized的性能问题

大家可能看到为了防止race condition,Vert.x底层大量使用了synchronized关键字(重量级锁)。这会不会影响性能呢?其实,如果开发者遵循Vert.x的线程模型和开发规范(使用Verticle)的话,有些地方的synchronized对应的锁会被优化为 偏向锁轻量级锁(因为通常都是同一个Event Loop线程获取对应的锁),这样性能总体开销不会太大。当然如果使用Multi-threaded worker verticles就要格外关注性能问题了。。。

总结

我们来简略地总结一下Event Bus的工作原理。当我们调用consumer绑定一个MessageConsumer时,Vert.x会将它保存至Event Bus实例内部的Map中;当我们通过sendpublish向对应的地址发送消息的时候,Vert.x会遍历Event Bus中存储consumer的Map,获取与地址相对应的consumer集合,然后根据相应的策略传递并处理消息(send通过轮询策略获取任意一个consumer并将消息传递至consumer中,publish则会将消息传递至所有注册到对应地址的consumer中)。同时,MessageConsumerMessageProducer这两个接口的实现都具有flow control功能,因此它们也可以用在Pump中。

Event Bus是Vert.x中最为重要的一部分之一,探索Event Bus的源码可以加深我们对Event Bus工作原理的理解。作为开发者,只会使用框架是不够的,能够理解内部的实现原理和精华,并对其进行改进才是更为重要的。本篇文章分析的是Local模式下的Event Bus,下篇文章我们将来探索一下生产环境中更常用的 Clustered Event Bus 的实现原理,敬请期待!