In this blog post, I’ll give a brief introduction to illustrate how Vert.x Kue works.

In the workflow, first we deploy the KueVerticle in clustered mode and register the JobService on the event bus. The JobService consists of various logic of Job. After that, we could then deploy KueHttpVerticle to provide REST API. Most of the apis calls Future-based methods in Kue. Finally we could deploy our job processing verticle. The verticle usually consists of three parts:

  • create Kue instance

When we create Kue instance, the inner JobService proxy and redis client will be created. Also, the static Vertx field in Job class will also be initilized.

  • create and save Job

Here we could set job properties and subscribe events from the job address(via event bus).

  • process the Job

When we call the process or processBlocking method, Vert.x Kue will create and then deploy some KueWorkers, which is a kind of verticle(or worker verticle, if calling processBlocking).

In KueWorker, we first get Job from Redis backend (zpop operation):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
private void prepareAndStart() {
this.getJobFromBackend().setHandler(jr -> {
if (jr.succeeded()) {
if (jr.result().isPresent()) {
this.job = jr.result().get();
process();
} else {
throw new IllegalStateException("job not exist");
}
} else {
jr.cause().printStackTrace();
}
});
}

If the KueWorker successfully gets the job, it will do process procedure. First we set started_at flag on the job and then save it to the backend. Then we emit the job start event on the event bus(both job address and worker address). Next, we will process the job using the given handler. Simultaneously, we consume done and done_fail events on the job-specific address. If done event received, the job should be completed and we emit complete event. If done_fail event received, the job should be failed so we first do a failed attempt. If attempt failed, then send error event on worker address. Else send failed or failed_attempt event depending on the attempt status.

There are three kinds of address:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/**
* Handler address with certain job on event bus
* Format: vertx.kue.handler.job.{handlerType}.{addressId}.{jobType}
*
* @return corresponding address
*/

public static String getCertainJobAddress(String handlerType, Job job) {
return "vertx.kue.handler.job." + handlerType + "." + job.getAddress_id() + "." + job.getType();
}

/**
* Global worker address with specific event type
*/

public static String workerAddress(String eventType) {
return "vertx.kue.handler.workers." + eventType;
}

/**
* Worker address with specific event type and job-uuid on event bus
* Format: vertx.kue.handler.workers.{eventType}
*
* @return corresponding address
*/

public static String workerAddress(String eventType, Job job) {
return "vertx.kue.handler.workers." + eventType + "." + job.getAddress_id();
}

Diagram