Vert.x 技术内幕 | executeBlocking 实现原理

引入

大家都知道,Vert.x中的executeBlocking方法用于执行阻塞任务,并且有两种模式:有序执行和无序执行。下面我们来看两段代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
vertx.setPeriodic(1000, t -> {
vertx.executeBlocking(future -> {
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName());
future.complete();
}, r -> {});
});
vertx.setPeriodic(1000, t -> {
vertx.executeBlocking(future -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName());
future.complete();
}, r -> {});
});

我们思考一下,每段代码每次执行的时候使用的线程相同么?正常情况下大家都知道executeBlocking底层使用了Worker线程池,因此貌似两种情况没什么区别,都是轮询Worker线程池,每次可能用不同的Worker线程。但是我们测一下:

第一段代码:

1
2
3
4
5
vert.x-worker-thread-0
vert.x-worker-thread-1
vert.x-worker-thread-2
vert.x-worker-thread-3
vert.x-worker-thread-4

第二段代码:

1
2
3
4
5
vert.x-worker-thread-0
vert.x-worker-thread-0
vert.x-worker-thread-0
vert.x-worker-thread-0
vert.x-worker-thread-0

额。。。两段代码每次执行的线程居然有差异?第二次为什么每次都用相同的Worker线程?其实,大家可能忽略了一点:executeBlocking方法默认顺序执行提交的阻塞任务。今天我们就来探究一下executeBlocking内部的实现。

Worker线程池

我们来回顾一下Vert.x底层的Worker线程池,它在创建VertxImpl实例的时候进行初始化:

1
2
3
4
5
ExecutorService workerExec = Executors.newFixedThreadPool(options.getWorkerPoolSize(),
new VertxThreadFactory("vert.x-worker-thread-", checker, true, options.getMaxWorkerExecuteTime()));
PoolMetrics workerPoolMetrics = isMetricsEnabled() ? metrics.createMetrics(workerExec, "worker", "vert.x-worker-thread", options.getWorkerPoolSize()) : null;
workerPool = new WorkerPool(workerExec, workerPoolMetrics);

可以看到底层的Worker线程池本质上是一种FixedThreadPool,里面的线程由VertxThreadFactory控制生成,对应的线程类型为VertxThread。Vert.x内部用WorkerPool类对线程池以及线程池相关的Metrics类进行了封装。

阻塞任务在哪里执行?

有了Worker线程池的基础,我们来看一下Vertx实例中的executeBlocking方法,它的过程很简单:获取当前的Vert.x Context(没有就创建),然后委托调用Context里的executeBlocking方法:

1
2
3
4
5
6
7
8
9
10
11
12
@Override
public <T> void executeBlocking(Handler<Future<T>> blockingCodeHandler, boolean ordered,
Handler<AsyncResult<T>> asyncResultHandler) {
ContextImpl context = getOrCreateContext();
context.executeBlocking(blockingCodeHandler, ordered, asyncResultHandler);
}
@Override
public <T> void executeBlocking(Handler<Future<T>> blockingCodeHandler,
Handler<AsyncResult<T>> asyncResultHandler) {
executeBlocking(blockingCodeHandler, true, asyncResultHandler);
}

在此方法中可以看到,ordered标志位默认为true,即默认按提交的次序执行阻塞任务。

我们再来看一下ContextImpl类中的executeBlocking方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
@Override
public <T> void executeBlocking(Handler<Future<T>> blockingCodeHandler, boolean ordered, Handler<AsyncResult<T>> resultHandler) {
executeBlocking(null, blockingCodeHandler, resultHandler, ordered ? workerExec : workerPool.executor(), workerPool.metrics());
}
<T> void executeBlocking(Action<T> action, Handler<Future<T>> blockingCodeHandler,
Handler<AsyncResult<T>> resultHandler,
Executor exec, PoolMetrics metrics) {
Object queueMetric = metrics != null ? metrics.submitted() : null;
try {
exec.execute(() -> {
Object execMetric = null;
if (metrics != null) {
execMetric = metrics.begin(queueMetric);
}
Future<T> res = Future.future();
try {
if (blockingCodeHandler != null) {
ContextImpl.setContext(this);
blockingCodeHandler.handle(res);
} else {
T result = action.perform();
res.complete(result);
}
} catch (Throwable e) {
res.fail(e);
}
if (metrics != null) {
metrics.end(execMetric, res.succeeded());
}
if (resultHandler != null) {
runOnContext(v -> res.setHandler(resultHandler));
}
});
} catch (RejectedExecutionException ignore) {
// Pool is already shut down
if (metrics != null) {
metrics.rejected(queueMetric);
}
}
}

它调用了另一个具体版本的executeBlocking方法,其中第四个参数即为要执行阻塞任务的线程池。如果要有序执行(ordered为true),底层就使用context实例里的workerExec线程池;如果无序执行,就调用workerPoolexecutor方法获取另一种线程池。

看到这里,我们大致已经想到了,有序执行和无序执行两种模式使用不同的线程池,因此底层实现肯定有差异。我们来看一下前面提到的两个线程池,它们都是ContextImpl类的成员变量:

