call/cc总结 | Scheme

Continuation

Continuation也是一个老生常谈的东西了,我们来回顾一下。首先我们看一下 TSPL4 中定义的表达式求值需要做的事:

During the evaluation of a Scheme expression, the implementation must keep track of two things: (1) what to evaluate and (2) what to do with the value.

Continuation即为其中的(2),即表达式被求值以后,接下来要对表达式做的计算R5RS 中continuation的定义为:

The continuation represents an entire (default) future for the computation.

比如 (+ (* 2 3) (+ 1 7)) 表达式中,(* 2 3)的continuation为:保存(* 2 3)计算出的值6,然后计算(+ 1 7)的值,最后将两表达式的值相乘,结束;(+ 1 7)的continuation为:保存(+ 1 7)的值8,将其与前面计算出的6相乘,结束。

Scheme中的continuation是first-class的,也就是说它可以被当做参数进行传递和返回;并且Scheme中可以将continuation视为一个procedure,也就是说可以调用continuation执行后续的运算。

call/cc

每个表达式在求值的时候,都会有一个对应的 current continuation,它在等着当前表达式求值完毕然后把值传递给它。那么如何捕捉current continuation呢?这就要用到Scheme中强大的call/cc了。call/cc的全名是call-with-current-continuation,它可以捕捉当前环境下的current continuation并利用它做各种各样的事情,如改变控制流,实现非本地退出(non-local exit)、协程(coroutine)、多任务(multi-tasking)等,非常方便。注意这里的continuation将当前context一并打包保存起来了,而不只是保存程序运行的位置。下面我们来举几个例子说明一下call/cc的用法。

current continuation

我们先来看个最简单的例子 —— 用它来捕捉current continuation并作为procedure调用。call/cc接受一个函数,该函数接受一个参数,此参数即为current continuation。以之前(+ (* 2 3) (+ 1 7)) 表达式中 (* 2 3)的continuation为例:

