Async Java with vert.x
I wrote about more modern Java syntax and streams before.
There is more to it. Non Blocking I/O and Event Loops allow for
better performance. It's not a magic bullet, some readjustment is required
Adjusting methods, exceptions and return values
Initially it might look daunting, but the adjustments are not too big. Let's look at some examples. A classic Java method looks like this:
String someResult throws DidnWorkException {
// Working code goes here
if (someCondition) {
throw new DidnWorkException();
}
return "It worked";
}
Its asynchronous counter-part looks like this:
Future<String> someResult() {
return Future.future(promise -> {
// Working code goes here
if (someCondition) {
promise.fail("It didn't work"); // Could use a Throwable too
} else {
promise.complete("It worked");
}
});
}
Update
Paulo Lopes, one of the Redhat vert.x core maintainers, suggested to restructure the method to make it clearer (and eliminate a lambda). Here is his take:
Future<String> someResult() {
// we promise that we will write something in the future
Promise<String> promise = Promise.promise();
// Working code goes here (it can be an asynchronous call too...)
if (someCondition) {
promise.fail("It didn't work"); // Could use a Throwable too
} else {
promise.complete("It worked");
}
// we return the future (read-side) of the promise
return promise.future();
}
His comment: It doesn't make much of a difference in terms of executed code, it's more of a readability pattern. You now read the code as if it was sequentially which makes it easier to reason (for some of us)
The main difference: an async function does not throw errors, but return successed or failures.
A failure will always return a Throwable
,
even if you only provided a String as error explanation. When the body of your method might throw something, you wrap it in a try/catch as you are used to. Calling such a method is slightly different. In classic Java you code:
try {
System.out.println(someResult());
} catch (Exception e) {
logger.error(e);
}
while the async call looks like this:
someResult()
.onFailure(logger::error) //Java8 style shortcut for (e -> logger.error(e))
.onSuccess(System.out::println);
There are more variations available what to do with a returned Future:
otherwise
: return a value or call a function when the future failedrecover
: call a future instead of the failed one and return its valueonComplete
: run code when it is done, regardless of success or failure
There are a few more. Future functions can be chained usingcompose
or run in parallel using CompositeFuture
One common pattern with Futures: Only check if a future succeeded or failed and not requiring a return value to look at.
In this case we use a Future<Void>
Void is a java.lang
class.
More samples
Chaining multiple Futures (looks like JavaScript promises, isn't it?):
myFirstMethod()
.compose(v -> mySecondMethod())
.compose(secondResult -> thirdMethod(someVal, secondResult))
.compose(this::numberFour)
.onSuccess(System.out::println)
.onFailure(logger::error);
The first call myFirstMethod()
returns a Future<Void>
. By convention we use v
to indicate that it either returns Void
or we don't need the actual result, but only the fact of success or failure. The ::
syntax takes a little while to get used to.
When the sequence of a Future execution doesn't matter, e.g. calling a few independent services, but we only continue when they are all done, CompositeFuture is our friend. Lets say we have a series of services we need to call and assemble the result as a JsonObject:
Future<JsonObject> callAllServices(List<Supplier<Future<JsonObject>> services) {
return Future.future(promise -> {
List<Future<JsonObject>> list = services.stream()
.forEach(Supplier::get)
.collect(Collectors.toList());
CompositeFuture.all(list)
.onFailure(promise::fail)
.onSuccess(v -> {
final JsonObject result = new JsonObject();
list.stream().forEach(r -> result.mergeIn(r.result()));
promise.complete(result);
});
});
}
The method will call all Suppliers and return a result when all services return successful. When your business logic allows to live with available service only, the method looks slightly different (look carefully):
Future<JsonObject> callAvailableServices(List<Supplier<Future<JsonObject>> services) {
return Future.future(promise -> {
List<Future<JsonObject>> list = services.stream()
.forEach(Supplier::get)
.collect(Collectors.toList());
CompositeFuture.any(list)
.onFailure(promise::fail)
.onSuccess(v -> {
final JsonObject result = new JsonObject();
list.stream().filter(Future::succeeded)
.forEach(r -> result.mergeIn(r.result()));
promise.complete(result);
});
});
}
The difference is the call to CompositeFuture.any
and filtering out results with .filter(Future::succeeded)
. On lesson learned. Using Streams and Futures and composition: actual work lives best in distinct functions that implement common interfaces
The dense syntax takes a while to get used to and there is more to learn, so YMMV
Posted by Stephan H Wissel on 06 January 2022 | Comments (0) | categories: Domino Singapore