Objectives

This week, I planned to do the following:

  • Work on the Vert.x Blueprint - Kue (Initial implementation)
  • Learn more about event-driven and messaging systems

Knowledge gained

This week I’ve learned a lot!

  • Clustered verticles and how they interact with each other
  • Vert.x Event Bus (send/receive and pub/sub)
  • Vert.x Service Proxy
  • A better understanding of Redis (sorted sets, lists, pub/sub, transactions, etc.)
  • A better understanding of asynchronous and reactive patterns (Handler / Future)

Issues

How to implement priority?

I imitated Kue in Node.js: job ids are stored in a Redis sorted set, with the priority as the score (weight). zadd adds a job id to the set, and when a worker needs a job, zpop (implemented with a transaction, since Redis has no such command built in) takes the job with the highest priority and removes it from the set.
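The idea can be sketched without a Redis server. The class below is only an in-memory stand-in for a sorted set, not the Vert.x Kue code: the name PriorityJobSet, the tie-break by job id, and the scores (-10, 0, 10, assuming that a lower score means a higher priority) are illustrative assumptions. The synchronized zpop plays the role of the transaction: reading the first member and removing it happen as one step.

```java
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.TreeSet;

// In-memory stand-in for a Redis sorted set of job ids (illustrative, not the real implementation).
class PriorityJobSet {
    // One (score, jobId) pair, like a sorted-set member with its score.
    private static final class Entry {
        final int score;
        final long jobId;
        Entry(int score, long jobId) { this.score = score; this.jobId = jobId; }
    }

    // Ordered by score, then by job id (assumed tie-break for equal priorities).
    private final TreeSet<Entry> entries = new TreeSet<>(
        Comparator.comparingInt((Entry e) -> e.score).thenComparingLong(e -> e.jobId));
    private final Map<Long, Entry> byId = new HashMap<>();

    // Like ZADD: add a job id with a score, replacing the old score if the id already exists.
    synchronized void zadd(int score, long jobId) {
        Entry old = byId.remove(jobId);
        if (old != null) {
            entries.remove(old);
        }
        Entry entry = new Entry(score, jobId);
        byId.put(jobId, entry);
        entries.add(entry);
    }

    // Like a transactional zpop: fetch the lowest-scored member and remove it atomically.
    synchronized Optional<Long> zpop() {
        Entry first = entries.pollFirst();
        if (first == null) {
            return Optional.empty();
        }
        byId.remove(first.jobId);
        return Optional.of(first.jobId);
    }

    public static void main(String[] args) {
        PriorityJobSet jobs = new PriorityJobSet();
        jobs.zadd(0, 1L);    // assumed score for a normal-priority job
        jobs.zadd(-10, 2L);  // assumed score for a high-priority job
        jobs.zadd(10, 3L);   // assumed score for a low-priority job
        System.out.println(jobs.zpop()); // prints Optional[2]
        System.out.println(jobs.zpop()); // prints Optional[1]
        System.out.println(jobs.zpop()); // prints Optional[3]
        System.out.println(jobs.zpop()); // prints Optional.empty
    }
}
```

Against a real Redis server, the same pop would typically be a range read of the first member followed by a removal of that rank, both wrapped in MULTI/EXEC so that two workers can never receive the same job id.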

How to emit and receive events?

In Node.js, we can use EventEmitter to emit and listen for events. In Vert.x, the EventBus fills the same role, and it supports three messaging patterns: point-to-point, request-response, and publish/subscribe. For example, in Node.js we might write this:

self.on('complete', fn);
// ......
self.emit('complete', res);

And in Vert.x we could write:

// listen to an address
eventBus.consumer(Kue.getHandlerAddress("complete", this.type), message -> {
    completeHandler.handle(new Job((JsonObject) message.body()));
});

// send message to an address
eventBus.send(Kue.getHandlerAddress("complete", type), job.toJson());
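
To make the difference between sending and publishing concrete, here is a small plain-Java model of address-based messaging. It is not Vert.x code and not how the EventBus is implemented: MiniEventBus, its methods, and the address string are made up for illustration, and strict round-robin delivery is a simplifying assumption. It models two of the patterns: send delivers to exactly one handler on an address, while publish delivers to all of them.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Plain-Java model of address-based messaging (hypothetical, not part of Vert.x).
class MiniEventBus {
    private final Map<String, List<Consumer<String>>> handlers = new HashMap<>();
    private final Map<String, Integer> sendCount = new HashMap<>();

    // Register a handler on an address, like subscribing to an event name.
    void consumer(String address, Consumer<String> handler) {
        handlers.computeIfAbsent(address, a -> new ArrayList<>()).add(handler);
    }

    // Point-to-point: exactly one handler receives the message, chosen round-robin.
    // Returns false when nobody listens on the address.
    boolean send(String address, String body) {
        List<Consumer<String>> list = handlers.get(address);
        if (list == null || list.isEmpty()) {
            return false;
        }
        int index = sendCount.merge(address, 1, Integer::sum) - 1;
        list.get(index % list.size()).accept(body);
        return true;
    }

    // Publish/subscribe: every handler on the address receives the message.
    // Returns the number of handlers that were notified.
    int publish(String address, String body) {
        List<Consumer<String>> list =
            handlers.getOrDefault(address, Collections.emptyList());
        list.forEach(handler -> handler.accept(body));
        return list.size();
    }

    public static void main(String[] args) {
        MiniEventBus bus = new MiniEventBus();
        String address = "job.complete.video"; // made-up address format
        bus.consumer(address, body -> System.out.println("worker A got " + body));
        bus.consumer(address, body -> System.out.println("worker B got " + body));
        bus.send(address, "job 3001");    // only one of the two workers is called
        bus.publish(address, "job 3002"); // both workers are called
    }
}
```

For a job queue, point-to-point delivery fits events that a single party should handle, such as the completion of one particular job, while publishing fits notifications that every interested listener should see.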

Achievement

Refined the design of Vert.x Kue

  • Kue: like the Queue in Node.js Kue. It can create jobs (createJob) and process them
  • KueVerticle: a verticle where the KueService is registered
  • KueWorker: a worker verticle that processes jobs
  • KueService: creates KueWorker instances and deploys them
  • Job: the job entity, which contains most of the job-related logic

The backend of Vert.x Kue is Redis.

Basic usage (design) of Vert.x Kue:

// Write this and then run it with the Vert.x Launcher in clustered mode.
// Kue, Job and Priority come from the Vert.x Kue project itself.
import io.vertx.core.AbstractVerticle;
import io.vertx.core.json.JsonObject;

public class ExampleProcessVerticle extends AbstractVerticle {

    @Override
    public void start() throws Exception {
        // the queue must be created first
        Kue kue = Kue.createQueue(vertx, config());

        // create a high-priority job and register a completion callback
        Job job0 = kue.createJob("video", new JsonObject().put("id", 3001))
            .priority(Priority.HIGH)
            .onComplete(e -> {
                System.out.println("Video result: " + e.getResult());
                System.out.println("Haha");
            });

        job0.save();

        // process "video" jobs, one at a time
        kue.process("video", 1, r -> {
            if (r.succeeded()) {
                Job job = r.result();
                // simulate a task that takes 2 seconds
                vertx.setTimer(2000, l -> {
                    job.progress(100, 100);
                    System.out.println("Video id: " + job.getId());
                });
            } else {
                r.cause().printStackTrace();
            }
        });
    }
}

Initial concept implementation of Vert.x Kue

See here: sczyh30/vertx-blueprint-job-queue