1
2
3
4
5
(define cc #f)
(+ (call/cc (lambda (return)
(set! cc return)
(* 2 3)))
(+ 1 7))

我们将(* 2 3)的current continuation(用(+ ? (+ 1 7))表示)绑定给cc变量。现在cc就对应了一个continuation,它相当于过程(define (cc x) (+ (x) (+ 1 7))),等待一个值然后进行后续的运算:

1
2
3
4
5
6
> cc
#<continuation>
> (cc 10)
18
> (cc (* 2 3))
14

这个例子很好理解,我们下面引入call/cc的本质 —— 控制流变换。在Scheme中,假设call/cc捕捉到的current continuation为cc(位于lambda中),如果cc作为过程直接或间接地被调用(即给它传值),call/cc会立即返回,返回值即为传入cc的值。即一旦current continuation被调用,控制流会跳到call/cc处。因此,利用call/cc,我们可以摆脱顺序执行的限制,在程序中跳来跳去,非常灵活。下面我们举几个non-local exit的例子来说明。

Non-local exit

Scheme中没有breakreturn关键字,因此在循环中如果想break并提前返回的话就得借助call/cc。比如下面的例子寻找传入的list中是否包含5

1
2
3
4
5
6
7
8
9
10
11
12
(define (do-with element return)
(if (= element 5)
(return 'find-five)
(void)))
(define (check-lst lst)
(call/cc (lambda (return)
(for-each (lambda (element)
(do-with element return)
(printf "~a~%" element))
lst)
'not-found)))

测试:

1
2
3
4
5
6
7
8
9
> (check-lst '(0 2 4))
0
2
4
'not-found
> (check-lst '(0 3 5 1))
0
3
'find-five

check-lst过程会遍历列表中的元素,每次都会将current continuation传给do-with过程并进行调用,一旦do-with遇到5,我们就将结果传给current continuation(即return),此时控制流会马上跳回check-lst过程中的call/cc处,这时候就已经终止遍历了(跳出了循环)。call/cc的返回值为'find-five,所以最后会在控制台上打印出'find-five

我们再来看一个经典的generator的例子,它非常像Python和ES6中的yield,每次调用的时候都会返回list中的一个元素:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
(define (generate-one-element-at-a-time lst)
;; Hand the next item from a-list to "return" or an end-of-list marker
(define (control-state return)
(for-each
(lambda (element)
(set! return (call/cc
(lambda (resume-here)
;; Grab the current continuation
(set! control-state resume-here)
(return element)))))
lst)
(return 'you-fell-off-the-end))
;; This is the actual generator, producing one item from a-list at a time
(define (generator)
(call/cc control-state))
;; Return the generator
generator)
(define generate-digit
(generate-one-element-at-a-time '(0 1 2)))

调用:

1
2
3
4
5
6
7
8
> (generate-digit)
0
> (generate-digit)
1
> (generate-digit)
2
> (generate-digit)
'you-fell-off-the-end

注意到这个例子里有两个call/cc,大家刚看到的时候可能会有点晕,其实这两个call/cc各司其职,互不干扰。第一个call/cc负责保存遍历的状态(从此处恢复),而generator中的call/cc才是真正生成值的地方(非本地退出)。其中一个需要注意的地方就是control-state,它在第一次调用的时候还是个procedure,在第一次调用的过程中它就被重新绑定成一个continuation,之后再调用generator生成器的时候,控制流就可以跳到之前遍历的位置继续执行下面的过程,从而达到生成器的效果。

阴阳谜题

continuation环境嵌套。后面有时间专开一篇分析~

1
2
3
4
5
(let* ((yin
((lambda (cc) (display #\@) cc) (call-with-current-continuation (lambda (c) c))))
(yang
((lambda (cc) (display #\*) cc) (call-with-current-continuation (lambda (c) c)))))
(yin yang))

call/cc与数理逻辑

这里简单提一下call/cc与类型系统和数理逻辑的联系。call/cc的类型是((P → Q) → P) → P,通过Curry-Howard 同构,它可以对应到经典逻辑中的Peirce’s law

$$((P \to Q) \to P) \to P$$

Peirce’s law代表排中律 $P \land \lnot P$,这条逻辑无法在Lambda演算所对应的直觉逻辑中表示(直觉逻辑中双重否定不成立),因此call/cc无法用Lambda表达式定义。通常我们用扩展后的 $\lambda \mu \ calculus$ 来定义call/cc,$\lambda \mu \ calculus$ 经Curry-Howard 同构可以得到经典逻辑。


References

分布式计算系统中的GC问题 | Yak(OSDI 2016) 学习笔记

最近一直在关注OSDI 2016,发现了这篇关于分布式计算系统内存管理的论文:Yak: A High-Performance Big-Data-Friendly. Garbage Collector,感觉比较有趣,拿来总结总结~

Yak是一个JVM平台上的针对大数据场景(分布式计算框架)设计、优化的Garbage Collector。

背景

在传统的基于分代模型的垃圾回收算法中,小对象首先被划分到Young Generation(新生代);如果对象经历过一定阈值的GC还会存活后,它就会被晋升至Old Generation(老年代);大对象可以直接进入老年代。

对于分布式计算框架而言,这样的模型有一定的弊端:没有考虑分布式计算情景下对象的生命周期。分布式计算中控制过程(Control Path)与数据处理过程(Data Path)界限明显,如果全都用统一的GC模型的话,会导致频繁请求GC数据,扫描全堆,最后实际回收的很少,导致Full GC(STW)。因此,像Apache Spark在1.5版本以后已经放弃使用JVM的GC进行内存管理,而是直接利用unsafe包进行内存管理。这样十分麻烦还容易出错。

Yak就是为了解决这样的问题而诞生的。既然分布式计算过程中控制过程与数据处理过程界限明显,Yak针对这两种过程中的数据划分了两种不同的空间(space):

  • Control Space (CS)
  • Data Space (DS)

对于控制过程(比如任务的调度、日志记录等),其内存布局与传统的一致,GC还是采用分代模型,分YoungGen/OldGen/Metaspace。控制过程产生的对象小、生命周期短暂,符合分代假设。

而对于数据处理过程,其中的对象通常都是很大的、在计算周期中一直需要访问的,因此Yak提出了Epoch Region,数据对象的生命周期依赖于每个epoch。每个epoch的start与end需要用户来设置(但是很简单)。

时域抽象

Epoch hypothesis: many data-path objects have the same life span and can be reclaimed together at the end of an epoch.

时域抽象(Epoch Region): 抽象成semilattice(半格),用于描述nested epoches之间的偏序关系。见论文Figure 5。(Order Theory在这里非常有用)

如何正确地回收某个特定的Region

如果有的对象生命周期超出此epoch,如何将其迁移至“安全地带”?

  • 思考点1:标记escaping objects
  • 思考点2:决定escaping objects的迁移终点并且执行复制

对于标记的过程,可以以cross-region/space references为根节点来遍历对象图并且标记其中的escaping objects(传递闭包)

对于决定其destination的过程,需要计算出对象O的引用region的上确界(via semilattice)。

如果对应的region具有继承关系,则应选择最上面的(上确界)。如果是不同线程执行的,那么对应的上确界则为CS。

TODO: 待详细总结

Distributed System | Spark RDD 论文总结

本篇文章是对Spark RDD论文的总结,中间会穿插一些Spark的内部实现总结,对应Spark版本为2.0。

RDD

Motivation

传统的分布式计算框架(如MapReduce)在执行计算任务时,中间结果通常会存于磁盘中,这样带来的IO消耗是非常大的,尤其是对于各种机器学习算法,它们需要复用上次计算的结果进行迭代,如果每次结果都存到磁盘上再从磁盘读取,耗时会很大。因此Spark这篇论文提出了一种新的分布式数据抽象 —— RDD

设计思想及特点

Resilient Distributed Dataset(RDD)是Apache Spark中数据的核心抽象,它是一种只读的、分区的数据记录集合。

RDD的特点:

  • Lazy evaluation,只在需要的时候才进行计算
  • RDD里面的数据是分区的,每一块数据都可能分布在集群内不同的节点上;支持并行计算
  • Resilient: 借助RDD lineage graph,Spark可以重新执行之前失败的计算任务而不用整体上重新计算,保证了容错性而且非常灵活,实现了fault-tolerance

那么如何操作、处理数据呢?Spark提供了一组函数式编程风格的API,可以很方便地对RDD进行操作、变换,就像操作集合一样。比如:

1
2
3
4
5
6
7
val rdd = sc.parallelize(1 to 100)
val result = rdd.map(_ + 10)
.filter(_ > 15)
.map(x => (x, 1))
.reduceByKey(_+_)
.collect

并且开发者可以根据需要自己编写相应的RDD以及RDD之间的操作,非常方便。可以这么理解,RDD就相当于抽象的数据表示,而operation就相当于一套DSL用于对RDD进行变换或者求值。

RDD的表示

Spark中的RDD主要包含五部分信息:

  • partitions(): partition集合
  • dependencies(): 当前RDD的dependency集合
  • iterator(split, context): 对每个partition进行计算或读取操作的函数
  • partitioner(): 分区方式,如HashPartitionerRangePartitioner
  • preferredLocations(split): 访问某个partition最快的节点

所有的RDD都继承抽象类RDD。几种常见的操作:

  • sc#textFile: 生成HadoopRDD,代表可以从HDFS中读取数据的RDD
  • sc#parallelize: 生成ParallelCollectionRDD,代表从Scala集合中生成的RDD
  • map, flatMap, filter: 生成MapPartitionsRDD,其partition与parent RDD一致,同时会对parent RDD中iterator函数返回的数据进行对应的操作(lazy)
  • union: 生成UnionRDDPartitionerAwareUnionRDD
  • reduceByKey, groupByKey: 生成ShuffledRDD,需要进行shuffle操作
  • cogroup, join: 生成CoGroupedRDD

Operations

Spark里面对RDD的操作分为两种:transformationaction

  • transformation是lazy的,仅仅会保存计算步骤并返回一个新的RDD,而不会立刻执行计算操作
  • action会依次执行计算操作并且得到结果

这些transformation和action在FP中应该是很常见的,如map, flatMap, filter, reduce, count, sum

对单个数据操作的transformation函数都在RDD抽象类内,而对tuple操作的transformation都在PairRDDFunctions包装类中。RDD可以通过implicit函数在符合类型要求的时候自动转换为PairRDDFunctions类,从而可以进行reduceByKey之类的操作。对应的implicit函数:

1
2
3
4
implicit def rddToPairRDDFunctions[K, V](rdd: RDD[(K, V)])
(implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null): PairRDDFunctions[K, V] = {
new PairRDDFunctions(rdd)
}

Dependency

上面我们提到,RDD只会在需要的时候计算结果,调用那些transformation方法以后,对应的transformation信息只是被简单地存储起来,直到调用某个action才会真正地去执行计算。Spark中RDD之间是有联系的,RDD之间会形成依赖关系,也就是形成lineage graph(依赖图)。Dependency大致分两种:narrow dependency和wide dependency。

  • Narrow dependency(NarrowDependency): Parent RDD中的每个partition最多被child RDD中的一个partition使用,即一对一的关系。比如map, flatMap, filter等transformation都是narrow dependency
  • Wide dependency(ShuffleDependency):Parent RDD中的每个partition会被child RDD中的多个partition使用,即一对多的关系。比如join生成的RDD一般是wide dependency(不同的partitioner)

论文中的图例很直观地表示了RDD间的依赖关系:

Spark RDD Dependency

这样划分dependency的原因:

  1. Narrow dependency可以方便地以流水线的形式执行计算,即从头到尾一串chain下来。而wide dependency必须要等所有的parent RDD的结果都准备好以后再执行计算
  2. Narrow dependency失败以后,Spark只需要重新计算失败的parent RDD即可;而对于wide dependency来说,一失败可能导致某些分区丢失,必须整体重新进行计算

Shuffle

Spark中的shuffle操作与MapReduce中类似,在计算wide dependency对应的RDD的时候(即ShuffleMapStage)会触发。

首先来回顾一下为什么要进行shuffle操作。以reduceByKey操作为例,Spark要按照key把这些具有相同key的tuple聚集到一块然后进行计算操作。然而这些tuple可能在不同的partition中,甚至在不同的集群节点中,要想计算必须先把它们聚集起来。因此,Spark用一组map task来将每个分区写入到临时文件中,然后下一个stage端(reduce task)会根据编号获取临时文件,然后将partition中的tuple按照key聚集起来并且进行相应的操作。这里面还包括着排序操作(可能在map side也可能在reduce side进行)。

Shuffle是Spark的主要性能瓶颈之一(涉及磁盘IO,数据序列化和网络IO),其优化一直是个难题。

  • Shuffle write(map task): SortShuffleWriter#write
  • Shuffle read(reduce task): ShuffleRDD#compute

Persistence

Checkpointing

Checkpoint的目的是保存那些计算耗时较长的RDD数据(long lineage chains),执行Checkpoint的时候会新提交一个Job,因此最好先persistcheckpoint

Cache/Persist

cachepersist用于缓存一些经常使用的RDD结果(但是不能太大)。

  • persist方法的主要作用是改变StorageLevel以在compute的时候通过BlockManager进行相应的持久化操作
  • cache方法相当于设置存储级别为MEMORY_ONLY

Job Scheduling

简单来说,Spark会将提交的计算划分为不同的stages,形成一个有向无环图(DAG)。Spark的调度器会按照DAG的次序依次进行计算每个stage,最终得到计算结果。执行计算的几个重要的类或接口如下:

  • DAGScheduler
  • ActiveJob
  • Stage
  • Task
  • TaskScheduler
  • SchedulerBackend

这里面最为重要的就是 DAGScheduler 了,它会将逻辑执行计划(即RDD lineage)转化为物理执行计划(stage/task)。之前我们提到过,当开发者对某个RDD执行action的时候,Spark才会执行真正的计算过程。当开发者执行action的时候,SparkContext会将当前的逻辑执行计划传给DAGSchedulerDAGScheduler会根据给定的逻辑执行计划生成一个Job(对应ActiveJob类)并提交。每执行一个acton都会生成一个ActiveJob

提交Job的过程中,DAGScheduler会进行stage的划分。Spark里是按照shuffle操作来划分stage的,也就是说stage之间都是wide dependency,每个stage之内的dependency都是narrow dependency。这样划分的好处是尽可能地把多个narrow dependency的RDD放到同一个stage之内以便于进行pipeline计算,而wide dependency中child RDD必须等待所有的parent RDD计算完成并且shuffle以后才能接着计算,因此这样划分stage是最合适的。

划分好的stages会形成一个DAG,DAGScheduler会根据DAG中的顺序先提交parent stages(如果存在的话),再提交当前stage,以此类推,最先提交的是没有parent stage的stage。从执行角度来讲,一个stage的parent stages执行完以后,该stage才可以被执行。最后一个stage是产生最终结果的stage,对应ResultStage,而其余的stage都是ShuffleMapStage。下面是论文中stage划分的一个图例,非常直观:

DAG of stages

提交stage的时候,Spark会根据stage的类型生成一组对应类型的Task(ResultTaskShuffleMapTask),然后将这些Task包装成TaskSet提交到TaskScheduler中。一个Task对应某个RDD中的某一个partition,即一个Task只负责某个partition的计算:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
val tasks: Seq[Task[_]] = try {
stage match {
case stage: ShuffleMapStage =>
partitionsToCompute.map { id =>
val locs = taskIdToLocations(id)
val part = stage.rdd.partitions(id)
new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,
taskBinary, part, locs, stage.latestInfo.taskMetrics, properties)
}
case stage: ResultStage =>
val job = stage.activeJob.get
partitionsToCompute.map { id =>
val p: Int = stage.partitions(id)
val part = stage.rdd.partitions(p)
val locs = taskIdToLocations(id)
new ResultTask(stage.id, stage.latestInfo.attemptId,
taskBinary, part, locs, id, properties, stage.latestInfo.taskMetrics)
}
}
} catch {
// 此处代码略...
}

TaskScheduler会向执行任务的后端(SchedulerBackend,可以是Local, Mesos, Hadoop YARN或者其它集群管理组件)发送ReviveOffers消息,对应的执行后端接收到消息以后会将Task封装成TaskRunner(Runnable接口的实例),然后提交到底层的Executor中,并行执行计算任务。

Executor中的线程池定义如下:

1
private val threadPool = ThreadUtils.newDaemonCachedThreadPool("Executor task launch worker")
1
2
3
4
def newDaemonCachedThreadPool(prefix: String): ThreadPoolExecutor = {
val threadFactory = namedThreadFactory(prefix)
Executors.newCachedThreadPool(threadFactory).asInstanceOf[ThreadPoolExecutor]
}

可以看到底层执行task的线程池实际上是JUC中的CachedThreadPool,按需创建新线程,同时会复用线程池中已经建好的线程。

最后用一幅图总结一下Job, Stage和Task的关系(图来自 Mastering Apache Spark 2.0):

Stage, Job and Task in Spark

整个Spark Context执行task的步骤图:

Memory Management

Spark中RDD的存储方式有两种:in memory和on disk,默认是in memory的。进行分布式计算的时候通常会读入大量的数据,并且通常还需要重用这些数据,如果简单地把内存管理交给GC的话,很容易导致回收失败从而cause full GC,影响性能。

Spark 1.5开始不再通过GC管理内存。Spark 1.5实现了一个内存管理器用于手动管理内存(Project Tungsten),底层通过Unsafe类来直接分配和回收内存。

另外,分布式计算系统的GC方面还可以参考OSDI 2016的一篇论文: Yak: A High-Performance Big-Data-Friendly Garbage Collector

PageRank实例

下面在Spark中跑一个PageRank来观察一下生成的Stage DAG。PageRank的公式比较简单:

$$PageRank (p_i) = \frac{1-d}{N} + d \sum_{p_j \in M(p_i)} \frac{PageRank (p_j)}{L(p_j)} $$

这里我们选择damping factor=0.85,初始的rank值为1.0;PageRank算法可以用马尔科夫矩阵进行优化,但是这里迭代次数较小,可以直接进行迭代计算。对应代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
val iters = 10
val data = sc.textFile("data.txt")
val links = data.map { s =>
val parts = s.split("\\s+")
(parts(0), parts(1))
}.distinct().groupByKey().cache()
var ranks = links.mapValues(v => 1.0)
for (i <- 1 to iters) {
val contribs = links.join(ranks).values.flatMap { case (urls, rank) =>
val size = urls.size
urls.map(url => (url, rank / size))
}
ranks = contribs.reduceByKey(_ + _).mapValues(0.15 + 0.85 * _)
}
val output = ranks.collect()

对应的Stage DAG:

DAG of stages in PageRank Algorithm

其中Stage 3中的RDD dependencies如下:

One stage in PageRank Algorithm


References

Vert.x 技术内幕 | 解道Vert.x线程模型

线程模型概述

Vert.x的线程模型设计的非常巧妙。总的来说,Vert.x中主要有两种线程:Event Loop线程Worker线程。其中,Event Loop线程结合了Netty的EventLoop,用于处理事件。每一个EventLoop都与唯一的线程相绑定,这个线程就叫Event Loop线程。Event Loop线程不能被阻塞,否则事件将无法被处理。

Worker线程用于执行阻塞任务,这样既可以执行阻塞任务而又不阻塞Event Loop线程。

如果像Node.js一样只有单个Event Loop的话就不能充分利用多核CPU的性能了。为了充分利用多核CPU的性能,Vert.x中提供了一组Event Loop线程。每个Event Loop线程都可以处理事件。为了保证线程安全,防止资源争用,Vert.x保证了某一个Handler总是被同一个Event Loop线程执行,这样不仅可以保证线程安全,而且还可以在底层对锁进行优化提升性能。所以,只要开发者遵循Vert.x的线程模型,开发者就不需要再担心线程安全的问题,这是非常方便的。

本篇文章将底层的角度来解析Vert.x的线程模型。对应的Vert.x版本为3.3.3

Event Loop线程

首先回顾一下Event Loop线程,它会不断地轮询获取事件,并将获取到的事件分发到对应的事件处理器中进行处理:

Vert.x Event Loop

Vert.x线程模型中最重要的一点就是:永远不要阻塞Event Loop线程。因为一旦处理事件的线程被阻塞了,事件就会一直积压着不能被处理,整个应用也就不能正常工作了。

Vert.x中内置一种用于检测Event Loop是否阻塞的线程:vertx-blocked-thread-checker。一旦Event Loop处理某个事件的时间超过一定阈值(默认为2000ms)就会警告,如果阻塞的时间过长就会抛出异常。Block Checker的实现原理比较简单,底层借助了JUC的TimerTask,定时计算每个Event Loop线程的处理事件消耗的时间,如果超时就进行相应的警告。

Vert.x Thread

Vert.x中的Event Loop线程及Worker线程都用VertxThread类表示,并通过VertxThreadFactory线程工厂来创建。VertxThreadFactory创建Vert.x线程的过程非常简单:

1
2
3
4
5
6
7
8
9
10
11
12
@Override
public Thread newThread(Runnable runnable) {
VertxThread t = new VertxThread(runnable, prefix + threadCount.getAndIncrement(), worker, maxExecTime);
if (checker != null) {
checker.registerThread(t);
}
addToMap(t);
t.setDaemon(false);
return t;
}

除了创建VertxThread线程之外,VertxThreadFactory还会将此线程注册至Block Checker线程中以监视线程的阻塞情况,并且将此线程添加至内部的weakMap中。这个weakMap作用只有一个,就是在注销对应的Verticle的时候可以将每个VertxThread中的Context实例清除(unset)。为了保证资源不被一直占用,这里使用了WeakHashMap来存储每一个VertxThread。当里面的VertxThread的引用不被其他实例持有的时候,它就会被标记为可清除的对象,等待GC。

至于VertxThread,它其实就是在普通线程的基础上存储了额外的数据(如对应的Vert.x Context,最大执行时长,当前执行时间,是否为Worker线程等),这里就不多讲了。

Vert.x Context

Vert.x底层中一个重要的概念就是Context,每个Context都会绑定着一个Event Loop线程(而一个Event Loop线程可以对应多个Context)。我们可以把Context看作是控制一系列的Handler的执行作用域及顺序的上下文对象。

每当Vert.x底层将事件分发至Handler的时候,Vert.x都会给此Handler钦点一个Context用于处理任务:

  • 如果当前线程是Vert.x线程(VertxThread),那么Vert.x就会复用此线程上绑定的Context;如果没有对应的Context就创建新的
  • 如果当前线程是普通线程,就创建新的Context

Vert.x中存在三种Context,与之前的线程种类相对应:

  • EventLoopContext
  • WorkerContext
  • MultiThreadedWorkerContext

Event loop context

每个Event Loop Context都会对应着唯一的一个EventLoop,即一个Event Loop Context只会在同一个Event Loop线程上执行任务。在创建Context的时候,Vert.x会自动根据轮询策略选择对应的EventLoop:

1
2
3
4
5
6
7
8
9
10
11
protected ContextImpl(VertxInternal vertx, WorkerPool internalBlockingPool, WorkerPool workerPool, String deploymentID, JsonObject config,
ClassLoader tccl) {
// ...
EventLoopGroup group = vertx.getEventLoopGroup();
if (group != null) {
this.eventLoop = group.next();
} else {
this.eventLoop = null;
}
// ...
}

在Netty中,EventLoopGroup代表一组EventLoop,而从中获取EventLoop的方法则是next方法。EventLoopGroupEventLoop的数量由CPU内核数目所确定。Vert.x这里使用了Netty NIO对应的NioEventLoop

1
2
eventLoopGroup = new NioEventLoopGroup(options.getEventLoopPoolSize(), eventLoopThreadFactory);
eventLoopGroup.setIoRatio(NETTY_IO_RATIO);

对应的轮询算法:

1
2
3
4
5
6
7
8
@Override
public EventExecutorChooser newChooser(EventExecutor[] executors) {
if (isPowerOfTwo(executors.length)) {
return new PowerOfTowEventExecutorChooser(executors);
} else {
return new GenericEventExecutorChooser(executors);
}
}

可以看到,正常情况下Netty会用轮询策略选择EventLoop。特别地,如果EventLoop的个数是2的倍数的话,选择的会快一些:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private static final class GenericEventExecutorChooser implements EventExecutorChooser {
// ...
@Override
public EventExecutor next() {
return executors[Math.abs(idx.getAndIncrement() % executors.length)];
}
}
private static final class PowerOfTowEventExecutorChooser implements EventExecutorChooser {
// ...
@Override
public EventExecutor next() {
return executors[idx.getAndIncrement() & executors.length - 1];
}
}

我们可以在Embedded模式下测试一下Event Loop线程的分配:

1
2
3
4
5
6
7
System.out.println(Thread.currentThread());
Vertx vertx = Vertx.vertx();
for (int i = 0; i < 20; i++) {
int index = i;
vertx.setTimer(1, t -> {
System.out.println(index + ":" + Thread.currentThread());
});

运行结果(不同机器运行顺序、Event Loop线程数可能不同):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
Thread[main,5,main]
0:Thread[vert.x-eventloop-thread-0,5,main]
1:Thread[vert.x-eventloop-thread-1,5,main]
2:Thread[vert.x-eventloop-thread-2,5,main]
3:Thread[vert.x-eventloop-thread-3,5,main]
5:Thread[vert.x-eventloop-thread-5,5,main]
6:Thread[vert.x-eventloop-thread-6,5,main]
8:Thread[vert.x-eventloop-thread-8,5,main]
7:Thread[vert.x-eventloop-thread-7,5,main]
10:Thread[vert.x-eventloop-thread-10,5,main]
9:Thread[vert.x-eventloop-thread-9,5,main]
4:Thread[vert.x-eventloop-thread-4,5,main]
11:Thread[vert.x-eventloop-thread-11,5,main]
12:Thread[vert.x-eventloop-thread-12,5,main]
13:Thread[vert.x-eventloop-thread-13,5,main]
14:Thread[vert.x-eventloop-thread-14,5,main]
16:Thread[vert.x-eventloop-thread-0,5,main]
17:Thread[vert.x-eventloop-thread-1,5,main]
15:Thread[vert.x-eventloop-thread-15,5,main]
18:Thread[vert.x-eventloop-thread-2,5,main]
19:Thread[vert.x-eventloop-thread-3,5,main]

可以看到尽管每个Context对应唯一的Event Loop线程,而每个Event Loop线程却可能对应多个Context

Event Loop Context会在对应的EventLoop中执行Handler进行事件的处理(IO事件,非阻塞)。Vert.x会保证同一个Handler会一直在同一个Event Loop线程中执行,这样可以简化线程模型,让开发者在写Handler的时候不需要考虑并发的问题,非常方便。

我们来粗略地看一下Handler是如何在EventLoop上执行的。EventLoopContext中实现了executeAsync方法用于包装Handler中事件处理的逻辑并将其提交至对应的EventLoop中进行执行:

1
2
3
4
public void executeAsync(Handler<Void> task) {
// No metrics, we are on the event loop.
nettyEventLoop().execute(wrapTask(null, task, true, null));
}

这里Vert.x使用了wrapTask方法将Handler封装成了一个Runnable用于向EventLoop中提交。代码比较直观,大致就是检查当前线程是否为Vert.x线程,然后记录事件处理开始的时间,给当前的Vert.x线程设置Context,并且调用Handler里面的事件处理方法。具体请参考源码,这里就不贴出来了。

那么把封装好的task提交到EventLoop以后,EventLoop是怎么处理的呢?这就需要更多的Netty相关的知识了。根据Netty的模型,Event Loop线程需要处理IO事件,普通事件(即我们的Handler)以及定时事件(比如Vert.x的setTimer)。Vert.x会提供一个NETTY_IO_RATIO给Netty代表EventLoop处理IO事件时间占用的百分比(默认为50,即IO事件时间占用:非IO事件时间占用=1:1)。当EventLoop启动的时候,它会不断轮询IO时间及其它事件并进行处理:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
@Override
protected void run() {
for (;;) {
try {
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.SELECT:
select(wakenUp.getAndSet(false));
if (wakenUp.get()) {
selector.wakeup();
}
default:
// fallthrough
}
cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
processSelectedKeys();
runAllTasks();
} else {
final long ioStartTime = System.nanoTime();
processSelectedKeys();
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
break;
}
}
} catch (Throwable t) {
// process the error
// ...
}
}
}

这里面Netty会调用processSelectedKeys方法进行IO事件的处理,并且会计算出处理IO时间所用的事件然后计算出给非IO事件处理分配的时间,然后调用runAllTasks方法执行所有的非IO任务(这里面就有我们的各个Handler)。

runAllTasks会按顺序从内部的任务队列中取出任务(Runnable)然后进行安全执行。而我们刚才调用的NioEventLoopexecute方法其实就是将包装好的Handler置入NioEventLoop内部的任务队列中等待执行。

Worker context

顾名思义,Worker Context用于跑阻塞任务。与Event Loop Context相似,每一个Handler都只会跑在固定的Worker线程下。

Vert.x还提供一种Multi-threaded worker context可以在多个Worker线程下并发执行任务,这样就会出现并发问题,需要开发者自行解决并发问题。因此一般情况下我们用不到Multi-threaded worker context。

Verticle

我们再来讨论一下Verticle中的Context。在部署Verticle的时候,Vert.x会根据配置来创建Context并绑定到Verticle上,此后此Verticle上所有绑定的Handler都会在此Context上执行。相关实现位于doDeploy方法,这里摘取核心部分:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
for (Verticle verticle: verticles) {
WorkerExecutorImpl workerExec = poolName != null ? vertx.createSharedWorkerExecutor(poolName, options.getWorkerPoolSize()) : null;
WorkerPool pool = workerExec != null ? workerExec.getPool() : null;
// 根据配置创建Context
ContextImpl context = options.isWorker() ? vertx.createWorkerContext(options.isMultiThreaded(), deploymentID, pool, conf, tccl) :
vertx.createEventLoopContext(deploymentID, pool, conf, tccl);
if (workerExec != null) {
context.addCloseHook(workerExec);
}
context.setDeployment(deployment);
deployment.addVerticle(new VerticleHolder(verticle, context));
// 此Verticle上的Handler都会在创建的context作用域内执行
context.runOnContext(v -> {
try {
verticle.init(vertx, context);
Future<Void> startFuture = Future.future();
// 大家熟悉的start方法的执行点
verticle.start(startFuture);
startFuture.setHandler(ar -> {
if (ar.succeeded()) {
if (parent != null) {
parent.addChild(deployment);
deployment.child = true;
}
vertx.metricsSPI().verticleDeployed(verticle);
deployments.put(deploymentID, deployment);
if (deployCount.incrementAndGet() == verticles.length) {
reportSuccess(deploymentID, callingContext, completionHandler);
}
} else if (!failureReported.get()) {
reportFailure(ar.cause(), callingContext, completionHandler);
}
});
} catch (Throwable t) {
reportFailure(t, callingContext, completionHandler);
}
});
}

通过这样一种方式,Vert.x保证了Verticle的线程安全 —— 即某个Verticle上的所有Handler都会在同一个Vert.x线程上执行,这样也保证了Verticle内部成员的安全(没有race condition问题)。比如下面Verticle中处理IO及事件的处理都一直是在同一个Vert.x线程下执行的,每次打印出的线程名称应该是一样的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class TcpClientVerticle extends AbstractVerticle {
int i = 0;
@Override
public void start() throws Exception {
vertx.createNetClient().connect(6666, "localhost", ar -> {
if (ar.succeeded()) {
NetSocket socket = ar.result();
System.out.println(Thread.currentThread().getName());
socket.handler(buffer -> {
i++;
System.out.println(Thread.currentThread().getName());
System.out.println("Net client receiving: " + buffer.toString("UTF-8"));
});
socket.write("+1s\n");
} else {
ar.cause().printStackTrace();
}
});
}
}

线程池

Event Loop线程池

之前我们已经提到过,Event Loop线程池的类型为Netty中的NioEventLoopGroup,里面的线程通过Vert.x自己的线程工厂VertxThreadFactory进行创建:

1
2
3
eventLoopThreadFactory = new VertxThreadFactory("vert.x-eventloop-thread-", checker, false, options.getMaxEventLoopExecuteTime());
eventLoopGroup = new NioEventLoopGroup(options.getEventLoopPoolSize(), eventLoopThreadFactory);
eventLoopGroup.setIoRatio(NETTY_IO_RATIO);

其中Event Loop线程的数目可以在配置中指定。

Worker线程池

在之前讲executeBlocking底层实现的文章中我们已经提到过Worker线程池,它其实就是一种Fixed Thread Pool:

1
2
3
4
5
ExecutorService workerExec = Executors.newFixedThreadPool(options.getWorkerPoolSize(),
new VertxThreadFactory("vert.x-worker-thread-", checker, true, options.getMaxWorkerExecuteTime()));
PoolMetrics workerPoolMetrics = isMetricsEnabled() ? metrics.createMetrics(workerExec, "worker", "vert.x-worker-thread", options.getWorkerPoolSize()) : null;
workerPool = new WorkerPool(workerExec, workerPoolMetrics);

Worker线程同样由VertxThreadFactory构造,类型为VertxThread,用于执行阻塞任务。我们同样可以在配置中指定其数目。

内部阻塞线程池

1
2
3
4
ExecutorService internalBlockingExec = Executors.newFixedThreadPool(options.getInternalBlockingPoolSize(),
new VertxThreadFactory("vert.x-internal-blocking-", checker, true, options.getMaxWorkerExecuteTime()));
PoolMetrics internalBlockingPoolMetrics = isMetricsEnabled() ? metrics.createMetrics(internalBlockingExec, "worker", "vert.x-internal-blocking", options.getInternalBlockingPoolSize()) : null;
internalBlockingPool = new WorkerPool(internalBlockingExec, internalBlockingPoolMetrics);

Internal Blocking Pool可能设计用于内部使用,在executeBlocking(Action<T> action, Handler<AsyncResult<T>> resultHandler)这个版本的方法中就使用了它。

Acceptor Event Loop线程池

大家可能会发现VertxImpl类中还有一个acceptorEventLoopGroup。顾名思义,它是Netty中的Acceptor线程池,负责处理客户端的连接请求:

1
2
acceptorEventLoopGroup = new NioEventLoopGroup(1, acceptorEventLoopThreadFactory);
acceptorEventLoopGroup.setIoRatio(100);

由于系统只有一个服务端端口需要监听,因此这里只需要一个线程。

Vert.x中的HttpServer就利用了acceptorEventLoopGroup处理客户端的连接请求,具体的实现后边会另起一篇介绍。

Vert.x 技术内幕 | executeBlocking 实现原理

引入

大家都知道,Vert.x中的executeBlocking方法用于执行阻塞任务,并且有两种模式:有序执行和无序执行。下面我们来看两段代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
vertx.setPeriodic(1000, t -> {
vertx.executeBlocking(future -> {
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName());
future.complete();
}, r -> {});
});
vertx.setPeriodic(1000, t -> {
vertx.executeBlocking(future -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName());
future.complete();
}, r -> {});
});

我们思考一下,每段代码每次执行的时候使用的线程相同么?正常情况下大家都知道executeBlocking底层使用了Worker线程池,因此貌似两种情况没什么区别,都是轮询Worker线程池,每次可能用不同的Worker线程。但是我们测一下:

第一段代码:

1
2
3
4
5
vert.x-worker-thread-0
vert.x-worker-thread-1
vert.x-worker-thread-2
vert.x-worker-thread-3
vert.x-worker-thread-4

第二段代码:

1
2
3
4
5
vert.x-worker-thread-0
vert.x-worker-thread-0
vert.x-worker-thread-0
vert.x-worker-thread-0
vert.x-worker-thread-0

额。。。两段代码每次执行的线程居然有差异?第二次为什么每次都用相同的Worker线程?其实,大家可能忽略了一点:executeBlocking方法默认顺序执行提交的阻塞任务。今天我们就来探究一下executeBlocking内部的实现。

Worker线程池

我们来回顾一下Vert.x底层的Worker线程池,它在创建VertxImpl实例的时候进行初始化:

1
2
3
4
5
ExecutorService workerExec = Executors.newFixedThreadPool(options.getWorkerPoolSize(),
new VertxThreadFactory("vert.x-worker-thread-", checker, true, options.getMaxWorkerExecuteTime()));
PoolMetrics workerPoolMetrics = isMetricsEnabled() ? metrics.createMetrics(workerExec, "worker", "vert.x-worker-thread", options.getWorkerPoolSize()) : null;
workerPool = new WorkerPool(workerExec, workerPoolMetrics);

可以看到底层的Worker线程池本质上是一种FixedThreadPool,里面的线程由VertxThreadFactory控制生成,对应的线程类型为VertxThread。Vert.x内部用WorkerPool类对线程池以及线程池相关的Metrics类进行了封装。

阻塞任务在哪里执行?

有了Worker线程池的基础,我们来看一下Vertx实例中的executeBlocking方法,它的过程很简单:获取当前的Vert.x Context(没有就创建),然后委托调用Context里的executeBlocking方法:

1
2
3
4
5
6
7
8
9
10
11
12
@Override
public <T> void executeBlocking(Handler<Future<T>> blockingCodeHandler, boolean ordered,
Handler<AsyncResult<T>> asyncResultHandler) {
ContextImpl context = getOrCreateContext();
context.executeBlocking(blockingCodeHandler, ordered, asyncResultHandler);
}
@Override
public <T> void executeBlocking(Handler<Future<T>> blockingCodeHandler,
Handler<AsyncResult<T>> asyncResultHandler) {
executeBlocking(blockingCodeHandler, true, asyncResultHandler);
}

在此方法中可以看到,ordered标志位默认为true,即默认按提交的次序执行阻塞任务。

我们再来看一下ContextImpl类中的executeBlocking方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
@Override
public <T> void executeBlocking(Handler<Future<T>> blockingCodeHandler, boolean ordered, Handler<AsyncResult<T>> resultHandler) {
executeBlocking(null, blockingCodeHandler, resultHandler, ordered ? workerExec : workerPool.executor(), workerPool.metrics());
}
<T> void executeBlocking(Action<T> action, Handler<Future<T>> blockingCodeHandler,
Handler<AsyncResult<T>> resultHandler,
Executor exec, PoolMetrics metrics) {
Object queueMetric = metrics != null ? metrics.submitted() : null;
try {
exec.execute(() -> {
Object execMetric = null;
if (metrics != null) {
execMetric = metrics.begin(queueMetric);
}
Future<T> res = Future.future();
try {
if (blockingCodeHandler != null) {
ContextImpl.setContext(this);
blockingCodeHandler.handle(res);
} else {
T result = action.perform();
res.complete(result);
}
} catch (Throwable e) {
res.fail(e);
}
if (metrics != null) {
metrics.end(execMetric, res.succeeded());
}
if (resultHandler != null) {
runOnContext(v -> res.setHandler(resultHandler));
}
});
} catch (RejectedExecutionException ignore) {
// Pool is already shut down
if (metrics != null) {
metrics.rejected(queueMetric);
}
}
}

它调用了另一个具体版本的executeBlocking方法,其中第四个参数即为要执行阻塞任务的线程池。如果要有序执行(ordered为true),底层就使用context实例里的workerExec线程池;如果无序执行,就调用workerPoolexecutor方法获取另一种线程池。

看到这里,我们大致已经想到了,有序执行和无序执行两种模式使用不同的线程池,因此底层实现肯定有差异。我们来看一下前面提到的两个线程池,它们都是ContextImpl类的成员变量:

1
2
protected final WorkerPool workerPool;
protected final Executor workerExec;

在通过Vertx实例创建Context的时候,这几个变量会被初始化,其来源就是之前我们看过的VertxImpl实例中的Worker线程池。看一下ContextImpl类的构造函数就一目了然了:

1
2
this.workerPool = workerPool;
this.workerExec = workerPool.createOrderedExecutor();

嗯。。。有序执行对应的线程池通过workerPoolcreateOrderedExecutor方法获得,而无序执行对应的线程池通过workerPoolexecutor方法获得。因此,WorkerPool类是一个关键点,我们稍后就看一下其实现。

注意Vert.x规定,blockingCodeHandler中的逻辑(即阻塞任务)在Worker线程内执行,而resultHandler内的逻辑(结果处理)需要在Vert.x Conext中执行,因此前面需要预先设置当前使用的Worker线程的Contextthis以便后面调用runOnContext方法执行结果处理逻辑。

下面就来看一下有序执行和无序执行这两种线程池的具体区别。

无序执行

我们看一下WorkerPool类的源码中获取无序执行线程池的逻辑:

1
2
3
ExecutorService executor() {
return pool;
}

可以看到executor方法直接返回了内部的pool线程池,而pool线程池其实就是VertxImpl中的workerExec线程池:

1
workerPool = new WorkerPool(workerExec, workerPoolMetrics);

OK!如果大家熟悉并发的话,大家应该对无序执行对应的线程池 —— Worker线程池的行为非常清楚了。它属于一种FixedThreadPool,底层通过阻塞队列LinkedBlockingQueue实现。底层通过轮询算法获取Worker线程执行任务。

有序执行

下面是时候看有序执行对应的逻辑了:

1
2
3
4
5
6
7
8
9
10
11
12
13
private final OrderedExecutorFactory orderedFact;
private final ExecutorService pool;
private final PoolMetrics metrics;
public WorkerPool(ExecutorService pool, PoolMetrics metrics) {
this.orderedFact = new OrderedExecutorFactory(pool);
this.pool = pool;
this.metrics = metrics;
}
Executor createOrderedExecutor() {
return orderedFact.getExecutor();
}

可以看到有序执行对应的线程池是通过OrderedExecutorFactory创建的。其实,OrderedExecutorFactory类会生成真正的有序执行线程池OrderedExecutor,它其实是对Worker线程池pool的一个简单包装,仅仅添加了有序执行相关的逻辑,最后还是委托Worker线程池进行任务处理。

那么OrderedExecutor是如何实现顺序执行的呢?OrderedExecutor内部维护着一个任务队列。每当调用executeBlocking方法执行阻塞过程的时候,Vert.x会将阻塞过程包装成Runnable然后置入OrderedExecutor中的任务队列中;同时如果OrderedExecutor没有开始执行任务,就委托内部的Worker线程池执行任务:

1
2
3
4
5
6
7
8
9
public void execute(Runnable command) {
synchronized (tasks) {
tasks.add(command);
if (!running) {
running = true;
parent.execute(runner);
}
}
}

从代码中可以看出,最后委托Worker线程池执行的线程其实是又包装了一层的runner线程。runner的逻辑不难想:不断地从任务队列中取出队首的Runnable然后调用其run方法执行(相当于执行了此任务,只不过在runner对应的线程中);如果没有任务了就结束本线程。

这里就出现了一种情况:大批量提交阻塞任务的时候,线程池的状态running一直为true,此时所有的任务都积压到任务队列中,而执行所有任务的线程只有一个 —— runner对应的线程。这种情况其实很好想,因为要保证有序执行,就只能让它一个接一个地在同个线程中执行。如果在不同线程中依次执行则不好调度,如果直接并行执行则不能保证有序性。

所以,根据OrderedExecutor线程池的内部实现,只要提交任务的间隔时间小于任务执行的时间,底层其实就仅执行了一次runner,也就是说所有提交的阻塞任务都只在一个线程下跑(running标志位控制)。

这样就可以很好地解释我们一开始提出的问题了。当sleep(200), setPeriodic(1000)的时候,提交任务的间隔时间大于任务执行的时间,这样每次的runner就可以在下一个任务提交之前执行完,因此每次所用的线程会不同(轮询策略);而sleep(2000), setPeriodic(1000)的时候,提交任务的间隔时间小于任务执行的时间,底层最后都归结到一个runner中执行了,因此所有过程都是在同一个Worker线程执行的(很好想,保证有序就要串行执行)。

当然,如果不想有序执行,可以用void executeBlocking(Handler<Future<T>> blockingCodeHandler, boolean ordered, Handler<AsyncResult<T>> asyncResultHandler)这个版本的executeBlocking方法,并将ordered标志位设为false。根据上面的源码,底层会直接使用Worker线程池而不是OrderedExecutor线程池,这样就不会有上面OrderedExecutor的情况了。

Vert.x 技术内幕 | 异步RPC实现原理

经常有一些开发者在group中问到,如何利用Vert.x进行RPC通信。其实,Vert.x提供了一个组件 —— Vert.x Service Proxy,专门用于进行异步RPC通信(通过Event Bus)。Vert.x Service Proxy会自动生成代理类进行消息的包装与解码、发送与接收以及超时处理,可以为我们省掉不少代码。之前我在Vert.x Blueprint中已经详细讲解了Vert.x Service Proxy的使用,大家可以参考 Vert.x Kue 文档 中的相关部分。本篇文章中我们将探索一下通过Vert.x Service Proxy生成的代理类进行异步RPC的原理,对应的Vert.x版本为3.3.2

传统的RPC想必大家都不陌生,但是传统的RPC有个缺陷:传统的RPC都是阻塞型的,当调用者远程调用服务时需要阻塞着等待调用结果,这与Vert.x的异步开发模式相违背;而且,传统的RPC未对容错而设计。

因此,Vert.x提供了Service Proxy用于进行异步RPC,其底层依托Clustered Event Bus进行通信。我们只需要按照规范编写我们的服务接口(一般称为Event Bus服务),并加上@ProxyGen注解,Vert.x就会自动为我们生成相应的代理类在底层处理RPC。有了Service Proxy,我们只需给异步方法提供一个回调函数Handler<AsyncResult<T>>,在调用结果发送过来的时候会自动调用绑定的回调函数进行相关的处理,这样就与Vert.x的异步开发模式相符了。由于AsyncResult本身就是为容错而设计的(两个状态),因此这里的RPC也具有了容错性。

原理简介

假设有一个Event Bus服务接口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@ProxyGen
@VertxGen
public interface SomeService {
String SERVICE_ADDRESS = "service.example";
static SomeService createService(Vertx vertx, JsonObject config) {
return new SomeServiceImpl(vertx, config);
}
static SomeService createProxy(Vertx vertx) {
return ProxyHelper.createProxy(SomeService.class, vertx, SERVICE_ADDRESS);
}
@Fluent
SomeService process(String id, Handler<AsyncResult<JsonObject>> resultHandler);
}

这里定义了一个异步方法process,其异步调用返回的结果是AsyncResult<JsonObject>类型的。由于异步RPC底层通过Clustered Event Bus进行通信,我们需要给器指定一个通信地址SERVICE_ADDRESS@Fluent注解代表此方法返回自身,便于进行组合。我们同时还提供了两个辅助方法:createService方法用于创建服务实例,而createProxy方法则通过ProxyHelper辅助类创建服务代理实例。

假设服务提供端A注册了一个SomeService类型的服务代理,服务调用端B需要通过异步RPC调用服务的process方法,此时调用端B可以利用ProxyHelper获取服务实例并进行服务调用。B中获取的服务其实是一个服务代理类,而真正的服务实例在A处。何为服务代理?服务代理可以帮助我们向服务提供端发送调用请求,并且响应调用结果。那么如何发送调用请求呢?相信大家能想到,是调用端B将调用参数和方法名称等必要信息包装成集群消息(ClusteredMessage),然后通过send方法将请求通过Clustered Event Bus发送至服务提供端A处(需要提供此服务的通信地址)。A在注册服务的时候会创建一个MessageConsumer监听此服务的地址来响应调用请求。当接收到调用请求的时候,A会在本地调用方法,并将结果回复至调用端。所以异步RPC本质上其实是一个基于代理模式Request/Response 消息模式。

用时序图来描述一下上述过程:

Sequence Diagram of Async RPC

引入

以之前的SomeService接口为例,我们可以在集群中的一个节点上注册服务实例:

1
2
SomeService service = SomeService.createService(vertx, config);
ProxyHelper.registerService(SomeService.class, vertx, service, SomeService.SERVICE_ADDRESS);

然后在另一个节点上获取此服务实例的代理,并进行服务调用。调用的时候看起来就像在本地调用(LPC)一样,其实是进行了RPC通信:

1
2
3
4
5
6
SomeService proxyService = SomeService.createProxy(vertx);
// invoke the service
proxyService.process("fuck", ar -> {
// process the result...
});

其实,这里获取到的proxyService实例的真正类型是Vert.x自动生成的服务代理类SomeServiceVertxEBProxy类,里面封装了通过Event Bus进行通信的逻辑。我们首先来讲一下Service Proxy生成代理类的命名规范。

代理类命名规范

Vert.x Service Proxy在生成代理类时遵循一定的规范。假设有一Event Bus服务接口SomeService,Vert.x会自动为其生成代理类以及代理处理器:

  • 代理类的命名规范为 接口名 + VertxEBProxy。比如SomeService接口对应的代理类名称为SomeServiceVertxEBProxy
  • 代理类会继承原始的服务接口并实现所有方法的代理逻辑
  • 代理处理器的命名规范为 接口名 + VertxProxyHandler。比如SomeService接口对应的代理处理器名称为SomeServiceVertxProxyHandler
  • 代理处理器会继承ProxyHandler抽象类

ProxyHelper辅助类中注册服务以及创建代理都是遵循了这个规范。

在Event Bus上注册服务

我们通过ProxyHelper辅助类中的registerService方法来向Event Bus上注册Event Bus服务,来看其具体实现:

1
2
3
4
5
6
7
8
9
10
public static <T> MessageConsumer<JsonObject> registerService(Class<T> clazz, Vertx vertx, T service, String address,
boolean topLevel,
long timeoutSeconds) {
String handlerClassName = clazz.getName() + "VertxProxyHandler";
Class<?> handlerClass = loadClass(handlerClassName, clazz);
Constructor constructor = getConstructor(handlerClass, Vertx.class, clazz, boolean.class, long.class);
Object instance = createInstance(constructor, vertx, service, topLevel, timeoutSeconds);
ProxyHandler handler = (ProxyHandler) instance;
return handler.registerHandler(address);
}

首先根据约定生成对应的代理Handler的名称,然后通过类加载器加载对应的Handler类,再通过反射来创建代理Handler的实例,最后调用handlerregisterHandler方法注册服务地址。

registerHandler方法的实现在Vert.x生成的各个代理处理器中。以之前的SomeService为例,我们来看一下其对应的代理处理器SomeServiceVertxProxyHandler实现。首先是注册并订阅地址的registerHandler方法:

1
2
3
4
5
public MessageConsumer<JsonObject> registerHandler(String address) {
MessageConsumer<JsonObject> consumer = vertx.eventBus().<JsonObject>consumer(address).handler(this);
this.setConsumer(consumer);
return consumer;
}

registerHandler方法的实现非常简单,就是通过consumer方法在address地址上绑定了SomeServiceVertxProxyHandler自身。那么SomeServiceVertxProxyHandler是如何处理来自服务调用端的服务调用请求,并将调用结果返回到请求端呢?在回答这个问题之前,我们先来看看代理端(调用端)是如何发送服务调用请求的,这就要看对应的服务代理类的实现了。

服务调用

我们来看一下服务调用端是如何发出服务调用请求的消息的。之前已经介绍过,服务调用端是通过Event Bus的send方法发送调用请求的,并且会提供一个replyHandler来等待方法调用的结果。调用的方法名称会存放在消息中名为action的header中。以之前SomeService的代理类SomeServiceVertxEBProxyprocess方法的请求为例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public SomeService process(String id, Handler<AsyncResult<JsonObject>> resultHandler) {
if (closed) {
resultHandler.handle(Future.failedFuture(new IllegalStateException("Proxy is closed")));
return this;
}
JsonObject _json = new JsonObject();
_json.put("id", id);
DeliveryOptions _deliveryOptions = (_options != null) ? new DeliveryOptions(_options) : new DeliveryOptions();
_deliveryOptions.addHeader("action", "process");
_vertx.eventBus().<JsonObject>send(_address, _json, _deliveryOptions, res -> {
if (res.failed()) {
resultHandler.handle(Future.failedFuture(res.cause()));
} else {
resultHandler.handle(Future.succeededFuture(res.result().body()));
}
});
return this;
}

可以看到代理类把此方法传入的参数都放到一个JsonObject中了,并将要调用的方法名称存放在消息中名为action的header中。代理方法通过send方法将包装好的消息发送至之前注册的服务地址处,并且绑定replyHandler等待调用结果,然后使用我们传入到process方法中的resultHandler对结果进行处理。是不是很简单呢?

服务提供端的调用逻辑

调用请求发出之后,我们的服务提供端就会收到调用请求消息,然后执行SomeServiceVertxProxyHandler中的处理逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public void handle(Message<JsonObject> msg) {
try {
JsonObject json = msg.body();
String action = msg.headers().get("action");
if (action == null) {
throw new IllegalStateException("action not specified");
}
accessed();
switch (action) {
case "process": {
service.process((java.lang.String)json.getValue("id"), createHandler(msg));
break;
}
default: {
throw new IllegalStateException("Invalid action: " + action);
}
}
} catch (Throwable t) {
msg.reply(new ServiceException(500, t.getMessage()));
throw t;
}
}

handle方法首先从消息header中获取方法名称,如果获取不到则调用失败;接着handle方法会调用accessed方法记录最后调用服务的时间戳,这是为了实现超时的逻辑,后面我们会讲。接着handle方法会根据方法名称分派对应的逻辑,在“真正”的服务实例上调用方法。注意异步RPC的过程本质是 Request/Response 模式,因此这里的异步结果处理函数resultHandler应该将调用结果发送回调用端。此resultHandler是通过createHandler方法生成的,逻辑很清晰:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
private <T> Handler<AsyncResult<T>> createHandler(Message msg) {
return res -> {
if (res.failed()) {
if (res.cause() instanceof ServiceException) {
msg.reply(res.cause());
} else {
msg.reply(new ServiceException(-1, res.cause().getMessage()));
}
} else {
if (res.result() != null && res.result().getClass().isEnum()) {
msg.reply(((Enum) res.result()).name());
} else {
msg.reply(res.result());
}
}
};
}

这样,一旦在服务提供端的调用过程完成时,调用结果就会被发送回调用端。这样调用端就可以调用结果执行真正的处理逻辑了。

超时处理

Vert.x自动生成的代理处理器内都封装了一个简单的超时处理逻辑,它是通过定时器定时检查最后的调用时间实现的。逻辑比较简单,直接放上相关逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public SomeServiceVertxProxyHandler(Vertx vertx, SomeService service, boolean topLevel, long timeoutSeconds) {
// 前面代码略。。。
if (timeoutSeconds != -1 && !topLevel) {
long period = timeoutSeconds * 1000 / 2;
if (period > 10000) {
period = 10000;
}
this.timerID = vertx.setPeriodic(period, this::checkTimedOut);
} else {
this.timerID = -1;
}
accessed();
}
private void checkTimedOut(long id) {
long now = System.nanoTime();
if (now - lastAccessed > timeoutSeconds * 1000000000) {
close();
}
}

一旦超时,就自动调用close方法终止定时器,注销响应服务调用请求的consumer并关闭代理。

代码是如何生成的?

大家可能会很好奇,这些服务代理类是怎么生成出来的?其实,这都是Vert.x Codegen的功劳。Vert.x Codegen的本质是一个 注解处理器(APT),它可以扫描源码中是否包含要处理的注解,检查规范后根据响应的模板生成对应的代码,这就是注解处理器的作用(注解处理器于JDK 1.6引入)。为了让Codegen正确地生成代码,我们需要配置编译参数来确保注解处理器能够正常的工作,具体的可以参考 Vert.x Codegen的文档 (之前里面缺了Gradle相关的实例,我给补上了)。

Vert.x Codegen使用MVEL2作为生成代码的模板,扩展名为*.templ,比如代理类和代理处理器的模板就位于 vert-x3/vertx-service-proxy 中,配置文件类似于这样:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
{
"name": "Proxy",
"generators": [
{
"kind": "proxy",
"fileName": "ifaceFQCN + 'VertxEBProxy.java'",
"templateFileName": "serviceproxy/template/proxygen.templ"
},{
"kind": "proxy",
"fileName": "ifaceFQCN + 'VertxProxyHandler.java'",
"templateFileName": "serviceproxy/template/handlergen.templ"
}
]
}

具体的代码生成逻辑还要涉及APT及MVEL2的知识,这里就不展开讲了,有兴趣的朋友可以研究研究Vert.x Codegen的源码。

优点与缺点

Vert.x提供的这种Async RPC有着许多优点:

  • 通过Clustered Event Bus传输消息,不需引入其它额外的组件
  • 自动生成代理类及代理处理器,可以帮助我们做消息封装、传输、编码解码以及超时处理等问题,省掉不少冗余代码,让我们可以以LPC的方式进行RPC通信
  • 多语言支持(Polyglot support)。这是Vert.x的一大亮点。只要加上@VertxGen注解并在编译期依赖中加上对应语言的依赖(如vertx-lang-ruby),Vert.x Codegen就会自动处理注解并生成对应语言的服务代理(通过调用Java版本的服务代理实现)。这样Async RPC可以真正地做到不限language

当然Vert.x要求我们的服务接口必须是 基于回调的,这样写起来可能会不优雅。还好@VertxGen注解支持生成Rx版本的服务类,因此只要加上vertx-rx-java依赖,Codegen就能生成对应的Rx风格的服务类(异步方法返回Observable),这样我们就能以更reactive的风格来构建应用了,岂不美哉?

当然,为了考虑多语言支持的兼容性,Vert.x在传递消息的时候依然使用了传统的JSON,这样传输效率可能不如Protobuf高,但是不一定成为瓶颈。(看业务情况。真正的瓶颈一般还是在DB上)

Distributed System | RPC模块设计与实现

RPC是分布式系统中不可缺少的一部分。之前接触过几种RPC模块,这里就总结一下常见RPC模块的设计思想和实现。最后我们来设计一个可以方便进行RPC调用的RPC模块。

RPC模块设计需要考虑的问题

RPC模块将网络通信的过程封装成了方法调用的过程。从使用者的角度来看,在调用端进行RPC调用,就像进行本地函数调用一样;而在背后,RPC模块会将先调用端的函数名称、参数等调用信息序列化,其中序列化的方式有很多种,比如Java原生序列化、JSON、Protobuf等。接着RPC模块会将序列化后的消息通过某种协议(如TCP, AMQP等)发送到被调用端,被调用端在收到消息以后会对其解码,还原成调用信息,然后在本地进行方法调用,然后把调用结果发送回调用端,这样一次RPC调用过程就完成了。在这个过程中,我们要考虑到一些问题:

  • 设计成什么样的调用模型?
  • 调用信息通过什么样的方式序列化?通过哪种协议传输?性能如何?可靠性如何?
  • 分布式系统中最关注的问题:出现failure如何应对?如何容错?

我们一点一点来思考。第一点是设计成什么样的调用模型。常见的几种模型:

  • 服务代理。即实现一个服务接口,被调用端实现此服务接口,实现对应的方法逻辑,并写好RPC调用信息接收部分;调用端通过RPC模块获取一个服务代理实例,这个服务代理实例继承了服务接口并封装了相应的远程调用逻辑(包括消息的编码、解码、传输等)。调用端通过这个服务代理实例进行RPC调用。像Vert.x Service Proxy和grpc都是这种模型。这样的RPC模块需要具备生成服务代理类的功能
  • 直接调用,即设计特定的API用于RPC调用。比如Go的rpc包,里面的Client就提供了一个Call方法用于任意RPC调用,调用者需要传入方法名称、参数以及返回值指针(异步模式下传入callback handler)

我更倾向于选择服务代理这种模型,因为服务代理这种模型在进行RPC调用的时候就像直接LPC一样方便,但是需要RPC模块生成服务代理类,实现起来可能会麻烦些;当然Go的rpc包封装的也比较好,调用也比较方便,考虑到Go的类型系统,这已经不错了。。。

RPC调用耗时会包含通信耗时和本地调用耗时。当网络状况不好的时候,RPC调用可能会很长时间才能得到结果。对传统的同步RPC模式来说,这期间会阻塞调用者的调用线程。当需要进行大量RPC调用的时候,这种阻塞就伤不起了。这时候,异步RPC模式就派上用场了。我们可以对传统RPC模式稍加改造,把服务接口设计成异步模式的,即每个方法都要绑定一个回调函数,或利用Future-Promise模型返回一个Future。设计成异步模式以后,整个架构的灵活性就能得到很大的提升。

第二点是调用信息的序列化反序列化以及传输。序列化主要分为文本(如JSON, XML等)和二进制(如Thrift, Protocol等)两种,不同的序列化策略性能不同,因此我们应该尽量选择性能高,同时便于开发的序列化策略。在大型项目中我们常用Protobuf,性能比较好,支持多语言,但是需要单独定义.proto文件;有的时候我们会选择JSON,尽管效率不是很高但是方便,比如Vert.x Service Proxy就选择了JSON格式(底层依赖Event Bus)。另一点就是传输协议的选择。通常情况下我们会选择TCP协议(各种基于TCP的应用层协议,如HTTP/2)进行通信,当然用基于AMQP协议的消息队列也可以,两者都比较可靠。

这里还需提一点:如何高效地并发处理request/response,这依赖于通信模块的实现。拿Java来说,基于Netty NIO或者Java AIO的I/O多路复用都可以很好地并发处理请求;而像Go RPC则是来一个request就创建一个Goroutine并在其中处理请求(Goroutine作为轻量级用户态线程,创建性能消耗小)。

最后一点也是最重要的一点:实现容错,这也是分布式系统设计要考虑的一个核心。想象一下一次RPC调用过程中可能产生的各种failure:

  • 网络拥塞
  • 丢包,通信异常
  • 服务提供端挂了,调用端得不到response

一种简单的应对方式是不断地超时重传,即 at least once 模式。调用端设置一个超时定时器,若一定时间内没有收到response就继续发送调用请求,直到收到response或请求次数达到阈值。这种模式会发送重复请求,因此只适用于幂等性的操作,即执行多次结果相同的操作,比如读取操作。当然服务提供端也可以实现对应的逻辑来检查重复的请求。

更符合我们期望的容错方案是 at most once 模式。at most once 模式要求服务提供端检查重复请求,如果检查到当前请求是重复请求则返回之前的调用结果。服务提供端需要缓存之前的调用结果。这里面有几点需要考虑:

  • 如何实现重传和重复请求检测?是依靠协议(如TCP的超时重传)还是自己实现?

如果自己实现的话:

  • 如何检查重复请求?我们可以给每个请求生成一个独一无二的标识符(xid),并且在重传请求的时候使用相同的xid进行重传。用伪代码可以表示为:
1
2
3
4
5
6
7
if (seen(xid)) {
result = oldResult;
} else {
result = call(...);
oldResult = result;
setCurrentId(xid);
}
  • 如何保证xid是独一无二的?可以考虑使用UUID或者不同seed下的随机数。
  • 服务请求端需要在一个合适的时间丢弃掉保存的之前缓存的调用结果。
  • 当某个RPC调用过程还正在执行时,如何应对另外的重复请求?这种情况可以设置一个flag用于标识是否正在执行。
  • 如果服务调用端挂了并且重启怎么办?如果服务调用端将xid和调用结果缓存在内存中,那么保存的信息就丢失了。因此我们可以考虑将缓存信息定时写入硬盘,或者写入replication server中,当然这些情况就比较复杂了,涉及到高可用和一致性的问题。

由此可见,虽然RPC模块看似比较简单,但是设计的时候要考虑的问题还是非常多的。尤其是在保证性能的基础上又要保证可靠性,还要保证开发者的易用性,这就需要细致地思考了。

常见RPC模块实现

这里我来简单总结一下用过的常见的几个RPC模块的使用及实现思路。

Go RPC

Go的rpc包使用了Go自己的gob协议作为序列化协议(通过encoding/gob模块内的Encoder/Decoder进行编码和解码),而传输协议可以直接使用TCP(Dial方法)或者使用HTTP(DialHTTP)方法。开发者需要在服务端定义struct并且实现各种方法,然后将struct注册到服务端。需要进行RPC调用的时候,我们就可以在调用端通过Call方法(同步)或者Go方法(异步)进行调用。同步模式下调用结果即为reply指针所指的对象,而异步模式则会在调用结果准备就绪后通知绑定的channel并执行处理。

在rpc包的实现中(net/rpc/server.go),每个注册的服务类都被封装成了一个service结构体,而其中的每个方法则被封装成了一个methodType结构体:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
type methodType struct {
sync.Mutex // protects counters
method reflect.Method
ArgType reflect.Type
ReplyType reflect.Type
numCalls uint
}
type service struct {
name string // name of service
rcvr reflect.Value // receiver of methods for the service
typ reflect.Type // type of the receiver
method map[string]*methodType // registered methods
}

每个服务端都被封装成了一个Server结构体,其中的serviceMap存储着各个服务类的元数据:

1
2
3
4
5
6
7
8
type Server struct {
mu sync.RWMutex // protects the serviceMap
serviceMap map[string]*service
reqLock sync.Mutex // protects freeReq
freeReq *Request
respLock sync.Mutex // protects freeResp
freeResp *Response
}

RPC Server处理调用请求的默认路径是/_goRPC_。当请求到达时,Go就会调用Server结构体实现的ServeHTTP方法,经ServeConn方法传入gob codec预处理以后最终在ServeCodec方法内处理请求并进行调用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
func (server *Server) ServeCodec(codec ServerCodec) {
sending := new(sync.Mutex)
for {
service, mtype, req, argv, replyv, keepReading, err := server.readRequest(codec)
if err != nil {
if debugLog && err != io.EOF {
log.Println("rpc:", err)
}
if !keepReading {
break
}
// send a response if we actually managed to read a header.
if req != nil {
server.sendResponse(sending, req, invalidRequest, codec, err.Error())
server.freeRequest(req)
}
continue
}
go service.call(server, sending, mtype, req, argv, replyv, codec)
}
codec.Close()
}

如果成功读取请求数据,那么接下来RPC Server就会新建一个Goroutine用来在本地执行方法,并向调用端返回response:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
func (s *service) call(server *Server, sending *sync.Mutex, mtype *methodType, req *Request, argv, replyv reflect.Value, codec ServerCodec) {
mtype.Lock()
mtype.numCalls++
mtype.Unlock()
function := mtype.method.Func
// Invoke the method, providing a new value for the reply.
returnValues := function.Call([]reflect.Value{s.rcvr, argv, replyv})
// The return value for the method is an error.
errInter := returnValues[0].Interface()
errmsg := ""
if errInter != nil {
errmsg = errInter.(error).Error()
}
server.sendResponse(sending, req, replyv.Interface(), codec, errmsg)
server.freeRequest(req)
}

在执行调用的过程中应该注意并发问题,防止资源争用,修改数据时需要对数据加锁;至于方法的执行就是利用了Go的反射机制。调用完以后,RPC Server接着调用sendResponse方法发送response,其中写入response的时候同样需要加锁,防止资源争用。

grpc

grpc是Google开源的一个通用的RPC框架,支持C, Java和Go等语言。既然是Google出品,序列化协议必然用protobuf啦(毕竟高效),传输协议使用HTTP/2,非常不错。开发时需要在.proto文件里定义数据类型以及服务接口,然后配上protoc的grpc插件就能够自动生成各个语言的服务接口和代理类。粗略地看了下grpc-java的源码,底层利用Netty和OkHttp实现HTTP通信,性能应该不错。

Vert.x Service Proxy

Vert.x Service Proxy是Vert.x的一个异步RPC组件,支持通过各种JVM语言(Java, Scala, JS, JRuby, Groovy等)进行RPC调用。使用Vert.x Service Proxy时我们只需要按照异步开发模式编写服务接口,加上相应的注解,Vert.x Service Proxy就会自动生成相应的服务代理类和服务调用处理类。Vert.x Service Proxy底层借助Event Bus进行通信,调用时将调用消息包装成JSON数据然后通过Event Bus传输到服务端,得到结果后再返回给调用端。Vert.x的一大特性就是异步、响应式编程,因此Vert.x Service Proxy的RPC模型为异步RPC,用起来非常方便。几个异步过程可以通过各种组合子串成一串,妥妥的reactive programming的风格~

更多的关于Vert.x Service Proxy的实现原理的内容可以看这一篇:Vert.x 技术内幕 | 异步RPC实现原理

PS: 我经常吐槽Vert.x Service Proxy这个名字,因为光看名字很多人不知道它可以用来实现RPC,导致了很多人以为Vert.x不能做RPC。。。应该改名叫Vert.x Async RPC比较合适。。。当然它还有很大的改进空间,主要是被Vert.x Event Bus的性能和可靠性给拖累了。。。

Java RMI

Java RMI(Remote Method Invocation)是Java里的一种RPC编程接口,类似于服务代理的一种模式。用起来不是很方便。。。

Vert.x 技术内幕 | Event Bus 源码分析 (集群模式)

上篇文章中我们探索了Local模式下Event Bus的源码,在这篇文章中我们来探索一下Vert.x中的Clustered Event Bus是如何实现的。对应的Vert.x版本为3.3.2

集群模式介绍

我们先来简单地介绍一下集群模式下Event Bus的基本原理。

我们可以通过集群模式下的Event Bus在不同的服务器之间进行通信,其本质为TCP通信。Vert.x集群模式需要一个集群管理器(默认为HazelcastClusterManager)来管理集群的状态,存储元数据。当我们在某个节点A给集群模式的Event Bus绑定一个对应地址addressconsumer的时候,Event Bus会将此节点的ServerID(包含hostport信息)存储至集群管理器的共享Map中,key为绑定的地址address,value为绑定了此地址address的所有结点的ServerID集合(可以看作是具有负载均衡功能的Set)。集群中的所有节点都可以从集群管理器中获取Map记录。并且绑定consumer的同时节点A会建立一个NetServer接收数据。这样,我们再通过另一个结点B向此地址address发送消息的时候,B就会从集群管理器中取出此地址对应的ServerID集合,并根据是点对点发送还是发布,根据相应的策略创建NetClient执行消息分发逻辑。这样,对应的NetServer收到数据后会对其进行解码然后在本地进行消息的处理。

集群模式下我们还需要注意几个问题:

  • 某个节点挂了怎么办?
  • 如何确保结点的高可用性?

当某个节点挂掉的时候,其连接将会不可用,集群管理器就会将此节点的信息从集群中移除,并且传播到所有的节点删除对应缓存的信息,这样发消息的时候就不会发送到挂掉的无效节点处。至于高可用性,Vert.x提供了高可用管理器HAManager用于实现高可用性,在发生故障时能够快速failover,详情可见官方文档

好了,下面我们就来分析一下Clustered Event Bus的源码。集群模式下Event Bus的类型为ClusteredEventBus,它继承了单机模式的EventBusImpl类。其初始化过程与Local模式大同小异,因此这里就直接分析发送和接受消息相关的逻辑了。

绑定MessageConsumer

我们还是先来看consumer方法的逻辑。前面的调用逻辑都和Local模式下相同,可以参考之前的文章。不同之处在添加记录的地方。Cluster模式下Event Bus需要将当前机器的位置存储至Map中并且传播至集群内的所有节点,因此ClusteredEventBus重写了四个参数版本的addRegistration方法(之前在EventBusImpl类中这个版本的方法用处不大,这里用处就大了):

1
2
3
4
5
6
7
8
9
10
11
@Override
protected <T> void addRegistration(boolean newAddress, String address,
boolean replyHandler, boolean localOnly,
Handler<AsyncResult<Void>> completionHandler) {
if (newAddress && subs != null && !replyHandler && !localOnly) {
// Propagate the information
subs.add(address, serverID, completionHandler);
} else {
completionHandler.handle(Future.succeededFuture());
}
}

如果要绑定MessageConsumer对应的地址在本地中没有注册过,并且不是Event Bus自动生成的reply consumer,并且允许在集群范围内传播的话,Event Bus就会将当前机器的位置添加到集群内的记录subs中。subs的类型为AsyncMultiMap:

1
private AsyncMultiMap<String, ServerID> subs;

ClusteredEventBus启动时会对其进行初始化:

1
2
3
4
5
6
7
8
clusterManager.<String, ServerID>getAsyncMultiMap(SUBS_MAP_NAME, ar2 -> {
if (ar2.succeeded()) {
subs = ar2.result();
// 其他代码暂略。。。
} else {
// 代码略。。。
}
});

从名字就可以看出来,AsyncMultiMap允许一键多值,并且其变动可以在集群范围内传播。由于AsyncMultiMap集群范围内的,因此对其操作都是异步的。在这里我们可以简单地把它看作是一个Map<String, ChoosableIterable<ServerID>>类型的键值对,其中ChoosableIterable与之前见到过的Handlers类似,属于可以通过轮询算法获取某一元素的集合。subs的key为绑定的地址,value为绑定此地址的机器位置的集合。机器的位置用ServerID表示,里面包含了该机器的hostport。这样,每当我们向某个地址绑定一个MessageConsumer的时候,绑定consumer的ServerID就会被记录到集群中并与地址相对应,其它机器在向此地址发送(或发布)消息的时候,Event Bus就可以从集群中获取在此地址上绑定了consumer的所有ServerID,再根据相应的策略选出合适的ServerID建立TCP通信将数据发送至对应机器中,对应机器收到消息后解码并在本地对其进行处理。

这里面还需要注意一点:我们可以在EventBusOptions中指定ServerIDporthost,若不指定则port将随机分配(NetServer的特性)。

剩下的过程也就大同小异了。至于unregister方法,无非就是将底层的removeRegistration方法重写,从subs中删除对应的ServerID并传播至其它节点:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
@Override
protected <T> void removeRegistration(HandlerHolder lastHolder, String address,
Handler<AsyncResult<Void>> completionHandler) {
if (lastHolder != null && subs != null && !lastHolder.isLocalOnly()) {
removeSub(address, serverID, completionHandler);
} else {
callCompletionHandlerAsync(completionHandler);
}
}
private void removeSub(String subName, ServerID theServerID, Handler<AsyncResult<Void>> completionHandler) {
subs.remove(subName, theServerID, ar -> {
if (!ar.succeeded()) {
log.error("Failed to remove sub", ar.cause());
} else {
if (ar.result()) {
if (completionHandler != null) {
completionHandler.handle(Future.succeededFuture());
}
} else {
if (completionHandler != null) {
completionHandler.handle(Future.failedFuture("sub not found"));
}
}
}
});
}

消息的发送/发布

集群模式下的消息与本地模式下的消息不同。集群模式下的消息实体类型为ClusteredMessage,它继承了MessageImpl消息实体类,并且根据远程传输的特性实现了一种Wire Protocol用于远程传输消息,并负责消息的编码和解码。具体的实现就不展开说了,如果有兴趣的话可以阅读ClusteredMessage类中相关方法的实现。

我们上篇文章提到过,Event Bus底层通过createMessage方法创建消息。因此ClusteredEventBus里就对此方法进行了重写,当然改动就是把MessageImpl替换成了ClusteredMessage:

1
2
3
4
5
6
7
8
@Override
protected MessageImpl createMessage(boolean send, String address, MultiMap headers, Object body, String codecName) {
Objects.requireNonNull(address, "no null address accepted");
MessageCodec codec = codecManager.lookupCodec(body, codecName);
@SuppressWarnings("unchecked")
ClusteredMessage msg = new ClusteredMessage(serverID, address, null, headers, body, codec, send, this);
return msg;
}

接下来就是消息的发送逻辑了。ClusteredEventBus重写了sendOrPub方法,此方法存在于SendContextImpl类中的next方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
@Override
public void next() {
if (iter.hasNext()) {
Handler<SendContext> handler = iter.next();
try {
handler.handle(this);
} catch (Throwable t) {
log.error("Failure in interceptor", t);
}
} else {
sendOrPub(this);
}
}

我们来看一下ClusteredEventBus是如何进行集群内消息的分发的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@Override
protected <T> void sendOrPub(SendContextImpl<T> sendContext) {
String address = sendContext.message.address();
Handler<AsyncResult<ChoosableIterable<ServerID>>> resultHandler = asyncResult -> {
if (asyncResult.succeeded()) {
ChoosableIterable<ServerID> serverIDs = asyncResult.result();
if (serverIDs != null && !serverIDs.isEmpty()) {
sendToSubs(serverIDs, sendContext);
} else {
metrics.messageSent(address, !sendContext.message.send(), true, false);
deliverMessageLocally(sendContext);
}
} else {
log.error("Failed to send message", asyncResult.cause());
}
};
if (Vertx.currentContext() == null) {
// Guarantees the order when there is no current context
sendNoContext.runOnContext(v -> {
subs.get(address, resultHandler);
});
} else {
subs.get(address, resultHandler);
}
}

首先Event Bus需要从传入的sendContext中获取要发送至的地址。接着Event Bus需要从集群管理器中获取在此地址上绑定consumer的所有ServerID,这个过程是异步的,并且需要在Vert.x Context中执行。如果获取记录成功,我们会得到一个可通过轮询算法获取ServerID的集合(类型为ChoosableIterable<ServerID>)。如果集合为空,则代表集群内其它节点没有在此地址绑定consumer(或者由于一致性问题没有同步),Event Bus就将消息通过deliverMessageLocally方法在本地进行相应的分发。deliverMessageLocally方法的逻辑之前我们已经详细讲过了,这里就不再细说了;如果集合不为空,Event Bus就调用sendToSubs方法进行下一步操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
private <T> void sendToSubs(ChoosableIterable<ServerID> subs, SendContextImpl<T> sendContext) {
String address = sendContext.message.address();
if (sendContext.message.send()) {
// Choose one
ServerID sid = subs.choose();
if (!sid.equals(serverID)) { //We don't send to this node
metrics.messageSent(address, false, false, true);
sendRemote(sid, sendContext.message);
} else {
metrics.messageSent(address, false, true, false);
deliverMessageLocally(sendContext);
}
} else {
// Publish
boolean local = false;
boolean remote = false;
for (ServerID sid : subs) {
if (!sid.equals(serverID)) { //We don't send to this node
remote = true;
sendRemote(sid, sendContext.message);
} else {
local = true;
}
}
metrics.messageSent(address, true, local, remote);
if (local) {
deliverMessageLocally(sendContext);
}
}
}

这里就到了分sendpublish的时候了。如果发送消息的模式为点对点模式(send),Event Bus会从给的的集合中通过轮询算法获取一个ServerID。然后Event Bus会检查获取到的ServerID是否与本机ServerID相同,如果相同则代表在一个机子上,直接记录metrics信息并且调用deliverMessageLocally方法往本地发送消息即可;如果不相同,Event Bus就会调用sendRemote方法执行真正的远程消息发送逻辑。发布订阅模式的逻辑与其大同小异,只不过需要遍历一下ChoosableIterable<ServerID>集合,然后依次执行之前讲过的逻辑。注意如果要在本地发布消息只需要发一次。

真正的远程消息发送逻辑在sendRemote方法中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
private void sendRemote(ServerID theServerID, MessageImpl message) {
ConnectionHolder holder = connections.get(theServerID);
if (holder == null) {
holder = new ConnectionHolder(this, theServerID, options);
ConnectionHolder prevHolder = connections.putIfAbsent(theServerID, holder);
if (prevHolder != null) {
// Another one sneaked in
holder = prevHolder;
} else {
holder.connect();
}
}
holder.writeMessage((ClusteredMessage)message);
}

一开始我们就提到过,节点之间通过Event Bus进行通信的本质是TCP,因此这里需要创建一个NetClient作为TCP服务端,连接到之前获取的ServerID对应的节点然后将消息通过TCP协议发送至接收端。这里Vert.x用一个封装类ConnectionHolderNetClient进行了一些封装。

ClusteredEventBus中维持着一个connections哈希表对用于保存ServerID对应的连接ConnectionHolder。在sendRemote方法中,Event Bus首先会从connections中获取ServerID对应的连接。如果获取不到就创建连接并将其添加至connections记录中并调用对应ConnectionHolderconnect方法建立连接;最后调用writeMessage方法将消息编码后通过TCP发送至对应的接收端。

那么ConnectionHolder是如何实现的呢?我们来看一下其构造函数:

1
2
3
4
5
6
7
8
9
10
ConnectionHolder(ClusteredEventBus eventBus, ServerID serverID, EventBusOptions options) {
this.eventBus = eventBus;
this.serverID = serverID;
this.vertx = eventBus.vertx();
this.metrics = eventBus.getMetrics();
NetClientOptions clientOptions = new NetClientOptions(options.toJson());
ClusteredEventBus.setCertOptions(clientOptions, options.getKeyCertOptions());
ClusteredEventBus.setTrustOptions(clientOptions, options.getTrustOptions());
client = new NetClientImpl(eventBus.vertx(), clientOptions, false);
}

可以看到ConnectionHolder初始化的时候会创建一个NetClient作为TCP请求端,而请求的对象就是接收端的NetServer(后边会讲),客户端配置已经在EventBusOptions中事先配置好了。我们来看看connect方法是如何建立连接的:

1
2
3
4
5
6
7
8
9
10
11
12
synchronized void connect() {
if (connected) {
throw new IllegalStateException("Already connected");
}
client.connect(serverID.port, serverID.host, res -> {
if (res.succeeded()) {
connected(res.result());
} else {
close();
}
});
}

可以看到这里很简单地调用了NetClient#connect方法建立TCP连接,如果建立连接成功的话会得到一个NetSocket对象。Event Bus接着将其传至connected方法中进行处理:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
private synchronized void connected(NetSocket socket) {
this.socket = socket;
connected = true;
socket.exceptionHandler(t -> close());
socket.closeHandler(v -> close());
socket.handler(data -> {
// Got a pong back
vertx.cancelTimer(timeoutID);
schedulePing();
});
// Start a pinger
schedulePing();
for (ClusteredMessage message : pending) {
Buffer data = message.encodeToWire();
metrics.messageWritten(message.address(), data.length());
socket.write(data);
}
pending.clear();
}

首先Event Bus通过exceptionHandlercloseHandler方法给连接对应的NetSocket绑定异常回调和连接关闭回调,触发的时候都调用close方法关闭连接;为了保证不丢失连接,消息发送方每隔一段时间就需要对消息接收方发送一次心跳包(PING),如果消息接收方在一定时间内没有回复,那么就认为连接丢失,调用close方法关闭连接。心跳检测的逻辑在schedulePing方法中,比较清晰,这里就不详细说了。大家会发现ConnectionHolder里也有个消息队列(缓冲区)pending,并且这里会将队列中的消息依次通过TCP发送至接收端。为什么需要这样呢?其实,这要从创建TCP客户端说起。创建TCP客户端这个过程应该是异步的,需要消耗一定时间,而ConnectionHolder中封装的connect方法却是同步式的。前面我们刚刚看过,通过connect方法建立连接以后会接着调用writeMessage方法发送消息,而这时候客户端连接可能还没建立,因此需要这么个缓冲区先存着,等着连接建立了再一块发送出去(存疑:为什么不将connect方法直接设计成异步的?)。

至于发送消息的writeMessage方法,其逻辑一目了然:

1
2
3
4
5
6
7
8
9
10
11
12
synchronized void writeMessage(ClusteredMessage message) {
if (connected) {
Buffer data = message.encodeToWire();
metrics.messageWritten(message.address(), data.length());
socket.write(data);
} else {
if (pending == null) {
pending = new ArrayDeque<>();
}
pending.add(message);
}
}

如果连接已建立,Event Bus就会调用对应ClusteredMessageencodeToWire方法将其转化为字节流Buffer,然后记录metrics信息,最后通过socketwrite方法将消息写入到Socket中,这样消息就从发送端通过TCP发送到了接收端。如果连接未建立,就如之前讲的那样,先把消息存到消息队列中,等连接建立了再一块发出去。

这样,Clustered Event Bus下消息的发送逻辑就理清楚了。下面我们看一下接收端是如何接收消息并在本地进行消息的处理的。

消息的接收

一开始我们提到过,每个节点的Clustered Event Bus在启动时都会创建一个NetServer作为接收消息的TCP服务端。TCP Server的porthost可以在EventBusOptions中指定,如果不指定的话默认随机分配port,然后Event Bus会根据NetServer的配置来生成当前节点的ServerID

创建TCP Server的逻辑在start方法中,与接受消息有关的逻辑就是这一句:

1
server.connectHandler(getServerHandler());

我们知道,NetServerconnectHandler方法用于绑定对服务端Socket的处理函数,而这里绑定的处理函数是由getServerHandler方法生成的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
private Handler<NetSocket> getServerHandler() {
return socket -> {
RecordParser parser = RecordParser.newFixed(4, null);
Handler<Buffer> handler = new Handler<Buffer>() {
int size = -1;
public void handle(Buffer buff) {
if (size == -1) {
size = buff.getInt(0);
parser.fixedSizeMode(size);
} else {
ClusteredMessage received = new ClusteredMessage();
received.readFromWire(buff, codecManager);
metrics.messageRead(received.address(), buff.length());
parser.fixedSizeMode(4);
size = -1;
if (received.codec() == CodecManager.PING_MESSAGE_CODEC) {
// Just send back pong directly on connection
socket.write(PONG);
} else {
deliverMessageLocally(received);
}
}
}
};
parser.setOutput(handler);
socket.handler(parser);
};
}

逻辑非常清晰。这里Event Bus使用了RecordParser来获取发送过来的对应长度的Buffer,并将其绑定在NetServer的Socket上。真正的解析Buffer并处理的逻辑在内部的handler中。之前ClusteredMessage中的Wire Protocol规定Buffer的首部第一个int值为要发送Buffer的长度(逻辑见ClusteredMessage#encodeToWire方法),因此这里首先获取长度,然后给parser设定正确的fixed size,这样parser就可以截取正确长度的Buffer流了。下面Event Bus会创建一个空的ClusteredMessage,然后调用其readFromWire方法从Buffer中重建消息。当然这里还要记录消息已经读取的metrics信息。接着检测收到的消息实体类型是否为心跳检测包(PING),如果是的话就发送回ACK消息(PONG);如果不是心跳包,则代表是正常的消息,Event Bus就调用我们熟悉的deliverMessageLocally函数在本地进行分发处理,接下来的过程就和Local模式一样了。

Vert.x 技术内幕 | Event Bus 源码分析 (Local模式)

Event Bus是Vert.x的“神经系统”,是最为关键的几个部分之一。今天我们就来探索一下Event Bus的实现原理。本篇分析的是Local模式的Event Bus,对应的Vert.x版本为3.3.2

本文假定读者有一定的并发编程基础以及Vert.x使用基础,并且对Vert.x的线程模型以及back-pressure有所了解。

Local Event Bus的创建

一般情况下,我们通过VertxeventBus方法来创建或获取一个EventBus实例:

1
2
Vertx vertx = Vertx.vertx();
EventBus eventBus = vertx.eventBus();

eventBus方法定义于Vertx接口中,我们来看一下其位于VertxImpl类中的实现:

1
2
3
4
5
6
7
8
9
10
public EventBus eventBus() {
if (eventBus == null) {
// If reading from different thread possibility that it's been set but not visible - so provide
// memory barrier
synchronized (this) {
return eventBus;
}
}
return eventBus;
}

可以看到此方法返回VertxImpl实例中的eventBus成员,同时需要注意并发可见性问题。那么eventBus成员是何时初始化的呢?答案在VertxImpl类的构造函数中。这里截取相关逻辑:

1
2
3
4
5
6
if (options.isClustered()) {
// 集群模式相关逻辑
} else {
this.clusterManager = null;
createAndStartEventBus(options, resultHandler);
}

可以看到VertxImpl内部是通过createAndStartEventBus方法来初始化eventBus的。我们来看一下其逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
private void createAndStartEventBus(VertxOptions options, Handler<AsyncResult<Vertx>> resultHandler) {
if (options.isClustered()) {
eventBus = new ClusteredEventBus(this, options, clusterManager, haManager);
} else {
eventBus = new EventBusImpl(this);
}
eventBus.start(ar2 -> {
if (ar2.succeeded()) {
// If the metric provider wants to use the event bus, it cannot use it in its constructor as the event bus
// may not be initialized yet. We invokes the eventBusInitialized so it can starts using the event bus.
metrics.eventBusInitialized(eventBus);
if (resultHandler != null) {
resultHandler.handle(Future.succeededFuture(this));
}
} else {
log.error("Failed to start event bus", ar2.cause());
}
});
}

可以看到此方法通过eventBus = new EventBusImpl(this)eventBus进行了初始化(Local模式为EventBusImpl),并且调用eventBusstart方法对其进行一些额外的初始化工作。我们来看一下EventBusImpl类的start方法:

1
2
3
4
5
6
7
public synchronized void start(Handler<AsyncResult<Void>> completionHandler) {
if (started) {
throw new IllegalStateException("Already started");
}
started = true;
completionHandler.handle(Future.succeededFuture());
}

首先初始化过程需要防止race condition,因此方法为synchronized的。该方法仅仅将EventBusImpl类中的一个started标志位设为true来代表Event Bus已启动。注意started标志位为volatile的,这样可以保证其可见性,确保其它线程通过checkStarted方法读到的started结果总是最新的。设置完started标志位后,Vert.x会接着调用传入的completionHandler处理函数,也就是上面我们在createAndStartEventBus方法中看到的 —— 调用metrics成员的eventBusInitialized方法以便Metrics类可以在Event Bus初始化完毕后使用它(不过默认情况下此方法的逻辑为空)。

可以看到初始化过程还是比较简单的,我们接下来先来看看订阅消息 —— consumer方法的逻辑。

consume

我们来看一下consumer方法的逻辑,其原型位于EventBus接口中:

1
2
<T> MessageConsumer<T> consumer(String address);
<T> MessageConsumer<T> consumer(String address, Handler<Message<T>> handler);

其实现位于EventBusImpl类中:

1
2
3
4
5
6
7
@Override
public <T> MessageConsumer<T> consumer(String address, Handler<Message<T>> handler) {
Objects.requireNonNull(handler, "handler");
MessageConsumer<T> consumer = consumer(address);
consumer.handler(handler);
return consumer;
}

首先要确保传入的handler不为空,然后Vert.x会调用只接受一个address参数的consumer方法获取对应的MessageConsumer,最后给获取到的MessageConsumer绑定上传入的handler。我们首先来看一下另一个consumer方法的实现:

1
2
3
4
5
6
@Override
public <T> MessageConsumer<T> consumer(String address) {
checkStarted();
Objects.requireNonNull(address, "address");
return new HandlerRegistration<>(vertx, metrics, this, address, null, false, null, -1);
}

首先Vert.x会检查Event Bus是否已经启动,并且确保传入的地址不为空。然后Vert.x会传入一大堆参数创建一个新的HandlerRegistration类型的实例,并返回。可以推测HandlerRegistrationMessageConsumer接口的具体实现,它一定非常重要。所以我们来看一看HandlerRegistration类是个啥玩意。首先看一下HandlerRegistration的类体系结构:

可以看到HandlerRegistration类同时继承了MessageConsumer<T>以及Handler<Message<T>>接口,从其类名可以看出它相当于一个”Handler注册记录”,是非常重要的一个类。它有一堆的成员变量,构造函数对vertx, metrics, eventBus, address(发送地址), repliedAddress(回复地址), localOnly(是否在集群内传播), asyncResultHandler等几个成员变量进行初始化,并且检查超时时间timeout,如果设定了超时时间那么设定并保存超时计时器(仅用于reply handler中),如果计时器时间到,代表回复超时。因为有一些函数还没介绍,超时的逻辑我们后边再讲。

Note: 由于MessageConsumer接口继承了ReadStream接口,因此它支持back-pressure,其实现就在HandlerRegistration类中。我们将稍后解析back-pressure的实现。

现在回到consumer方法中来。创建了MessageConsumer实例后,我们接着调用它的handler方法绑定上对应的消息处理函数。handler方法的实现位于HandlerRegistration类中:

1
2
3
4
5
6
7
8
9
10
11
12
@Override
public synchronized MessageConsumer<T> handler(Handler<Message<T>> handler) {
this.handler = handler;
if (this.handler != null && !registered) {
registered = true;
eventBus.addRegistration(address, this, repliedAddress != null, localOnly);
} else if (this.handler == null && registered) {
// This will set registered to false
this.unregister();
}
return this;
}

首先,handler方法将此HandlerRegistration中的handler成员设置为传入的消息处理函数。HandlerRegistration类中有一个registered标志位代表是否已绑定消息处理函数。handler方法会检查传入的handler是否为空且是否已绑定消息处理函数。如果不为空且未绑定,Vert.x就会将registered标志位设为true并且调用eventBusaddRegistration方法将此consumer注册至Event Bus上;如果handler为空且已绑定消息处理函数,我们就调用unregister方法注销当前的consumer。我们稍后会分析unregister方法的实现。

前面提到过注册consumer的逻辑位于Event Bus的addRegistration方法中,因此我们来分析一下它的实现:

1
2
3
4
5
6
protected <T> void addRegistration(String address, HandlerRegistration<T> registration,
boolean replyHandler, boolean localOnly) {
Objects.requireNonNull(registration.getHandler(), "handler");
boolean newAddress = addLocalRegistration(address, registration, replyHandler, localOnly);
addRegistration(newAddress, address, replyHandler, localOnly, registration::setResult);
}

addRegistration方法接受四个参数:发送地址address、传入的consumer registration、代表是否为reply handler的标志位replyHandler以及代表是否在集群范围内传播的标志位localOnly。首先确保传入的HandlerRegistration不为空。然后Vert.x会调用addLocalRegistration方法将此consumer注册至Event Bus上:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
protected <T> boolean addLocalRegistration(String address, HandlerRegistration<T> registration,
boolean replyHandler, boolean localOnly) {
Objects.requireNonNull(address, "address");
Context context = Vertx.currentContext();
boolean hasContext = context != null;
if (!hasContext) {
// Embedded
context = vertx.getOrCreateContext();
}
registration.setHandlerContext(context);
boolean newAddress = false;
HandlerHolder holder = new HandlerHolder<>(metrics, registration, replyHandler, localOnly, context);
Handlers handlers = handlerMap.get(address);
if (handlers == null) {
handlers = new Handlers();
Handlers prevHandlers = handlerMap.putIfAbsent(address, handlers);
if (prevHandlers != null) {
handlers = prevHandlers;
}
newAddress = true;
}
handlers.list.add(holder);
if (hasContext) {
HandlerEntry entry = new HandlerEntry<>(address, registration);
context.addCloseHook(entry);
}
return newAddress;
}

首先该方法要确保地址address不为空,接着它会获取当前线程下对应的Vert.x Context,如果获取不到则表明当前不在Verticle中(即Embedded),需要调用vertx.getOrCreateContext()来获取Context;然后将获取到的Context赋值给registration内部的handlerContext(代表消息处理对应的Vert.x Context)。

下面就要将给定的registration注册至Event Bus上了。这里Vert.x用一个HandlerHolder类来包装registrationcontext。接着Vert.x会从存储消息处理Handler的哈希表handlerMap中获取给定地址对应的Handlers,哈希表的类型为ConcurrentMap<String, Handlers>,key为地址,value为对应的HandlerHolder集合。注意这里的Handlers类代表一些Handler的集合,它内部维护着一个列表list用于存储每个HandlerHolderHandlers类中只有一个choose函数,此函数根据轮询算法从HandlerHolder集合中选定一个HandlerHolder,这即是Event Bus发送消息时实现load-balancing的方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class Handlers {
private final AtomicInteger pos = new AtomicInteger(0);
public final List<HandlerHolder> list = new CopyOnWriteArrayList<>();
public HandlerHolder choose() {
while (true) {
int size = list.size();
if (size == 0) {
return null;
}
int p = pos.getAndIncrement();
if (p >= size - 1) {
pos.set(0);
}
try {
return list.get(p);
} catch (IndexOutOfBoundsException e) {
// Can happen
pos.set(0);
}
}
}
}

获取到对应的handlers以后,Vert.x首先需要检查其是否为空,如果为空代表此地址还没有注册消息处理Handler,Vert.x就会创建一个Handlers并且将其置入handlerMap中,将newAddress标志位设为true代表这是一个新注册的地址,然后将其赋值给handlers。接着我们向handlers中的HandlerHolder列表list中添加刚刚创建的HandlerHolder实例,这样就将registration注册至Event Bus中了。

前面判断当前线程是否在Vert.x Context的标志位hasContext还有一个用途:如果当前线程在Vert.x Context下(比如在Verticle中),Vert.x会通过addCloseHook方法给当前的context添加一个钩子函数用于注销当前绑定的registration。当对应的Verticle被undeploy的时候,此Verticle绑定的所有消息处理Handler都会被unregister。Hook的类型为HandlerEntry<T>,它继承了Closeable接口,对应的逻辑在close函数中实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class HandlerEntry<T> implements Closeable {
final String address;
final HandlerRegistration<T> handler;
public HandlerEntry(String address, HandlerRegistration<T> handler) {
this.address = address;
this.handler = handler;
}
// ...
// Called by context on undeploy
public void close(Handler<AsyncResult<Void>> completionHandler) {
handler.unregister(completionHandler);
completionHandler.handle(Future.succeededFuture());
}
}

可以看到close函数会将绑定的registration从Event Bus的handlerMap中移除并执行completionHandler中的逻辑,completionHandler可由用户指定。

那么在哪里调用这些绑定的hook呢?答案是在DeploymentManager类中的doUndeploy方法中,通过contextrunCloseHooks方法执行绑定的hook函数。相关代码如下(只截取相关逻辑):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public synchronized void doUndeploy(ContextImpl undeployingContext, Handler<AsyncResult<Void>> completionHandler) {
// 前面代码略
context.runCloseHooks(ar2 -> {
if (ar2.failed()) {
// Log error but we report success anyway
log.error("Failed to run close hook", ar2.cause());
}
if (ar.succeeded() && undeployCount.incrementAndGet() == numToUndeploy) {
reportSuccess(null, undeployingContext, completionHandler);
} else if (ar.failed() && !failureReported.get()) {
failureReported.set(true);
reportFailure(ar.cause(), undeployingContext, completionHandler);
}
});
// 后面代码略
}

再回到addRegistration方法中。刚才addLocalRegistration方法的返回值newAddress代表对应的地址是否为新注册的。接着我们调用另一个版本的addRegistration方法,传入了一大堆参数:

1
2
3
4
5
protected <T> void addRegistration(boolean newAddress, String address,
boolean replyHandler, boolean localOnly,
Handler<AsyncResult<Void>> completionHandler) {
completionHandler.handle(Future.succeededFuture());
}

好吧,传入的前几个参数没用到。。。最后一个参数completionHandler传入的是registration::setResult方法引用,也就是说这个方法调用了对应registrationsetResult方法。其实现位于HandlerRegistration类中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public synchronized void setResult(AsyncResult<Void> result) {
this.result = result;
if (completionHandler != null) {
if (result.succeeded()) {
metric = metrics.handlerRegistered(address, repliedAddress);
}
Handler<AsyncResult<Void>> callback = completionHandler;
vertx.runOnContext(v -> callback.handle(result));
} else if (result.failed()) {
log.error("Failed to propagate registration for handler " + handler + " and address " + address);
} else {
metric = metrics.handlerRegistered(address, repliedAddress);
}
}

首先先设置registration内部的result成员(正常情况下为Future.succeededFuture())。接着Vert.x会判断registration是否绑定了completionHandler(与之前的completionHandler不同,这里的completionHandlerMessageConsumer注册成功时调用的Handler),若绑定则记录Metrics信息(handlerRegistered)并在Vert.x Context内调用completionHandler的逻辑;若未绑定completionHandler则仅记录Metrics信息。

到此为止,consumer方法的逻辑就分析完了。在分析sendpublish方法的逻辑之前,我们先来看一下如何注销绑定的MessageConsumer

unregister

我们通过调用MessageConsumerunregister方法实现注销操作。Vert.x提供了两个版本的unregister方法:

1
2
3
void unregister();
void unregister(Handler<AsyncResult<Void>> completionHandler);

其中第二个版本的unregister方法会在注销操作完成时调用传入的completionHandler。比如在cluster范围内注销consumer需要消耗一定的时间在集群内传播,因此第二个版本的方法就会派上用场。我们来看一下其实现,它们最后都是调用了HandlerRegistration类的doUnregister方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
private void doUnregister(Handler<AsyncResult<Void>> completionHandler, boolean callEndHandler) {
if (timeoutID != -1) {
vertx.cancelTimer(timeoutID);
}
if (endHandler != null && callEndHandler) {
Handler<Void> theEndHandler = endHandler;
Handler<AsyncResult<Void>> handler = completionHandler;
completionHandler = ar -> {
theEndHandler.handle(null);
if (handler != null) {
handler.handle(ar);
}
};
}
if (registered) {
registered = false;
eventBus.removeRegistration(address, this, completionHandler);
} else {
callCompletionHandlerAsync(completionHandler);
}
registered = false;
}

如果设定了超时定时器(timeoutID合法),那么Vert.x会首先将定时器关闭。接着Vert.x会判断是否需要调用endHandler。那么endHandler又是什么呢?前面我们提到过MessageConsumer接口继承了ReadStream接口,而ReadStream接口定义了一个endHandler方法用于绑定一个endHandler,当stream中的数据读取完毕时会调用。而在Event Bus中,消息源源不断地从一处发送至另一处,因此只有在某个consumer
被unregister的时候,其对应的stream才可以叫“读取完毕”,因此Vert.x选择在doUnregister方法中调用endHandler

接着Vert.x会判断此consumer是否已注册消息处理函数Handler(通过检查registered标志位),若已注册则将对应的Handler从Event Bus中的handlerMap中移除并将registered设为false;若还未注册Handler且提供了注销结束时的回调completionHandler(注意不是HandlerRegistration类的成员变量completionHandler,而是之前第二个版本的unregister中传入的Handler,用同样的名字很容易混。。。),则通过callCompletionHandlerAsync方法调用回调函数。

从Event Bus中移除Handler的逻辑位于EventBusImpl类的removeRegistration方法中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
protected <T> void removeRegistration(String address, HandlerRegistration<T> handler, Handler<AsyncResult<Void>> completionHandler) {
HandlerHolder holder = removeLocalRegistration(address, handler);
removeRegistration(holder, address, completionHandler);
}
protected <T> void removeRegistration(HandlerHolder handlerHolder, String address,
Handler<AsyncResult<Void>> completionHandler) {
callCompletionHandlerAsync(completionHandler);
}
protected <T> HandlerHolder removeLocalRegistration(String address, HandlerRegistration<T> handler) {
Handlers handlers = handlerMap.get(address);
HandlerHolder lastHolder = null;
if (handlers != null) {
synchronized (handlers) {
int size = handlers.list.size();
// Requires a list traversal. This is tricky to optimise since we can't use a set since
// we need fast ordered traversal for the round robin
for (int i = 0; i < size; i++) {
HandlerHolder holder = handlers.list.get(i);
if (holder.getHandler() == handler) {
handlers.list.remove(i);
holder.setRemoved();
if (handlers.list.isEmpty()) {
handlerMap.remove(address);
lastHolder = holder;
}
holder.getContext().removeCloseHook(new HandlerEntry<>(address, holder.getHandler()));
break;
}
}
}
}
return lastHolder;
}

其真正的unregister逻辑位于removeLocalRegistration方法中。首先需要从handlerMap中获取地址对应的Handlers实例handlers,如果handlers不为空,为了防止并发问题,Vert.x需要对其加锁后再进行操作。Vert.x需要遍历handlers中的列表,遇到与传入的HandlerRegistration相匹配的HandlerHolder就将其从列表中移除,然后调用对应holdersetRemoved方法标记其为已注销并记录Metrics数据(handlerUnregistered)。如果移除此HandlerHolderhandlers没有任何注册的Handler了,就将该地址对应的Handlers实例从handlerMap中移除并保存刚刚移除的HandlerHolder。另外,由于已经将此consumer注销,在undeploy verticle的时候不需要再进行unregister,因此这里还要将之前注册到context的hook移除。

调用完removeLocalRegistration方法以后,Vert.x会调用另一个版本的removeRegistration方法,调用completionHandler(用户在第二个版本的unregister方法中传入的处理函数)对应的逻辑,其它的参数都没什么用。。。

这就是MessageConsumer注销的逻辑实现。下面就到了本文的另一重头戏了 —— 发送消息相关的函数sendpublish

send & publish

sendpublish的逻辑相近,只不过一个是发送至目标地址的某一消费者,一个是发布至目标地址的所有消费者。Vert.x使用一个标志位send来代表是否为点对点发送模式。

几个版本的sendpublish最终都归结于生成消息对象然后调用sendOrPubInternal方法执行逻辑,只不过send标志位不同:

1
2
3
4
5
6
7
8
9
10
11
@Override
public <T> EventBus send(String address, Object message, DeliveryOptions options, Handler<AsyncResult<Message<T>>> replyHandler) {
sendOrPubInternal(createMessage(true, address, options.getHeaders(), message, options.getCodecName()), options, replyHandler);
return this;
}
@Override
public EventBus publish(String address, Object message, DeliveryOptions options) {
sendOrPubInternal(createMessage(false, address, options.getHeaders(), message, options.getCodecName()), options, null);
return this;
}

两个方法中都是通过createMessage方法来生成对应的消息对象的:

1
2
3
4
5
6
7
protected MessageImpl createMessage(boolean send, String address, MultiMap headers, Object body, String codecName) {
Objects.requireNonNull(address, "no null address accepted");
MessageCodec codec = codecManager.lookupCodec(body, codecName);
@SuppressWarnings("unchecked")
MessageImpl msg = new MessageImpl(address, null, headers, body, codec, send, this);
return msg;
}

createMessage方法接受5个参数:send即上面提到的标志位,address为发送目标地址,headers为设置的header,body代表发送的对象,codecName代表对应的Codec(消息编码解码器)名称。createMessage方法首先会确保地址不为空,然后通过codecManager来获取对应的MessageCodec。如果没有提供Codec(即codecName为空),那么codecManager会根据发送对象body的类型来提供内置的Codec实现(具体逻辑请见此处)。准备好MessageCodec后,createMessage方法就会创建一个MessageImpl实例并且返回。

这里我们还需要了解一下MessageImpl的构造函数:

1
2
3
4
5
6
7
8
9
10
11
public MessageImpl(String address, String replyAddress, MultiMap headers, U sentBody,
MessageCodec<U, V> messageCodec,
boolean send, EventBusImpl bus) {
this.messageCodec = messageCodec; // Codec
this.address = address; // 发送目标地址
this.replyAddress = replyAddress; // 回复地址
this.headers = headers; // header
this.sentBody = sentBody; // 发送的对象
this.send = send; // 是否为点对点模式
this.bus = bus; // 相关的Event Bus实例
}

createMessage方法并没有设置回复地址replyAddress。如果用户指定了replyHandler的话,后边sendOrPubInternal方法会对此消息实体进行加工,设置replyAddress并生成回复逻辑对应的HandlerRegistration

我们看一下sendOrPubInternal方法的源码:

1
2
3
4
5
6
7
private <T> void sendOrPubInternal(MessageImpl message, DeliveryOptions options,
Handler<AsyncResult<Message<T>>> replyHandler) {
checkStarted();
HandlerRegistration<T> replyHandlerRegistration = createReplyHandlerRegistration(message, options, replyHandler);
SendContextImpl<T> sendContext = new SendContextImpl<>(message, options, replyHandlerRegistration);
sendContext.next();
}

它接受三个参数:要发送的消息message,发送配置选项options以及回复处理函数replyHandler。首先sendOrPubInternal方法要检查Event Bus是否已启动,接着如果绑定了回复处理函数,Vert.x就会调用createReplyHandlerRegistration方法给消息实体message包装上回复地址,并且生成对应的reply consumer。接着Vert.x创建了一个包装消息的SendContextImpl实例并调用了其next方法。

我们一步一步来解释。首先是createReplyHandlerRegistration方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
private <T> HandlerRegistration<T> createReplyHandlerRegistration(MessageImpl message, DeliveryOptions options, Handler<AsyncResult<Message<T>>> replyHandler) {
if (replyHandler != null) {
long timeout = options.getSendTimeout();
String replyAddress = generateReplyAddress();
message.setReplyAddress(replyAddress);
Handler<Message<T>> simpleReplyHandler = convertHandler(replyHandler);
HandlerRegistration<T> registration =
new HandlerRegistration<>(vertx, metrics, this, replyAddress, message.address, true, replyHandler, timeout);
registration.handler(simpleReplyHandler);
return registration;
} else {
return null;
}
}

createReplyHandlerRegistration方法首先检查传入的replyHandler是否为空(是否绑定了replyHandler,回复处理函数),如果为空则代表不需要处理回复,直接返回null;若replyHandler不为空,createReplyHandlerRegistration方法就会从配置中获取reply的最大超时时长(默认30s),然后调用generateReplyAddress方法生成对应的回复地址replyAddress

1
2
3
4
5
private final AtomicLong replySequence = new AtomicLong(0);
protected String generateReplyAddress() {
return Long.toString(replySequence.incrementAndGet());
}

生成回复地址的逻辑有点简单。。。。EventBusImpl实例中维护着一个AtomicLong类型的replySequence成员变量代表对应的回复地址。每次生成的时候都会使其自增,然后转化为String。也就是说生成的replyAddress都类似于”1”、”5”这样,而不是我们想象中的直接回复至sender的地址。。。

生成完毕以后,createReplyHandlerRegistration方法会将生成的replyAddress设定到消息对象message中。接着Vert.x会通过convertHandler方法对replyHandler进行包装处理并生成类型简化为Handler<Message<T>>simpleReplyHandler,它用于绑定至后面创建的reply consumer上。接着Vert.x会创建对应的reply consumer。关于reply操作的实现,我们后边会详细讲述。下面Vert.x就通过handler方法将生成的回复处理函数simpleReplyHandler绑定至创建好的reply consumer中,其底层实现我们之前已经分析过了,这里就不再赘述。最后此方法返回生成的registration,即对应的reply consumer。注意这个reply consumer是一次性的,也就是说Vert.x会在其接收到回复或超时的时候自动对其进行注销。

OK,现在回到sendOrPubInternal方法中来。下面Vert.x会创建一个SendContextImpl实例并调用其next方法。SendContextImpl类实现了SendContext接口,它相当于一个消息的封装体,并且可以与Event Bus中的interceptors(拦截器)结合使用。

SendContext接口定义了三个方法:

  • message: 获取当前SendContext包装的消息实体
  • next: 调用下一个消息拦截器
  • send: 代表消息的发送模式是否为点对点模式

在Event Bus中,消息拦截器本质上是一个Handler<SendContext>类型的处理函数。Event Bus内部存储着一个interceptors列表用于存储绑定的消息拦截器。我们可以通过addInterceptorremoveInterceptor方法进行消息拦截器的添加和删除操作。如果要进行链式拦截,则在每个拦截器中都应该调用对应SendContextnext方法,比如:

1
2
3
4
eventBus.addInterceptor(sc -> {
// 一些处理逻辑
sc.next(); // 调用下一个拦截器
});

我们来看一下SendContextImpl类中next方法的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
protected class SendContextImpl<T> implements SendContext<T> {
public final MessageImpl message;
public final DeliveryOptions options;
public final HandlerRegistration<T> handlerRegistration;
public final Iterator<Handler<SendContext>> iter;
public SendContextImpl(MessageImpl message, DeliveryOptions options, HandlerRegistration<T> handlerRegistration) {
this.message = message;
this.options = options;
this.handlerRegistration = handlerRegistration;
this.iter = interceptors.iterator();
}
@Override
public Message<T> message() {
return message;
}
@Override
public void next() {
if (iter.hasNext()) {
Handler<SendContext> handler = iter.next();
try {
handler.handle(this);
} catch (Throwable t) {
log.error("Failure in interceptor", t);
}
} else {
sendOrPub(this);
}
}
@Override
public boolean send() {
return message.send();
}
}

我们可以看到,SendContextImpl类中维护了一个拦截器列表对应的迭代器。每次调用next方法时,如果迭代器中存在拦截器,就将下个拦截器取出并进行相关调用。如果迭代器为空,则代表拦截器都已经调用完毕,Vert.x就会调用EventBusImpl类下的sendOrPub方法进行消息的发送操作。

sendOrPub方法仅仅在metrics模块中记录相关数据(messageSent),最后调用deliverMessageLocally(SendContextImpl<T>)方法执行消息的发送逻辑:

1
2
3
4
5
6
7
8
9
10
protected <T> void deliverMessageLocally(SendContextImpl<T> sendContext) {
if (!deliverMessageLocally(sendContext.message)) {
// no handlers
metrics.replyFailure(sendContext.message.address, ReplyFailure.NO_HANDLERS);
if (sendContext.handlerRegistration != null) {
sendContext.handlerRegistration.sendAsyncResultFailure(ReplyFailure.NO_HANDLERS, "No handlers for address "
+ sendContext.message.address);
}
}
}

这里面又套了一层。。。它最后其实是调用了deliverMessageLocally(MessageImpl)方法。此方法返回值代表发送消息的目标地址是否注册有MessageConsumer,如果没有(false)则记录错误并调用sendContext中保存的回复处理函数处理错误(如果绑定了replyHandler的话)。

deliverMessageLocally(MessageImpl)方法是真正区分sendpublish的地方,我们来看一下其实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
protected <T> boolean deliverMessageLocally(MessageImpl msg) {
msg.setBus(this);
Handlers handlers = handlerMap.get(msg.address());
if (handlers != null) {
if (msg.send()) {
//Choose one
HandlerHolder holder = handlers.choose();
metrics.messageReceived(msg.address(), !msg.send(), isMessageLocal(msg), holder != null ? 1 : 0);
if (holder != null) {
deliverToHandler(msg, holder);
}
} else {
// Publish
metrics.messageReceived(msg.address(), !msg.send(), isMessageLocal(msg), handlers.list.size());
for (HandlerHolder holder: handlers.list) {
deliverToHandler(msg, holder);
}
}
return true;
} else {
metrics.messageReceived(msg.address(), !msg.send(), isMessageLocal(msg), 0);
return false;
}
}

首先Vert.x需要从handlerMap中获取目标地址对应的处理函数集合handlers。接着,如果handlers不为空的话,Vert.x就会判断消息实体的send标志位。如果send标志位为true则代表以点对点模式发送,Vert.x就会通过handlerschoose方法(之前提到过),按照轮询算法来获取其中的某一个HandlerHolder。获取到HandlerHolder之后,Vert.x会通过deliverToHandler方法将消息分发至HandlerHolder中进行处理;如果send标志位为false则代表向所有消费者发布消息,Vert.x就会对handlers中的每一个HandlerHolder依次调用deliverToHandler方法,以便将消息分发至所有注册到此地址的Handler中进行处理。

消息处理的真正逻辑就在deliverToHandler方法中。我们来看一下它的源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
private <T> void deliverToHandler(MessageImpl msg, HandlerHolder<T> holder) {
// Each handler gets a fresh copy
@SuppressWarnings("unchecked")
Message<T> copied = msg.copyBeforeReceive();
if (metrics.isEnabled()) {
metrics.scheduleMessage(holder.getHandler().getMetric(), msg.isLocal());
}
holder.getContext().runOnContext((v) -> {
// Need to check handler is still there - the handler might have been removed after the message were sent but
// before it was received
try {
if (!holder.isRemoved()) {
holder.getHandler().handle(copied);
}
} finally {
if (holder.isReplyHandler()) {
holder.getHandler().unregister();
}
}
});
}

首先deliverToHandler方法会复制一份要发送的消息,然后deliverToHandler方法会调用metricsscheduleMessage方法记录对应的Metrics信息(计划对消息进行处理。此函数修复了Issue 1480)。接着deliverToHandler方法会从传入的HandlerHolder中获取对应的Vert.x Context,然后调用runOnContext方法以便可以让消息处理逻辑在Vert.x Context中执行。为防止对应的handler在处理之前被移除,这里还需要检查一下holderisRemoved属性。如果没有移除,那么就从holder中获取对应的handler并调用其handle方法执行消息的处理逻辑。注意这里获取的handler实际上是一个HandlerRegistration。前面提到过HandlerRegistration类同时实现了MessageConsumer接口和Handler接口,因此它兼具这两个接口所期望的功能。另外,之前我们提到过Vert.x会自动注销接收过回复的reply consumer,其逻辑就在这个finally块中。Vert.x会检查holder中的handler是否为reply handler(reply consumer),如果是的话就调用其unregister方法将其注销,来确保reply consumer为一次性的。

之前我们提到过MessageConsumer继承了ReadStream接口,因此HandlerRegistration需要实现flow control(back-pressure)的相关逻辑。那么如何实现呢?我们看到,HandlerRegistration类中有一个paused标志位代表是否还继续处理消息。ReadStream接口中定义了两个函数用于控制stream的通断:当处理速度小于读取速度(发生拥塞)的时候我们可以通过pause方法暂停消息的传递,将积压的消息暂存于内部的消息队列(缓冲区)pending中;当相对速度正常的时候,我们可以通过resume方法恢复消息的传递和处理。

我们看一下HandlerRegistrationhandle方法的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@Override
public void handle(Message<T> message) {
Handler<Message<T>> theHandler;
synchronized (this) {
if (paused) {
if (pending.size() < maxBufferedMessages) {
pending.add(message);
} else {
if (discardHandler != null) {
discardHandler.handle(message);
} else {
log.warn("Discarding message as more than " + maxBufferedMessages + " buffered in paused consumer");
}
}
return;
} else {
if (pending.size() > 0) {
pending.add(message);
message = pending.poll();
}
theHandler = handler;
}
}
deliver(theHandler, message);
}

果然。。。handle方法在处理消息的基础上实现了拥塞控制的功能。为了防止资源争用,需要对自身进行加锁;首先handle方法会判断当前的consumer是否为paused状态,如果为paused状态,handle方法会检查当前缓冲区大小是否已经超过给定的最大缓冲区大小maxBufferedMessages,如果没超过,就将收到的消息push到缓冲区中;如果大于或等于阈值,Vert.x就需要丢弃超出的那部分消息。如果当前的consumer为正常状态,则如果缓冲区不为空,就将收到的消息push到缓冲区中并从缓冲区中pull队列首端的消息,然后调用deliver方法执行真正的消息处理逻辑。注意这里是在锁之外执行deliver方法的,这是为了保证在multithreaded worker context下可以并发传递消息(见Bug 473714 )。由于multithreaded worker context允许在不同线程并发执行逻辑(见官方文档),如果将deliver方法置于synchronized块之内,其他线程必须等待当前锁被释放才能进行消息的传递逻辑,因而不能做到“delivery concurrently”。

deliver方法是真正执行“消息处理”逻辑的地方:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
private void deliver(Handler<Message<T>> theHandler, Message<T> message) {
checkNextTick();
boolean local = true;
if (message instanceof ClusteredMessage) {
// A bit hacky
ClusteredMessage cmsg = (ClusteredMessage)message;
if (cmsg.isFromWire()) {
local = false;
}
}
String creditsAddress = message.headers().get(MessageProducerImpl.CREDIT_ADDRESS_HEADER_NAME);
if (creditsAddress != null) {
eventBus.send(creditsAddress, 1);
}
try {
metrics.beginHandleMessage(metric, local);
theHandler.handle(message);
metrics.endHandleMessage(metric, null);
} catch (Exception e) {
log.error("Failed to handleMessage", e);
metrics.endHandleMessage(metric, e);
throw e;
}
}

首先Vert.x会调用checkNextTick方法来检查消息队列(缓冲区)中是否存在更多的消息等待被处理,如果有的话就取出队列首端的消息并调用deliver方法将其传递给handler进行处理。这里仍需要注意并发问题,相关实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private synchronized void checkNextTick() {
if (!pending.isEmpty()) {
handlerContext.runOnContext(v -> {
Message<T> message;
Handler<Message<T>> theHandler;
synchronized (HandlerRegistration.this) {
if (paused || (message = pending.poll()) == null) {
return;
}
theHandler = handler;
}
deliver(theHandler, message);
});
}
}

检查完消息队列以后,Vert.x会接着根据message判断消息是否仅在本地进行处理并给local标志位赋值,local标志位将在记录Metrics数据时用到。

接下来我们看到Vert.x从消息的headers中获取了一个地址creditsAddress,如果creditsAddress存在就向此地址发送一条消息,body为1。那么这个creditsAddress又是啥呢?其实,它与flow control有关,我们会在下面详细分析。发送完credit消息以后,接下来就到了调用handler处理消息的时刻了。在处理消息之前需要调用metricsbeginHandleMessage方法记录消息开始处理的metrics数据,在处理完消息以后需要调用endHandleMessage方法记录消息处理结束的metrics数据。

嗯。。。到此为止,消息的发送和处理过程就已经一目了然了。下面我们讲一讲之前代码中出现的creditsAddress到底是啥玩意~

MessageProducer

之前我们提到过,Vert.x定义了两个接口作为 flow control aware object 的规范:WriteStream以及ReadStream。对于ReadStream我们已经不再陌生了,MessageConsumer就继承了它;那么大家应该可以想象到,有MessageConsumer就必有MessageProducer。不错,Vert.x中的MessageProducer接口对应某个address上的消息生产者,同时它继承了WriteStream接口,因此MessageProducer的实现类MessageProducerImpl同样具有flow control的能力。我们可以把MessageProducer看做是一个具有flow control功能的增强版的EventBus。我们可以通过EventBus接口的publisher方法创建一个MessageProducer

MessageProducer有了初步了解之后,我们就可以解释前面deliver方法中的creditsAddress了。MessageProducer接口的实现类 —— MessageProducerImpl类的流量控制功能是基于credit的,其内部会维护一个credit值代表“发送消息的能力”,其默认值等于DEFAULT_WRITE_QUEUE_MAX_SIZE

1
2
private int maxSize = DEFAULT_WRITE_QUEUE_MAX_SIZE;
private int credits = DEFAULT_WRITE_QUEUE_MAX_SIZE;

在采用点对点模式发送消息的时候,MessageProducer底层会调用doSend方法进行消息的发送。发送依然利用Event Bus的send方法,只不过doSend方法中添加了flow control的相关逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
private synchronized <R> void doSend(T data, Handler<AsyncResult<Message<R>>> replyHandler) {
if (credits > 0) {
credits--;
if (replyHandler == null) {
bus.send(address, data, options);
} else {
bus.send(address, data, options, replyHandler);
}
} else {
pending.add(data);
}
}

MessageConsumer类似,MessageProducer内部同样保存着一个消息队列(缓冲区)用于暂存堆积的消息。当credits大于0的时候代表可以发送消息(没有出现拥塞),Vert.x就会调用Event Bus的send方法进行消息的发送,同时credits要减1;如果credits小于等于0,则代表此时消息发送的速度太快,出现了拥塞,需要暂缓发送,因此将要发送的对象暂存于缓冲区中。大家可能会问,credits值不断减小,那么恢复消息发送能力(增大credits)的逻辑在哪呢?这就要提到creditsAddress了。我们看一下MessageProducerImpl类的构造函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public MessageProducerImpl(Vertx vertx, String address, boolean send, DeliveryOptions options) {
this.vertx = vertx;
this.bus = vertx.eventBus();
this.address = address;
this.send = send;
this.options = options;
if (send) {
String creditAddress = UUID.randomUUID().toString() + "-credit";
creditConsumer = bus.consumer(creditAddress, msg -> {
doReceiveCredit(msg.body());
});
options.addHeader(CREDIT_ADDRESS_HEADER_NAME, creditAddress);
} else {
creditConsumer = null;
}
}

MessageProducerImpl的构造函数中生成了一个creditAddress,然后给该地址绑定了一个Handler,当收到消息时调用doReceiveCredit方法执行解除拥塞,恢复消息发送的逻辑。MessageProducerImpl会将此MessageConsumer保存,以便在关闭消息生产者流的时候将其注销。接着构造函数会往optionsheaders中添加一条记录,保存对应的creditAddress,这也就是上面我们在deliver函数中获取的creditAddress

1
2
3
4
5
// 位于HandlerRegistration类的deliver函数中
String creditsAddress = message.headers().get(MessageProducerImpl.CREDIT_ADDRESS_HEADER_NAME);
if (creditsAddress != null) {
eventBus.send(creditsAddress, 1);
}

这样,发送消息到creditsAddress的逻辑也就好理解了。由于deliver函数的逻辑就是处理消息,因此这里向creditsAddress发送一个 1 其实就是将对应的credits值加1。恢复消息发送的逻辑位于MessageProducerImpl类的doReceiveCredit方法中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
private synchronized void doReceiveCredit(int credit) {
credits += credit;
while (credits > 0) {
T data = pending.poll();
if (data == null) {
break;
} else {
credits--;
bus.send(address, data, options);
}
}
final Handler<Void> theDrainHandler = drainHandler;
if (theDrainHandler != null && credits >= maxSize / 2) {
this.drainHandler = null;
vertx.runOnContext(v -> theDrainHandler.handle(null));
}
}

逻辑一目了然。首先给credits加上发送过来的值(正常情况下为1),然后恢复发送能力,将缓冲区的数据依次取出、发送然后减小credits。同时如果MessageProducer绑定了drainHandler(消息流不拥塞的时候调用的逻辑,详见官方文档),并且MessageProducer发送的消息不再拥塞(credits >= maxSize / 2),那么就在Vert.x Context中执行drainHandler中的逻辑。

怎么样,体会到Vert.x中flow control的强大之处了吧!官方文档中MessageProducer的篇幅几乎没有,只在介绍WriteStream的时候提了提,因此这部分也可以作为MessageProducer的参考。

reply

最后就是消息的回复逻辑 —— reply方法了。reply方法的实现位于MessageImpl类中,最终调用的是reply(Object, DeliveryOptions, Handler<AsyncResult<Message<R>>>)这个版本:

1
2
3
4
5
6
@Override
public <R> void reply(Object message, DeliveryOptions options, Handler<AsyncResult<Message<R>>> replyHandler) {
if (replyAddress != null) {
sendReply(bus.createMessage(true, replyAddress, options.getHeaders(), message, options.getCodecName()), options, replyHandler);
}
}

这里reply方法同样调用EventBuscreateMessage方法创建要回复的消息实体,传入的replyAddress即为之前讲过的生成的非常简单的回复地址。然后再将消息实体、配置以及对应的replyHandler(如果有的话)传入sendReply方法进行消息的回复。最后其实是调用了Event Bus中的四个参数的sendReply方法,它的逻辑与之前讲过的sendOrPubInternal非常相似:

1
2
3
4
5
6
7
8
9
protected <T> void sendReply(MessageImpl replyMessage, MessageImpl replierMessage, DeliveryOptions options,
Handler<AsyncResult<Message<T>>> replyHandler) {
if (replyMessage.address() == null) {
throw new IllegalStateException("address not specified");
} else {
HandlerRegistration<T> replyHandlerRegistration = createReplyHandlerRegistration(replyMessage, options, replyHandler);
new ReplySendContextImpl<>(replyMessage, options, replyHandlerRegistration, replierMessage).next();
}
}

参数中replyMessage代表回复消息实体,replierMessage则代表回复者自身的消息实体(sender)。

如果地址为空则抛出异常;如果地址不为空,则先调用createReplyHandlerRegistration方法创建对应的replyHandlerRegistrationcreateReplyHandlerRegistration方法的实现之前已经讲过了。注意这里的createReplyHandlerRegistration其实对应的是此replier的回复,因为Vert.x中的 Request-Response 消息模型不限制相互回复(通信)的次数。当然如果没有指定此replier的回复的replyHandler,那么此处的replyHandlerRegistration就为空。最后sendReply方法会创建一个ReplySendContextImpl并调用其next方法。

ReplySendContextImpl类同样是SendContext接口的一个实现(继承了SendContextImpl类)。ReplySendContextImpl比起其父类就多保存了一个replierMessagenext方法的逻辑与父类逻辑非常相似,只不过将回复的逻辑替换成了另一个版本的sendReply方法:

1
2
3
4
5
6
7
8
9
@Override
public void next() {
if (iter.hasNext()) {
Handler<SendContext> handler = iter.next();
handler.handle(this);
} else {
sendReply(this, replierMessage);
}
}

然而。。。sendReply方法并没有用到传入的replierMessage,所以这里最终还是调用了sendOrPub方法(尼玛,封装的ReplySendContextImpl貌似并没有什么卵用,可能为以后的扩展考虑?)。。。之后的逻辑我们都已经分析过了。

这里再强调一点。当我们发送消息同时指定replyHandler的时候,其内部为reply创建的reply consumer(类型为HandlerRegistration)指定了timeout。这个定时器从HandlerRegistration创建的时候就开始计时了。我们回顾一下:

1
2
3
4
5
6
if (timeout != -1) {
timeoutID = vertx.setTimer(timeout, tid -> {
metrics.replyFailure(address, ReplyFailure.TIMEOUT);
sendAsyncResultFailure(ReplyFailure.TIMEOUT, "Timed out after waiting " + timeout + "(ms) for a reply. address: " + address);
});
}

计时器会在超时的时候记录错误并强制注销当前consumer。由于reply consumer是一次性的,当收到reply的时候,Vert.x会自动对reply consumer调用unregister方法对其进行注销(实现位于EventBusImpl#deliverToHandler方法中),而在注销逻辑中会关闭定时器(参见前面对doUnregister方法的解析);如果超时,那么计时器就会触发,Vert.x会调用sendAsyncResultFailure方法注销当前reply consumer并处理错误。

synchronized的性能问题

大家可能看到为了防止race condition,Vert.x底层大量使用了synchronized关键字(重量级锁)。这会不会影响性能呢?其实,如果开发者遵循Vert.x的线程模型和开发规范(使用Verticle)的话,有些地方的synchronized对应的锁会被优化为 偏向锁轻量级锁(因为通常都是同一个Event Loop线程获取对应的锁),这样性能总体开销不会太大。当然如果使用Multi-threaded worker verticles就要格外关注性能问题了。。。

总结

我们来简略地总结一下Event Bus的工作原理。当我们调用consumer绑定一个MessageConsumer时,Vert.x会将它保存至Event Bus实例内部的Map中;当我们通过sendpublish向对应的地址发送消息的时候,Vert.x会遍历Event Bus中存储consumer的Map,获取与地址相对应的consumer集合,然后根据相应的策略传递并处理消息(send通过轮询策略获取任意一个consumer并将消息传递至consumer中,publish则会将消息传递至所有注册到对应地址的consumer中)。同时,MessageConsumerMessageProducer这两个接口的实现都具有flow control功能,因此它们也可以用在Pump中。

Event Bus是Vert.x中最为重要的一部分之一,探索Event Bus的源码可以加深我们对Event Bus工作原理的理解。作为开发者,只会使用框架是不够的,能够理解内部的实现原理和精华,并对其进行改进才是更为重要的。本篇文章分析的是Local模式下的Event Bus,下篇文章我们将来探索一下生产环境中更常用的 Clustered Event Bus 的实现原理,敬请期待!