1
2
protected final WorkerPool workerPool;
protected final Executor workerExec;

在通过Vertx实例创建Context的时候,这几个变量会被初始化,其来源就是之前我们看过的VertxImpl实例中的Worker线程池。看一下ContextImpl类的构造函数就一目了然了:

1
2
this.workerPool = workerPool;
this.workerExec = workerPool.createOrderedExecutor();

嗯。。。有序执行对应的线程池通过workerPoolcreateOrderedExecutor方法获得,而无序执行对应的线程池通过workerPoolexecutor方法获得。因此,WorkerPool类是一个关键点,我们稍后就看一下其实现。

注意Vert.x规定,blockingCodeHandler中的逻辑(即阻塞任务)在Worker线程内执行,而resultHandler内的逻辑(结果处理)需要在Vert.x Conext中执行,因此前面需要预先设置当前使用的Worker线程的Contextthis以便后面调用runOnContext方法执行结果处理逻辑。

下面就来看一下有序执行和无序执行这两种线程池的具体区别。

无序执行

我们看一下WorkerPool类的源码中获取无序执行线程池的逻辑:

1
2
3
ExecutorService executor() {
return pool;
}

可以看到executor方法直接返回了内部的pool线程池,而pool线程池其实就是VertxImpl中的workerExec线程池:

1
workerPool = new WorkerPool(workerExec, workerPoolMetrics);

OK!如果大家熟悉并发的话,大家应该对无序执行对应的线程池 —— Worker线程池的行为非常清楚了。它属于一种FixedThreadPool,底层通过阻塞队列LinkedBlockingQueue实现。底层通过轮询算法获取Worker线程执行任务。

有序执行

下面是时候看有序执行对应的逻辑了:

1
2
3
4
5
6
7
8
9
10
11
12
13
private final OrderedExecutorFactory orderedFact;
private final ExecutorService pool;
private final PoolMetrics metrics;
public WorkerPool(ExecutorService pool, PoolMetrics metrics) {
this.orderedFact = new OrderedExecutorFactory(pool);
this.pool = pool;
this.metrics = metrics;
}
Executor createOrderedExecutor() {
return orderedFact.getExecutor();
}

可以看到有序执行对应的线程池是通过OrderedExecutorFactory创建的。其实,OrderedExecutorFactory类会生成真正的有序执行线程池OrderedExecutor,它其实是对Worker线程池pool的一个简单包装,仅仅添加了有序执行相关的逻辑,最后还是委托Worker线程池进行任务处理。

那么OrderedExecutor是如何实现顺序执行的呢?OrderedExecutor内部维护着一个任务队列。每当调用executeBlocking方法执行阻塞过程的时候,Vert.x会将阻塞过程包装成Runnable然后置入OrderedExecutor中的任务队列中;同时如果OrderedExecutor没有开始执行任务,就委托内部的Worker线程池执行任务:

1
2
3
4
5
6
7
8
9
public void execute(Runnable command) {
synchronized (tasks) {
tasks.add(command);
if (!running) {
running = true;
parent.execute(runner);
}
}
}

从代码中可以看出,最后委托Worker线程池执行的线程其实是又包装了一层的runner线程。runner的逻辑不难想:不断地从任务队列中取出队首的Runnable然后调用其run方法执行(相当于执行了此任务,只不过在runner对应的线程中);如果没有任务了就结束本线程。

这里就出现了一种情况:大批量提交阻塞任务的时候,线程池的状态running一直为true,此时所有的任务都积压到任务队列中,而执行所有任务的线程只有一个 —— runner对应的线程。这种情况其实很好想,因为要保证有序执行,就只能让它一个接一个地在同个线程中执行。如果在不同线程中依次执行则不好调度,如果直接并行执行则不能保证有序性。

所以,根据OrderedExecutor线程池的内部实现,只要提交任务的间隔时间小于任务执行的时间,底层其实就仅执行了一次runner,也就是说所有提交的阻塞任务都只在一个线程下跑(running标志位控制)。

这样就可以很好地解释我们一开始提出的问题了。当sleep(200), setPeriodic(1000)的时候,提交任务的间隔时间大于任务执行的时间,这样每次的runner就可以在下一个任务提交之前执行完,因此每次所用的线程会不同(轮询策略);而sleep(2000), setPeriodic(1000)的时候,提交任务的间隔时间小于任务执行的时间,底层最后都归结到一个runner中执行了,因此所有过程都是在同一个Worker线程执行的(很好想,保证有序就要串行执行)。

当然,如果不想有序执行,可以用void executeBlocking(Handler<Future<T>> blockingCodeHandler, boolean ordered, Handler<AsyncResult<T>> asyncResultHandler)这个版本的executeBlocking方法,并将ordered标志位设为false。根据上面的源码,底层会直接使用Worker线程池而不是OrderedExecutor线程池,这样就不会有上面OrderedExecutor的情况了。

文章目录
  1. 1. 引入
  2. 2. Worker线程池
  3. 3. 阻塞任务在哪里执行?
  4. 4. 无序执行
  5. 5. 有序执行