
Demo project to simulate different kinds of Java API calls, and how to handle them nicely with coroutines.


Kotlin Coroutines Demo


This is a CLI project for getting some insight into coroutines. It simulates using a Java API that might have blocking methods, and shows what happens when those methods are called from Kotlin. When run using coroutines, it may use extension functions defined in Util.kt to prevent blocking the underlying threads. All the action types provide integers in order; depending on the run type, they may or may not be printed in order. By setting parameters such as the delay for getting an integer, multiple ways of running the same action can be compared.

Since the main focus of this project is to keep things relatively simple, it lacks some things you would want when running something in production:

  • Using a logging framework. Just using println is sufficient for this project.
  • Proper error handling. For example, when using a callback it's often possible that an error shows up, at which point a decision needs to be made to either resume with the exception or do something else, like providing a default.
  • Being a good example of how coroutines make things simpler. Coroutines help reduce complexity when you have multiple async calls that need to be handled in succession. This project just gets an Integer and logs it, and thus doesn't touch that aspect of coroutines. Another nice property of coroutines is that they are easily canceled, which is also not handled in this project. If you are interested in this, please take a look at this code belonging to this tutorial.
  • Handling streaming/reactive APIs. Although a Java Publisher, and how to handle it (probably by converting it into a flow), is certainly an interesting case, it's currently out of scope. Feel free to create a merge request or issue to include this if you're interested.

You can run Gradle with run --args='-h' to get the info needed to use the CLI, thanks to kotlinx-cli. Depending on the parameters it will run actions in a certain way, and log the thread from which it's logged together with the time, so we get some insight into what is happening. It will also give an approximation of the time needed to run everything.

For example, when using gradle run --args='-a future -t 3 -r suspended -d 500' it will create a future 3 times and wait until all three have completed. Since this is done from a coroutine context, and it doesn't wait for each future to complete before starting the next one, the whole run finishes in only a little more than 500 milliseconds. With coroutines the actions are run using launch, so it won't wait for a task to complete before starting the next one.

Here is the output from run --args='-h' to get a quick idea:

Usage: kotlin coroutines demo options_list
Options: 
    --delay, -d [1000] -> delay for each action in milliseconds { Int }
    --times, -t [10] -> times to run an action { Int }
    --actionType, -a [DELAYED] -> action to execute { Value should be one of [delayed, future, callable, unsafe_consumer, safe_consumer] }
    --runType, -r [SAME_THREAD] -> how to run the actions { Value should be one of [same_thread, same_thread_complicated, thread_pool, suspended, suspended_default, suspended_unconfined, suspended_blocking, suspended_default_blocking, suspended_new] }
    --consumeDelay, -cd [100] -> max delay between poll calls in milliseconds { Int }
    --consumeAmount, -ca [10] -> amount of items to consume before closing the consumer { Int }
    --futureDelay, -fd [10] -> amount of time between checking if the future has resolved { Int }
    --helperThreads, -ht [2] -> amount of threads the helper has, this will be used to suspend the delayed function in another context { Int }
    --help, -h -> Usage info 

I'll go through all the action types, with some examples of how they behave with certain run types.

Delayed is just a single call that, depending on the set delay, might take a while to return. When run normally on the same thread, each call has to complete before the next one starts. So gradle run --args='-a delayed -t 20 -r same_thread -d 500' takes a little over 10 seconds: 20 times half a second. When run on a thread pool, gradle run --args='-a delayed -t 20 -r thread_pool -d 500', it's finished in just over half a second. The main problem with thread pools is that they need many resources and have limited scalability. For me, running gradle run --args='-a delayed -t 5000 -r thread_pool -d 500' gives an error: [1.314s][warning][os,thread] Failed to start thread - pthread_create failed (EAGAIN) for attributes: stacksize: 1024k, guardsize: 4k, detached. because it can't create 5000 threads.
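The difference between running on the same thread and running on a thread pool can be sketched in a few lines. This is a minimal illustration, not the project's code: blockingDelayed is a hypothetical stand-in for the blocking Java call, and sizing the pool to one thread per call is an assumption about how the thread_pool run type behaves.

```kotlin
import java.util.concurrent.Callable
import java.util.concurrent.Executors

// Hypothetical stand-in for the blocking Java call: sleeps, then returns the number.
fun blockingDelayed(number: Int, delayMs: Long): Int {
    Thread.sleep(delayMs)
    return number
}

// Same-thread style: every call has to finish before the next one starts,
// so the total time is roughly times * delayMs.
fun runOnSameThread(times: Int, delayMs: Long): List<Int> =
    (1..times).map { blockingDelayed(it, delayMs) }

// Thread-pool style (assumed: one thread per call): the calls overlap,
// so the total time is roughly a single delayMs.
fun runOnThreadPool(times: Int, delayMs: Long): List<Int> {
    val pool = Executors.newFixedThreadPool(times)
    try {
        val tasks = (1..times).map { n -> Callable { blockingDelayed(n, delayMs) } }
        // invokeAll blocks until every task is done and keeps the submission order.
        return pool.invokeAll(tasks).map { it.get() }
    } finally {
        pool.shutdown()
    }
}
```

Every call in runOnThreadPool occupies a full thread while it sleeps, which is why this approach stops scaling once the number of calls gets into the thousands.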

When run suspended it will use a separate coroutine context that by default has two backing threads. Since it's a blocking call, that context is still saturated quickly. Because the 'work' is divided over two threads, gradle run --args='-a delayed -t 20 -r suspended -d 500' will run twice as fast as same_thread, so in about 5 seconds. Since the main thread is just logging the result, if we run it with 20 helper threads, gradle run --args='-a delayed -t 20 -r suspended -d 500 -ht 20', it will finish in a little over half a second. When run with suspended_default it will use the default context for the logging, which has as many threads as there are vCPUs available. What is interesting is that with gradle run --args='-a delayed -t 20 -r suspended_default -d 500' only two of the backing threads are used for the logging. It will still take about 5 seconds, since it's limited by the helper context in the same way as suspended.
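As a rough sketch of moving a blocking call to a helper context: the code below assumes the helper context is a fixed thread pool wrapped as a dispatcher, which may differ from how the project builds it. The names blockingDelayed and runSuspended are hypothetical.

```kotlin
import java.util.concurrent.Executors
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

// Hypothetical stand-in for the blocking Java call.
fun blockingDelayed(number: Int, delayMs: Long): Int {
    Thread.sleep(delayMs)
    return number
}

// Launches `times` coroutines on the calling thread. Each one hands its blocking
// call to the helper dispatcher, so at most `helperThreads` calls run at once.
fun runSuspended(times: Int, delayMs: Long, helperThreads: Int): List<Int> {
    val received = mutableListOf<Int>()
    // use {} closes the dispatcher, which also shuts down the backing thread pool.
    Executors.newFixedThreadPool(helperThreads).asCoroutineDispatcher().use { helper ->
        runBlocking {
            repeat(times) { i ->
                launch {
                    // Suspends this coroutine; the calling thread stays free for the others.
                    val number = withContext(helper) { blockingDelayed(i + 1, delayMs) }
                    // Back on the runBlocking thread, so a plain list is safe here.
                    received += number
                }
            }
        }
    }
    return received
}
```

With 20 calls of 500 milliseconds and 2 helper threads, the calls run two at a time, which matches the roughly 5 seconds described above. Raising helperThreads to 20 lets them all overlap.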

The future action returns a future, which we can store and call get() on later. This is tricky to get right, and is done only when running as same_thread_complicated. Doing this, we can complete gradle run --args='-a future -t 20 -r same_thread_complicated -d 500' in a little over half a second. When run in a blocking way with the other run types we just call get() directly, effectively making it the same as delayed. So gradle run --args='-a future -t 20 -r same_thread -d 500' will still take a little over 10 seconds.

Luckily, with coroutines we can check whether the future is either canceled or completed, and otherwise delay for some amount of time. delay might look a lot like Thread.sleep(), but the big advantage is that delay won't block the thread. Using this trick it's possible for gradle run --args='-a future -t 20 -r suspended -d 500' to finish in about half a second. Optionally the time for the delay can be set with -fd. For example, gradle run --args='-a future -t 20 -r suspended_unconfined -d 500 -fd 1000' will complete in a little over a second, because it delays a full second before checking the future again.
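The check-and-delay trick can be sketched as follows. This is an illustration under assumptions, not the code from Util.kt: awaitByPolling and collectFutures are hypothetical names, and the futures are produced with CompletableFuture.delayedExecutor, which requires Java 9 or newer.

```kotlin
import java.util.concurrent.CompletableFuture
import java.util.concurrent.Future
import java.util.concurrent.TimeUnit
import java.util.function.Supplier
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Suspends, without blocking the thread, until the future is done.
suspend fun <T> Future<T>.awaitByPolling(checkEveryMs: Long): T {
    // isDone is true for completed, failed and canceled futures alike.
    while (!isDone) {
        delay(checkEveryMs)
    }
    // Returns immediately now; throws if the future was canceled or failed.
    return get()
}

// Creates `times` futures that each complete after delayMs, then waits for all of them.
fun collectFutures(times: Int, delayMs: Long, checkEveryMs: Long): List<Int> {
    val received = mutableListOf<Int>()
    runBlocking {
        repeat(times) { i ->
            // Assumed stand-in for the Java API returning a future (Java 9+).
            val future: Future<Int> = CompletableFuture.supplyAsync(
                Supplier { i + 1 },
                CompletableFuture.delayedExecutor(delayMs, TimeUnit.MILLISECONDS)
            )
            launch { received += future.awaitByPolling(checkEveryMs) }
        }
    }
    return received
}
```

The polling interval plays the role of the -fd option: a larger value means fewer checks, but the result can be noticed up to one interval late.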

The callable action provides an API where we can pass a function that will be called once the future is completed. This won't help much when run in the usual way, since we still need to get() the resulting future, otherwise the program will end before the Integers are logged. When run with same_thread_complicated we apply a similar trick as with the futures, first creating the futures and then calling get() on them. gradle run --args='-a callable -t 20 -r same_thread_complicated -d 500' thus also completes in about half a second. This time, however, it's logged from the Timer thread used in the Java part, as can be seen from logs like: 2021-11-08T20:34:08.647508Z - received number: 00019 - logged from Timer-0.

With coroutines, we can use the callback together with suspendCoroutine to create a suspending call that resumes when the callback is called. So we no longer need to check each time and delay; we just 'wait' for the result until we can continue. This code is much more readable compared to the construct needed for same_thread_complicated. Using the suspended coroutines, gradle run --args='-a callable -t 20 -r suspended -d 500' completes in about half a second.
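A minimal sketch of wrapping a callback API with suspendCoroutine is shown below. The callback API here (callLater, backed by a java.util.Timer) is a hypothetical stand-in for the Java part, and awaitNumber and collectCallbacks are illustrative names as well.

```kotlin
import java.util.Timer
import kotlin.concurrent.schedule
import kotlin.coroutines.resume
import kotlin.coroutines.suspendCoroutine
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical callback-style API: invokes the callback on the timer thread after delayMs.
fun callLater(timer: Timer, number: Int, delayMs: Long, callback: (Int) -> Unit) {
    timer.schedule(delayMs) { callback(number) }
}

// Bridges the callback into a suspending function: the coroutine is suspended
// until the callback fires and resumes it with the result.
suspend fun awaitNumber(timer: Timer, number: Int, delayMs: Long): Int =
    suspendCoroutine { continuation ->
        callLater(timer, number, delayMs) { result -> continuation.resume(result) }
    }

// Starts `times` callback calls at once and collects their results.
fun collectCallbacks(times: Int, delayMs: Long): List<Int> {
    val timer = Timer(true) // daemon timer, so it never keeps the JVM alive
    val received = mutableListOf<Int>()
    try {
        runBlocking {
            repeat(times) { i ->
                launch { received += awaitNumber(timer, i + 1, delayMs) }
            }
        }
    } finally {
        timer.cancel()
    }
    return received
}
```

Although the callback itself runs on the timer thread, the resumed coroutine continues on the runBlocking thread, so the results are collected from a single thread. An error path would call resumeWithException instead of resume, which this sketch leaves out.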

The consumer is a specific class, based on the Kafka Consumer but much simplified, so that it only cares about thread safety and whether or not poll blocks when getting new items. The number of consumers created is set by the -t option, and each consumer is polled until it has consumed the number of items set by the -ca option. gradle run --args='-a unsafe_consumer -t 5 -r same_thread -d 500 -ca 5' will create 5 consumers serially, each time waiting until it has consumed 5 items. For each consumer this takes about 2.5 seconds, since the delay is set to 500, meaning it takes half a second until each next item is available. The whole run thus takes about 12.5 seconds. This is not very efficient, and by adding some complexity to the code we can reduce it to 2.5 seconds with same_thread_complicated, by creating all the consumers first and then polling each of them until it's done. When poll is called from a different thread than the one it was previously called from, it will throw an error. Thus running gradle run --args='-a unsafe_consumer -t 5 -r suspended_default -d 500 -ca 5' will most likely end in an error like: Encountered exception trying to run the tasks: tech.gklijs.consumer.ConcurrentException: called consumer from DefaultDispatcher-worker-2 but was called from DefaultDispatcher-worker-6 before
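How a consumer might detect being polled from a second thread can be sketched like this. The class and exception names are hypothetical and the real consumer in the project may track its caller differently; only the shape of the error message is taken from the output above.

```kotlin
import kotlin.concurrent.thread

// Hypothetical exception type, standing in for the project's ConcurrentException.
class ConcurrentAccessException(message: String) : RuntimeException(message)

// Remembers the first thread that polls it and rejects every other thread.
class ThreadBoundConsumer {
    private var owner: Thread? = null
    private var next = 0

    fun poll(): Int {
        val current = Thread.currentThread()
        val previous = owner
        if (previous != null && previous !== current) {
            throw ConcurrentAccessException(
                "called consumer from ${current.name} but was called from ${previous.name} before"
            )
        }
        owner = current
        return next++
    }
}

// Polls once from the calling thread and once from a new thread, returning the failure, if any.
fun pollFromTwoThreads(consumer: ThreadBoundConsumer): Throwable? {
    consumer.poll()
    var failure: Throwable? = null
    thread(name = "other-thread") {
        failure = runCatching { consumer.poll() }.exceptionOrNull()
    }.join()
    return failure
}
```

A coroutine running on a multi-threaded dispatcher can resume on a different worker thread after each suspension, which is how a consumer like this ends up being polled from more than one thread.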

The safe consumer is more or less the same thing as the unsafe consumer, but this time the implementation is thread safe, using a queue. The command gradle run --args='-a safe_consumer -t 5 -r suspended_default -d 500 -ca 5' will end successfully in about 2.5 seconds. To use the consumer from coroutines, a combination of polling with poll(0) and delay is used, so as not to block the coroutines.
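The combination of poll(0) and delay could look roughly like the sketch below. QueueConsumer is a hypothetical queue-backed stand-in for the safe consumer, and its poll signature (a timeout in milliseconds, returning a list of items) is an assumption.

```kotlin
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.TimeUnit
import kotlin.concurrent.thread
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking

// Hypothetical thread-safe consumer: a background thread puts one item on the queue every delayMs.
class QueueConsumer(items: Int, delayMs: Long) {
    private val queue = LinkedBlockingQueue<Int>()

    init {
        thread(isDaemon = true) {
            for (n in 1..items) {
                Thread.sleep(delayMs)
                queue.put(n)
            }
        }
    }

    // Waits up to timeoutMs for a first item; poll(0) returns at once with whatever is queued.
    fun poll(timeoutMs: Long): List<Int> {
        val batch = mutableListOf<Int>()
        val first = queue.poll(timeoutMs, TimeUnit.MILLISECONDS)
        if (first != null) batch.add(first)
        queue.drainTo(batch)
        return batch
    }
}

// Polls without blocking and suspends between empty polls until `amount` items were received.
suspend fun consume(consumer: QueueConsumer, amount: Int, maxDelayMs: Long): List<Int> {
    val received = mutableListOf<Int>()
    while (received.size < amount) {
        val batch = consumer.poll(0)
        if (batch.isEmpty()) delay(maxDelayMs) else received += batch
    }
    return received
}

// Runs several consumers concurrently on the calling thread.
fun consumeAll(consumers: Int, amount: Int, delayMs: Long, maxDelayMs: Long): List<List<Int>> =
    runBlocking {
        (1..consumers)
            .map { async { consume(QueueConsumer(amount, delayMs), amount, maxDelayMs) } }
            .awaitAll()
    }
```

Because each coroutine suspends in delay rather than blocking in poll, all consumers make progress side by side, so the total time stays close to that of a single consumer.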

Here are some diagrams for a few combinations of action type and run type, to make clear what happens.

A thread pool is one of the ways to run multiple things in parallel. A pool with multiple threads is created, and functions can be executed on one of the backing threads. The picture below is an approximation of running gradle run --args='-a delayed -t 2 -r thread_pool'.

Diagram: main thread using two child threads to each do a delayed call.

With coroutines, it's possible to switch the coroutine context, which prevents saturating or blocking one. When the delayed action is run in a coroutine, a separate context, supplierContext, is used. The number of its backing threads can be configured with the -ht option; by default it's two. The picture below is an approximation of running gradle run --args='-a delayed -t 2 -r suspended -ht 2'.

Diagram: suspend using the newly created context, with the underlying threads.

Using the extension function eventuallyLog() in Util.kt, the main thread just needs to check occasionally whether the future has already completed, so it isn't blocked like it would be when calling get(). The picture below is an approximation of running gradle run --args='-a future -t 2 -r suspended'.

Diagram: main thread created futures from coroutine and keeps checking if they are completed.

Some actions return a consumer. This is a class that was inspired by the Kafka Consumer, but has been drastically simplified to serve as an example. A consumer is a class that gets 'things' in the background, in our case Integers, and from another thread we can call poll to get those things. In this case we will use the run type suspended, which only uses the main thread, so it's fine to use the unsafe consumer. Here poll(0) is used to poll, so we don't block. The picture below is an approximation of running gradle run --args='-a unsafe_consumer -t 2 -r suspended -ca 2'.

Diagram: main thread created consumers, keeps polling and delaying till all consumers delivered 2 things.

When we run something like gradle run --args='-a unsafe_consumer -t 2 -r suspended_default -d 50 -ca 100 -cd 50', an error will pop up soon enough because poll is called from multiple threads. The error is raised because each consumer tracks which thread it is called from. The actual implementation is also not thread safe, as almost simultaneous calls will cause it to return more items than it should. The default context has a minimum of 2 threads to work on, with the upper limit being the number of vCPUs available. The actual threads the execution lands on might differ. Once an error pops up, the program exits.

Diagram: main thread launches to default coroutine context, error will pop up.