CompletableFuture provides an API for asynchronous processing. It is quite a powerful API: you can chain multiple "stages" to create a pipeline. It relies heavily on callbacks, as the stages are executed asynchronously.
Structured Concurrency, on the other hand, provides an API that lets you write the same concurrent logic in an imperative, top-to-bottom style.
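To make the idea of chained stages concrete, here is a minimal sketch of a CompletableFuture pipeline. The class name and the fetchUser and fetchScore helpers are hypothetical stand-ins and are not part of this repo; only the CompletableFuture calls are the real API.

```java
import java.util.concurrent.CompletableFuture;

class PipelineSketch {

    // Hypothetical stand-ins for slow remote calls.
    static String fetchUser() { return "alice"; }
    static int fetchScore() { return 42; }

    static String buildReport() {
        // Stage 1: two independent tasks start asynchronously.
        CompletableFuture<String> user = CompletableFuture.supplyAsync(PipelineSketch::fetchUser);
        CompletableFuture<Integer> score = CompletableFuture.supplyAsync(PipelineSketch::fetchScore);

        // Stage 2: a callback transforms the first result once it is available.
        CompletableFuture<String> upperUser = user.thenApply(String::toUpperCase);

        // Stage 3: a callback combines both results once both are available.
        CompletableFuture<String> report =
                upperUser.thenCombine(score, (u, s) -> u + " scored " + s);

        // join() blocks the caller until the whole pipeline has completed.
        return report.join();
    }

    public static void main(String[] args) {
        System.out.println(buildReport()); // prints "ALICE scored 42"
    }
}
```

Note that none of the business logic runs where it is written: each lambda is handed to the future and invoked later, which is what "relies heavily on callbacks" means in practice.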
This repo is part of my talk 'Structured Concurrency in Java: The what and the why'. The examples discussed in the talk are implemented here.
There are three examples: event management, weather service, and banking portal.
The class CFExamples implements these examples using CompletableFuture, and the class LoomExamples implements them using the structured concurrency API.
Execute all sub tasks:
With CompletableFuture, it looks like this:
public static void createEvent() {
    // Start the three independent sub tasks.
    var future1 = CompletableFuture.supplyAsync(EventUtil::reserveVenue);
    var future2 = CompletableFuture.supplyAsync(EventUtil::bookHotel);
    var future3 = CompletableFuture.supplyAsync(EventUtil::buySupplies);

    // allOf yields no value, so the results are read from the individual futures.
    var futureEvent = CompletableFuture.allOf(future1, future2, future3)
            .thenApply(ignored -> {
                var venue = future1.join();
                var hotel = future2.join();
                var supplies = future3.join();
                return new EventUtil.Event(venue, hotel, supplies);
            });

    System.out.println("Event : " + futureEvent.join());
}
Equivalent code with structured concurrency is:
public static void createEvent() {
    try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
        var task1 = scope.fork(EventUtil::reserveVenue);
        var task2 = scope.fork(EventUtil::bookHotel);
        var task3 = scope.fork(EventUtil::buySupplies);

        // Wait for all sub tasks and propagate the first failure, if any.
        // Without throwIfFailed(), get() on a failed sub task throws IllegalStateException.
        scope.join().throwIfFailed();

        var venue = task1.get();
        var hotel = task2.get();
        var supplies = task3.get();

        System.out.println("Event: " + new EventUtil.Event(venue, hotel, supplies));
    } catch (InterruptedException | ExecutionException e) {
        throw new RuntimeException(e);
    }
}
Execute at least one sub task:
With CompletableFuture, it looks like this:
public static void getWeather() {
    // Query three sources concurrently.
    var future1 = CompletableFuture.supplyAsync(() -> WeatherUtil.getWeatherFromSource1("Amsterdam"));
    var future2 = CompletableFuture.supplyAsync(() -> WeatherUtil.getWeatherFromSource2("Amsterdam"));
    var future3 = CompletableFuture.supplyAsync(() -> WeatherUtil.getWeatherFromSource3("Amsterdam"));

    // anyOf completes as soon as the first of the futures completes.
    CompletableFuture.anyOf(future1, future2, future3)
            .exceptionally(th -> {
                throw new RuntimeException(th);
            })
            .thenAccept(weather -> System.out.println("Weather: " + weather))
            .join();
}
And with structured concurrency:
public static void getWeather() {
    try (var scope = new StructuredTaskScope.ShutdownOnSuccess<WeatherUtil.Weather>()) {
        scope.fork(() -> WeatherUtil.getWeatherFromSource1("Amsterdam"));
        scope.fork(() -> WeatherUtil.getWeatherFromSource2("Amsterdam"));
        scope.fork(() -> WeatherUtil.getWeatherFromSource3("Amsterdam"));

        // result() returns the value of the first sub task that succeeded.
        var weather = scope.join().result();

        System.out.println(weather);
    } catch (InterruptedException | ExecutionException e) {
        throw new RuntimeException(e);
    }
}
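One behavioural difference is worth keeping in mind when comparing the two versions above: CompletableFuture.anyOf completes with whichever future finishes first, even if that future failed, whereas ShutdownOnSuccess captures the first sub task that completes successfully. The sketch below illustrates the anyOf side only, using plain JDK classes so it runs without preview features. The class name, the source names, and the messages are made up for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class AnyOfSketch {

    static String firstOutcome() {
        // Hypothetical source that has already failed.
        CompletableFuture<String> failedSource =
                CompletableFuture.failedFuture(new IllegalStateException("source 1 down"));

        // Hypothetical source that succeeds, but only later.
        CompletableFuture<String> slowSource = new CompletableFuture<>();

        // anyOf takes the outcome of the first future to complete: here, the failure.
        CompletableFuture<Object> any = CompletableFuture.anyOf(failedSource, slowSource);

        // The late success no longer changes the outcome of 'any'.
        slowSource.complete("sunny");

        try {
            return "result: " + any.join();
        } catch (CompletionException e) {
            return "failed: " + e.getCause().getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(firstOutcome()); // prints "failed: source 1 down"
    }
}
```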
Combine multiple sub tasks:
With CompletableFuture, it looks like this:
public static void getOfferForCustomer() {
    // Savings and loans lookups both depend on the customer lookup.
    var future1 = CompletableFuture.supplyAsync(CustomerUtil::getCurrentCustomer);
    var future2 = future1.thenApplyAsync(CustomerUtil::getSavingsData);
    var future3 = future1.thenApplyAsync(CustomerUtil::getLoansData);

    var customer = future1
            .exceptionally(th -> { throw new RuntimeException(th); })
            .join();

    // Combine both results, then calculate the offer from the combined details.
    var future = future2
            .thenCombine(future3, (savings, loans) -> new CustomerDetails(customer, savings, loans))
            .thenApplyAsync(CustomerUtil::calculateOffer)
            .exceptionally(th -> { throw new RuntimeException(th); });

    System.out.println("Offer: " + future.join());
}
And with structured concurrency:
public static void getOfferForCustomer() {
    // The customer is needed by both sub tasks, so it is fetched first.
    var customer = CustomerUtil.getCurrentCustomer();

    try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
        var task1 = scope.fork(() -> CustomerUtil.getSavingsData(customer));
        var task2 = scope.fork(() -> CustomerUtil.getLoansData(customer));

        // Wait for both sub tasks and propagate the first failure, if any.
        scope.join().throwIfFailed();

        var savings = task1.get();
        var loans = task2.get();

        var details = new CustomerUtil.CustomerDetails(customer, savings, loans);
        var offer = CustomerUtil.calculateOffer(details);

        System.out.println(offer);
    } catch (InterruptedException | ExecutionException e) {
        throw new RuntimeException(e);
    }
}
There are two main differences:
- CompletableFuture uses an asynchronous mechanism and callbacks to implement pipelines, whereas structured concurrency makes code imperative. Because of this, the code with structured concurrency is often more readable than the one with CompletableFuture.
- Also, CompletableFuture relies on lambdas for functional interfaces such as Supplier and Function, which do not declare checked exceptions, so those lambdas can only throw RuntimeExceptions. Structured concurrency uses checked exceptions. CompletionException, used by CompletableFuture, is a RuntimeException, whereas ExecutionException, used by structured concurrency, is a checked exception. This means that with structured concurrency you are forced to catch these exceptions and handle the error scenarios.
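The checked versus unchecked distinction can be seen without the preview API. The sketch below is only an illustration: it uses Future.get() on a CompletableFuture as a stand-in for the structured concurrency calls, because get() declares the same checked ExecutionException, while join() throws the unchecked CompletionException. The class name and the failing task are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;

class ExceptionStyleSketch {

    // Hypothetical failing task, used only for illustration.
    static CompletableFuture<String> failingTask() {
        return CompletableFuture.supplyAsync(() -> {
            throw new IllegalStateException("boom");
        });
    }

    // join() throws the unchecked CompletionException. The compiler does not
    // require this try/catch; it is written out only to show the exception type.
    static String viaJoin() {
        try {
            return failingTask().join();
        } catch (CompletionException e) {
            return "unchecked: " + e.getCause().getMessage();
        }
    }

    // get() declares the checked ExecutionException and InterruptedException,
    // so leaving out the catch blocks (or a throws clause) is a compile error.
    static String viaGet() {
        try {
            return failingTask().get();
        } catch (ExecutionException e) {
            return "checked: " + e.getCause().getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        System.out.println(viaJoin()); // prints "unchecked: boom"
        System.out.println(viaGet());  // prints "checked: boom"
    }
}
```

In both cases the original exception is available through getCause(); the difference is only whether the compiler makes the caller deal with it.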
Besides these differences, the talk also covers some peculiar situations and quirks of the CompletableFuture and structured concurrency APIs.
- JDK early access build. You need an early access build with support for structured concurrency. You can get one from here: https://jdk.java.net/21/ These are builds based on JDK 21.
- Make sure you use --enable-preview for build and run. If you are using IntelliJ:
  - Make sure to enable preview for Java Compiler.
  - Make sure to enable preview in 'Run Configurations...'.