Calling a vert.x async method from a sync method
Made popular by NodeJS and ES6, asynchronous programming promises (pun intended) better throughput and resource usage by running an Event Loop. In Java land, vert.x implements exactly this approach and has proven its mettle, being the foundation of Quarkus.
Your legacy app doesn't magically convert
When you start a new vert.x project using the App Generator, everything is asynchronous from the beginning. Snippets of synchronous (a.k.a. blocking) code that you might need to maintain can be wrapped in executeBlocking and handled in their own thread.
However, when you are about to convert a synchronous application, e.g. a servlet, to asynchronous and you can't finish in one sprint/session, things become interesting. The doGet method is synchronous by nature. There are a few steps that need to be accomplished:
- Have vert.x running in its own thread. You can't start it on the main thread with its blocking operations
- Have a method that returns a vert.x Future
- Convert that into a Java CompletableFuture
Let's have a look at the moving parts:
Running vert.x in a Thread
When starting a servlet, the container calls the init() method. There we can spin off a Thread for our vert.x instance. I'll be using an AtomicBoolean to ensure vert.x has loaded and deployed one verticle before continuing the init method:
Vertx vertx;
String verticleId;
Thread vertxThread;
AtomicBoolean vertxDeployed = new AtomicBoolean(false);

@Override
public void init() throws ServletException {
  final Runnable vertxRunner = () -> {
    this.vertx = Vertx.vertx();
    this.vertx.deployVerticle("my.verticle.class.name")
        .onSuccess(id -> {
          this.verticleId = id;
          synchronized (this.vertxDeployed) {
            this.vertxDeployed.set(true);
            this.vertxDeployed.notify();
          }
        })
        // Failure handling is not production quality
        .onFailure(System.err::println);
    System.out.println("End vertxRunner");
  };
  this.vertxThread = new Thread(vertxRunner);
  this.vertxThread.start();
  try {
    // Check the flag while holding the lock, so a notify() can't slip in
    // between the check and the wait(); the loop guards against spurious wakeups
    synchronized (this.vertxDeployed) {
      while (!this.vertxDeployed.get()) {
        this.vertxDeployed.wait();
      }
    }
  } catch (InterruptedException e) {
    Thread.currentThread().interrupt();
    throw new ServletException("Interrupted while waiting for vert.x", e);
  }
  // Other init actions go here ...
}
There are a few stumbling blocks you need to be aware of:
- The thread will run until you terminate vert.x, even when your Thread variable goes out of scope
- Keep a reachable reference to the vert.x instance, so you can undeploy the verticle(s) and call vertx.close() to shut it down and release its threads
- You can't just call wait() on an object, since there is the lovely condition called spurious wakeup. The solution is to check the condition in a loop, as seen above
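If the wait()/notify() dance feels too fiddly, a java.util.concurrent.CountDownLatch is one possible alternative, since its await() only returns once the count reaches zero or the timeout expires. The following is a minimal, JDK-only sketch and not what the code above does: the class StartupGate and the method deployAsync are made-up stand-ins for the vert.x deployment callback, not vert.x APIs.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class StartupGate {

    // Hypothetical stand-in for an asynchronous deployment; NOT a vert.x API
    static void deployAsync(Runnable onSuccess) {
        Thread worker = new Thread(() -> {
            // ... asynchronous startup work would happen here ...
            onSuccess.run();
        });
        worker.setDaemon(true);
        worker.start();
    }

    // Blocks until the success callback has fired or the timeout expires
    static boolean startAndWait(long timeoutMillis) {
        CountDownLatch deployed = new CountDownLatch(1);
        deployAsync(deployed::countDown);
        try {
            // No hand-written loop needed: await() handles spurious wakeups itself
            return deployed.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("deployed: " + startAndWait(5_000));
    }
}
```

In a servlet, countDown() would presumably go into the onSuccess handler and await() into init(). The timeout also keeps init() from hanging forever when the deployment fails.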
Calling the async function
Let's say we have a method answerToAllQuestions() that returns an io.vertx.core.Future<Integer>, which we want to call from a doGet method:
Future<Integer> answerToAllQuestions() {
  // An already completed vert.x Future
  return Future.succeededFuture(42);
}
The java.util.concurrent.CompletableFuture has the method get() to allow us to wait for completion. So we can write:
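@Override
protected void doGet(HttpServletRequest request, HttpServletResponse response)
    throws ServletException, IOException {
  final Integer answer;
  try {
    // Blocks the servlet thread until the vert.x Future completes
    answer = answerToAllQuestions()
        .toCompletionStage()
        .toCompletableFuture().get();
  } catch (InterruptedException | ExecutionException e) {
    if (e instanceof InterruptedException) Thread.currentThread().interrupt();
    throw new ServletException(e);
  }
  try (PrintWriter w = response.getWriter()) {
    w.append(String.valueOf(answer));
    w.flush();
  }
}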
The get() method optionally accepts a timeout, so we are not at the mercy of the asynchronous operation.
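To illustrate the timeout variant, here is a small JDK-only sketch. The class TimedGet, the helper awaitOrDefault and the fallback value are my own illustration, not part of vert.x or the servlet API. It assumes you would rather return a default than fail the request when the answer doesn't arrive in time.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class TimedGet {

    // Waits at most timeoutMillis for the result, otherwise returns the fallback
    static int awaitOrDefault(CompletableFuture<Integer> future, long timeoutMillis, int fallback) {
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return fallback; // the async operation took too long
        } catch (ExecutionException e) {
            return fallback; // the async operation itself failed
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return fallback;
        }
    }

    public static void main(String[] args) {
        CompletableFuture<Integer> done = CompletableFuture.completedFuture(42);
        CompletableFuture<Integer> pending = new CompletableFuture<>(); // never completed here
        System.out.println(awaitOrDefault(done, 1_000, -1));
        System.out.println(awaitOrDefault(pending, 100, -1));
    }
}
```

In the servlet, the CompletableFuture would come from toCompletionStage().toCompletableFuture(). Whether a fallback or an error response is the better reaction to a timeout depends on your application.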
I compiled a command line Gist, spiked with println(), to demo the steps. Go and have a look.
As usual YMMV
Posted by Stephan H Wissel on 03 August 2022 | categories: Java vert.x