This practice session is about yet more ways threads can be synchronized. Read the introduction for each section and try to solve the tasks for it. Try to read the Javadoc of the relevant classes before searching Stack Overflow.
The basic way to run some code in parallel is to use threads.
A thread is given a Runnable and, once started, the thread calls its run method.
new Thread(new Runnable() {
    @Override
    public void run() {
        try {
            // calculate some result
        } catch (Exception e) {
            // maybe something will catch it :/
            throw new RuntimeException(e);
        }
    }
}).start();
When the result is needed in some other thread, then some trickery is needed to pass it around.
One way is to use a BlockingQueue, another is to use shared state and synchronized blocks.
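For comparison, here is a minimal sketch of the BlockingQueue approach. The class name QueueHandoff and the calculate method are made up for illustration; the point is only to show how the result travels from the background thread to the caller, and what happens to exceptions.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class QueueHandoff {

    // Hypothetical stand-in for "calculate some result".
    static int calculate() {
        return 6 * 7;
    }

    static int calculateInBackground() throws InterruptedException {
        BlockingQueue<Integer> results = new ArrayBlockingQueue<>(1);
        new Thread(() -> {
            // offer does not block here: the queue has room for the single result
            results.offer(calculate());
        }).start();
        // take blocks until the background thread has added the result.
        // If calculate() threw an exception, nothing would ever be added
        // and take would block forever: the failure is invisible to this thread.
        return results.take();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(calculateInBackground());
    }
}
```

The happy path is short, but the failure path needs extra plumbing (for example a second queue or a wrapper object that holds either a value or an exception).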
It's even trickier to correctly pass an exception to the other threads when something crashes.
CompletableFuture is a class that makes it easy to pass the calculated result between threads while also correctly handling exceptions.
Here is how it looks in action:
static Map<Path, Long> findFileSizes(Path root) throws IOException {
    Map<Path, Long> files = new HashMap<>();
    Files.walkFileTree(root, new SimpleFileVisitor<>() {
        @Override
        public FileVisitResult visitFile(Path file, BasicFileAttributes fileInfo) {
            files.put(file, fileInfo.size());
            return FileVisitResult.CONTINUE;
        }
    });
    return files;
}

static CompletableFuture<Map<Path, Long>> findFileSizesAsync(Path root) {
    CompletableFuture<Map<Path, Long>> cf = new CompletableFuture<>();
    new Thread(() -> {
        try {
            cf.complete(findFileSizes(root));
        } catch (Exception e) {
            cf.completeExceptionally(e);
        }
    }).start();
    return cf;
}

public static void main(String[] args) throws Exception {
    CompletableFuture<Map<Path, Long>> srcFuture = findFileSizesAsync(Path.of("src"));
    CompletableFuture<Map<Path, Long>> nonexistingFuture = findFileSizesAsync(Path.of("nonexisting"));
    // both src and nonexisting are processed in parallel
    Set<Path> sourceFiles = srcFuture.get().keySet(); // get waits until result is ready
    System.out.println(sourceFiles);
    Set<Path> nothing = nonexistingFuture.get().keySet(); // throws ExecutionException
}
What is going on here?
- main calls findFileSizesAsync.
- findFileSizesAsync creates a new CompletableFuture object that works as a placeholder for the result. Initially, it is empty, because the result is not computed yet.
- A new background thread is started to find the files and their sizes.
- findFileSizesAsync returns the CompletableFuture immediately, without waiting for the thread to complete its work.
- main calls findFileSizesAsync again to find the file sizes of another directory, which starts another background thread.
- main will call get on both CompletableFutures. Calling get will block the main thread (cause the thread to sleep) until the background thread has finished its work and stored the result in the CompletableFuture.
Note that if finding the file sizes fails, then the exception is stored inside the CompletableFuture instead of the result.
When get is called, it will throw an ExecutionException whose cause (getCause) is the stored exception.
This makes it easy to pass any exceptions from the background thread to the thread that uses the result.
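The following sketch shows how the caller can get hold of the exception that the background thread stored. The class FailureDemo and its always-failing task are hypothetical and only stand in for the missing-directory case above.

```java
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

class FailureDemo {

    // Hypothetical task that always fails, standing in for a missing directory.
    static CompletableFuture<String> failingTask() {
        CompletableFuture<String> cf = new CompletableFuture<>();
        new Thread(() -> cf.completeExceptionally(new IOException("no such directory"))).start();
        return cf;
    }

    // Waits for the future and returns the exception stored by the background
    // thread, or null if the future completed normally.
    static Throwable waitForFailure(CompletableFuture<?> cf) throws InterruptedException {
        try {
            cf.get();
            return null;
        } catch (ExecutionException e) {
            return e.getCause(); // the exception passed to completeExceptionally
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Throwable cause = waitForFailure(failingTask());
        System.out.println(cause); // prints "java.io.IOException: no such directory"
    }
}
```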
The real strength of CompletableFuture is composability. CompletableFuture makes it easy to schedule more methods to run once the CompletableFuture is completed with a result.
The findFileSizesAsync method finds the file sizes.
It may be useful to also find the total size of all the files, but that can only be done after the files are collected.
One way would be to modify the original findFileSizesAsync method and add the calculation there.
Unfortunately that would make it more difficult to return the result to some other thread, because now there are two pieces of data: the files and the total.
Also, in case of exceptions, it's hard to figure out which part of the calculation failed.
CompletableFuture provides a more flexible alternative:
public static void main(String[] args) throws Exception {
    CompletableFuture<Map<Path, Long>> filesFuture = findFileSizesAsync(Path.of("src"));
    CompletableFuture<Long> totalSizeFuture = alsoGetTheTotalSize(filesFuture);
    filesFuture.thenAccept(files -> {
        for (Map.Entry<Path, Long> e : files.entrySet()) {
            System.out.println("file=" + e.getKey() + " size=" + e.getValue());
        }
    });
    // do other stuff while the background thread does its thing
    System.out.println("total size " + totalSizeFuture.get());
}

static CompletableFuture<Long> alsoGetTheTotalSize(CompletableFuture<Map<Path, Long>> filesFuture) {
    CompletableFuture<Long> totalFuture = filesFuture.thenApply(files -> {
        long total = 0;
        for (long size : files.values())
            total += size;
        return total;
    });
    return totalFuture;
}
Here main calls alsoGetTheTotalSize and passes the CompletableFuture of the files.
Next, thenApply is used to tell the background thread that after it has stored the result in filesFuture, it should also calculate the total size of the files and store that in another CompletableFuture.
The new CompletableFuture for the total is immediately returned and the actual calculation will happen in the background thread once all the files have been found.
main also gives the background thread a new task using thenAccept: when the background thread is done with the files, it should print them all out.
Both these extra tasks are done when the background thread calls complete on the first CompletableFuture.
If the first CompletableFuture has already been completed by the time thenApply or thenAccept is called, the task runs right away in the calling thread instead.
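To see which thread actually runs such a follow-up task, the sketch below registers a thenApply callback that simply reports the name of the thread it runs on. The class CallbackThreadDemo and the thread name "worker" are made up for this illustration. The Javadoc only says the callback "may be" performed by the completing thread, so treat the observed thread names as the typical behaviour of current JDKs rather than a guarantee.

```java
import java.util.concurrent.CompletableFuture;

class CallbackThreadDemo {

    // Registers the callback first, then completes the future from a thread named "worker".
    static String callbackThreadWhenPending() throws Exception {
        CompletableFuture<String> source = new CompletableFuture<>();
        CompletableFuture<String> ranOn =
                source.thenApply(value -> Thread.currentThread().getName());
        new Thread(() -> source.complete("done"), "worker").start();
        return ranOn.get();
    }

    // Completes the future first, then registers the callback from the calling thread.
    static String callbackThreadWhenAlreadyDone() throws Exception {
        CompletableFuture<String> source = CompletableFuture.completedFuture("done");
        CompletableFuture<String> ranOn =
                source.thenApply(value -> Thread.currentThread().getName());
        return ranOn.get();
    }

    public static void main(String[] args) throws Exception {
        System.out.println("pending future:   callback ran on " + callbackThreadWhenPending());
        System.out.println("completed future: callback ran on " + callbackThreadWhenAlreadyDone());
    }
}
```

In the first case the callback is expected to run on the worker thread that calls complete; in the second case it is expected to run on the thread that called thenApply.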
- Write a method for downloading articles from wikipedia: CompletableFuture<String> download(String url) { .. }
- Write a method for counting the dots (.) in a string: long countDots(String str) { .. }
- Start downloading the articles in the background in parallel. Make sure the downloads run in parallel (start all background threads before calling get on anything). Finally call get on each CompletableFuture, count the dots in the downloaded articles and print out the counts.
- Make a copy of WikiAnalyzer1
- Change the copy so that the dots are also counted in the background thread.
  - use thenApply to move the counting to the background thread
  - call get and print the counts in the main thread (get should return just the number of dots)
  - try to download an invalid article (use some garbage for the url) and check that the exception is thrown when calling get
Suppose you want to write your own CompletableFuture class.
How would you make the threads block when get is called but the value is not available yet?
How would you unblock the threads when the value becomes available?
java.util.concurrent.CountDownLatch is a tool for this exact purpose.
When creating the latch, a count (non-negative integer) must be specified.
The latch has a method named await: when it is called and the count is larger than zero, the calling thread is blocked.
The latch also has the countDown method, which reduces the count by one.
Once the count reaches zero, all threads that are blocked at await are unblocked, and any later call to await returns immediately.
Here's an example that compiles some Java source code and starts it when everything is ready:
List<String> sources = Arrays.asList("Main.java", "Service.java", "Item.java");
CountDownLatch latch = new CountDownLatch(sources.size());
for (String source : sources) {
    new Thread(() -> {
        compile(source);
        latch.countDown();
    }).start();
}
latch.await(); // blocks until everything's compiled
startApplication();
Implement your own CompletableFuture. It should have the following functionality:
- a get method that returns the value or blocks the calling thread until it's available.
- a complete method to set the value and unblock the waiting threads.
- a completeExceptionally method to set an exception and unblock the waiting threads.
Suppose you want to write your own CountDownLatch class.
It should have the methods countDown and await.
Most importantly, await should efficiently block until the count reaches zero.
This task can be solved using the most basic synchronization tools that Java provides: Object#wait and Object#notifyAll.
The idea is not complicated: the threads calling wait on some object will block until another thread calls notifyAll on the same object.
Both wait and notifyAll must be called on the same object, but it doesn't really matter which object: its only purpose is to bring together the waiters and notifiers.
Here's an example of wait/notifyAll:
class SimpleBlockingQueue<T> {
    private final List<T> items = new ArrayList<>();

    public void put(T item) {
        synchronized (items) {
            items.add(item);
            items.notifyAll(); // wake up waiting threads
        }
    }

    public T take() throws InterruptedException {
        synchronized (items) {
            while (items.isEmpty())
                items.wait(); // wait for notifyAll
            return items.remove(0);
        }
    }
}
While the idea of wait/notifyAll sounds simple, there are a few gotchas to this mechanism.
The main idea of synchronized blocks is that only a single thread can lock an object at a time.
Let's take another look at the SimpleBlockingQueue example above.
If one thread is inside the take method waiting for a new item to be added to the queue, then how can another thread put anything into the same queue?
It would seem that the thread calling put has to wait for the thread inside take to finish, which in turn is waiting for some thread to finish put.
It turns out that calling items.wait() will unlock the items object until the thread wakes up again.
When the thread does wake up, it must first wait until it can lock items again before it can continue.
This is also why take checks items.isEmpty() in a loop: by the time the woken thread has locked items again, another thread may already have removed the item, and wait is also allowed to return without any notifyAll (a spurious wakeup).
Note that to call wait/notifyAll on some object, the thread must be inside a synchronized block that has locked that same object.
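The fact that wait releases the lock can be observed directly. In the sketch below (the class WaitReleasesLockDemo and its fields are invented for this illustration), one thread waits inside a synchronized block while another thread enters a synchronized block on the same object. If wait did not release the lock, the second thread could never get in and the program would hang.

```java
class WaitReleasesLockDemo {
    private final Object lock = new Object();
    private boolean ready = false;

    // Returns true once the waiter has been woken up and has finished.
    boolean run() throws InterruptedException {
        Thread waiter = new Thread(() -> {
            synchronized (lock) {
                while (!ready) {
                    try {
                        lock.wait(); // releases lock while waiting
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
                // wait has returned, so this thread holds lock again here
            }
        });
        waiter.start();

        // Poll until the waiter is parked inside lock.wait().
        while (waiter.getState() != Thread.State.WAITING) {
            Thread.sleep(10);
        }

        // The waiter is still inside its synchronized block, yet this thread
        // can lock the same object because wait() gave the lock up.
        synchronized (lock) {
            ready = true;
            lock.notifyAll();
        }
        waiter.join();
        return true;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("finished: " + new WaitReleasesLockDemo().run());
    }
}
```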
Implement your own CountDownLatch.
The latch must have the await and countDown methods.
await must block efficiently using wait/notifyAll.
Object#wait, Thread#sleep and the methods built on them can throw an InterruptedException.
What does it mean and when is it thrown?
It's not possible to just stop/kill threads, because that would prevent unlocking all the locks the thread is holding and running the finally blocks. As an alternative, a thread can be interrupted. Interrupting a thread is a way to tell it to please shut down as soon as possible.
An InterruptedException is thrown only after Thread#interrupt has been called on the thread.
If the thread is currently waiting in an interruptible method (wait, sleep etc.) then the method immediately stops waiting and throws an InterruptedException.
Otherwise, the thread remembers the interrupt and the InterruptedException is thrown the next time the thread reaches an interruptible method.
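The "remembered" interrupt can be demonstrated with a thread that interrupts itself before sleeping. This is only a small sketch; the class name PendingInterruptDemo is made up.

```java
class PendingInterruptDemo {

    // Returns true if sleep threw InterruptedException because of an
    // interrupt that happened before sleep was even called.
    static boolean sleepAfterInterrupt() {
        Thread.currentThread().interrupt(); // nothing is thrown here, the interrupt is only remembered
        try {
            Thread.sleep(60_000); // throws right away instead of sleeping
            return false;
        } catch (InterruptedException e) {
            // throwing the exception also cleared the remembered interrupt
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("interrupted: " + sleepAfterInterrupt());
        System.out.println("still flagged: " + Thread.currentThread().isInterrupted());
    }
}
```

Note that once the InterruptedException has been thrown, the thread no longer counts as interrupted.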
To make your own code interruptible, simply check Thread#interrupted in strategic places and throw the InterruptedException when needed.
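As a sketch of what such a check might look like, here is a long-running loop that polls Thread.interrupted every thousand iterations. The class InterruptibleLoop, the sumUpTo method and the polling interval are arbitrary choices for illustration.

```java
class InterruptibleLoop {

    // Hypothetical long-running computation that can be interrupted.
    static long sumUpTo(long n) throws InterruptedException {
        long sum = 0;
        for (long i = 1; i <= n; i++) {
            // Thread.interrupted() returns the interrupt flag and clears it
            if (i % 1_000 == 0 && Thread.interrupted()) {
                throw new InterruptedException("interrupted at i=" + i);
            }
            sum += i;
        }
        return sum;
    }

    public static void main(String[] args) throws InterruptedException {
        Thread worker = new Thread(() -> {
            try {
                sumUpTo(1_000_000_000_000L); // far too long to finish on its own
            } catch (InterruptedException e) {
                System.out.println("stopped: " + e.getMessage());
            }
        });
        worker.start();
        Thread.sleep(100);
        worker.interrupt(); // ask the worker to shut down
        worker.join();
    }
}
```

Checking on every iteration would also work; polling less often just keeps the overhead of the check low.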
Interrupts can only successfully stop the thread when the program's code doesn't ignore the InterruptedException. When you need to catch an InterruptedException but can't stop the thread from continuing, it's good practice to restore the interrupt so that code further up the call stack can still notice it.
try {
    Thread.sleep(1000);
} catch (InterruptedException e) {
    Thread.currentThread().interrupt(); // restore
    throw new RuntimeException(e);
}
As always: if possible, then don't catch the exception and let it crash the thread.