Testing blocking operations
adamw opened this issue · 10 comments
The following is more of a question, than a bug report: if there's a better place to ask questions, please excuse me and point me in the right direction :)
I'm trying to use lincheck to test some blocking code, using Java 21 & Loom under the hood. The first step to make lincheck work at all was to bump the version of ASM to 9.6, but with this out of the way, things seemed promising: the examples from the documentation worked fine.
However, lincheck doesn't support testing (interruptible) blocking code out of the box. However, we've got suspension support - so I'm trying to use that. Here's my attempt at testing Java's SynchronousQueue
:
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.runInterruptible
import org.jetbrains.kotlinx.lincheck.annotations.Operation
import org.jetbrains.kotlinx.lincheck.annotations.Param
import org.jetbrains.kotlinx.lincheck.check
import org.jetbrains.kotlinx.lincheck.paramgen.IntGen
import org.jetbrains.kotlinx.lincheck.strategy.stress.StressOptions
import org.junit.*
import java.util.concurrent.SynchronousQueue
@Param(name = "value", gen = IntGen::class, conf = "1:9")
class BasicQueueTest {
private val c = SynchronousQueue<Int>()
@Operation(blocking = true, allowExtraSuspension = true)
suspend fun send(@Param(name = "value") v: Int) = runInterruptible(Dispatchers.IO) { c.put(v) }
@Operation(blocking = true, allowExtraSuspension = true)
suspend fun receive() = runInterruptible(Dispatchers.IO) { c.take() }
@Test
fun stressTest () = StressOptions().check(this::class)
}
When running, I get quite surprising results. Either the test hangs, throwing an exception due to usage of Thread.stop
(which is fixed in this PR), or produces an output such as:
Test BasicQueueTest.stressTest failed: org.jetbrains.kotlinx.lincheck.LincheckAssertionError:
= Invalid execution results =
| ---------------------------------------------------- |
| Thread 1 | Thread 2 |
| ---------------------------------------------------- |
| receive(): SUSPENDED + 6 | send(6): SUSPENDED + void |
| ---------------------------------------------------- |
, took 1.194s
at org.jetbrains.kotlinx.lincheck.LinChecker.check(LinChecker.kt:38)
at org.jetbrains.kotlinx.lincheck.LinChecker$Companion.check(LinChecker.kt:144)
at org.jetbrains.kotlinx.lincheck.LinCheckerKt.check(LinChecker.kt:159)
at org.jetbrains.kotlinx.lincheck.LinCheckerKt.check(LinChecker.kt:168)
This prompts the questions:
- what's the
+
in the results? The execution looks correct to me - is my attempt at testing blocking code valid at all?
Thanks!
@ndkoval sorry for the direct mention, but maybe you would have some insights if my attempts to use lincheck are correct at all? :)
Hi @adamw !
SUSPENDED + x
notation means that the suspending function was suspended during the execution, but then resumed and returned the valuex
as a result- Regarding your test. I believe that you should not call
runInterruptible(Dispatchers.IO)
by yourself. Internally, the framework takes care of suspending functions and sets up its own proper runner. You can take a look at theRendezvousChannelTest
as a reference of lincheck test for suspending operations.
@eupp thanks for the answer :)
As for the first point - do you know then why the test fails? There doesn't seem to be anything wrong with suspending and returning 6 (or void) per se.
As for the second: I don't really have suspendable functions. What I'm trying to do, is using lincheck to test thread-blocking code (using Loom). So I was trying to convert a thread-blocking function into a suspendable one using runInterruptible
.
However, I think the problem here might be that with suspendable functions, you know that by the time a continuation is returned, the function is blocked. With the approach from my example, that's not necessarily the case: we always return a continuation, while the computation might still be running. So every invocation will be considered suspended at some point, even if no suspension ever happened. I'm not sure if it's possible to fix this, as in Java we don't have access to the continuations. Maybe I could use Thread.getState
, though I'm not sure if this is its intended usage, and quite probably this would mean rolling a custom solution, instead of using lincheck.
As for the first point - do you know then why the test fails? There doesn't seem to be anything wrong with suspending and returning 6 (or void) per se.
By default, Lincheck checks the data structure for linearizability. Receiving a parallel execution results, like the one in your example:
| ---------------------------------------------------- |
| Thread 1 | Thread 2 |
| ---------------------------------------------------- |
| receive(): SUSPENDED + 6 | send(6): SUSPENDED + void |
| ---------------------------------------------------- |
it would try to find a linearization that, when executed sequentially, gives the same results.
In your example there are two possible candidate linearizations: receive() ; send(6)
and send(6) ; receive()
.
Since the Invalid execution results
error appears, the framework has failed to reproduce the same results for both variants.
I still believe the issue is due to runInterruptible(Dispatchers.IO)
.
When executing the tests, lincheck creates a its own internal threads for each thread of generated scenario.
It does not know anything about any external threads (like IO
threads from Dispatchers.IO
) and has no control over them.
It could be the case that when the scenario is executed in parallel and then sequentially (during linearization search), the threads in Dispatchers.IO
are scheduled differently thus leading to different execution results --- a situation treated as an error by lincheck.
Thanks - when removing the runInterruptible
, the whole test simply hangs (until there's a timeout and lincheck tries to run Thread.stop
, which is not supported in JDK 21). Since SynchronousQueue
is a rendezvous one, it's hard to linearize the operations, since two threads must meet for an exchange of values to happen. I guess it's the same with Kotlin's channels, but I'd guess that since the Kotlin implementation uses "true" continuations, that's somehow catered for.
I was able to successfully run your test by applying few modifications to it:
@Param(name = "value", gen = IntGen::class, conf = "1:9")
class BasicQueueTest {
private val c = SynchronousQueue<Int>()
@Operation(blocking = true, allowExtraSuspension = true)
suspend fun send(@Param(name = "value") v: Int) = runInterruptible { c.put(v) }
@Operation(blocking = true, allowExtraSuspension = true)
suspend fun receive() = runInterruptible { c.take() }
@Test
fun stressTest () = StressOptions()
.addCustomScenario {
parallel {
thread {
actor(::receive)
}
thread {
actor(::send, 5)
}
}
}
.iterations(0)
.invocationsPerIteration(1000)
.sequentialSpecification(SequentialSynchronousQueue::class.java)
.check(this::class)
}
@OptIn(InternalCoroutinesApi::class)
class SequentialSynchronousQueue : SequentialIntChannel(capacity = 0)
- I used just
runInterruptible
withoutIO
dispatcher, so that no external IO threads are used to run continuations. - I provided a separate class that implements "sequential specification" of a rendevouz
SynchronousQueue
:
.sequentialSpecification(SequentialSynchronousQueue::class.java)
With this option, Lincheck will use the class SequentialSynchronousQueue
when searching for linearization, instead of the original class BasicQueueTest
(note that two class should have same API for all methods marked as Operation
).
You can find the definition of SequentialIntChannel
here.
We use it inside our internal lincheck tests for testing channel-like data structures.
Perhaps, we should make SequentialIntChannel
a part of public API, so that the users can also use it to test their data structures implementing the channel interface. @ndkoval what do you think?
@adamw for now, the simplest solution for you could be just copy-pasting the implementation of SequentialIntChannel
into your tests.
@eupp Thanks a lot for the investigation :)
I copied your code, but when I change iterations
to something more than 0
, the test hangs (or I get an exception due to usage of Thread.stop
, if I use a version w/o PR #214 applied).
I see how using the sequential specification might help, although it also uses suspendable functions, so I thought that's more of an additional validation to check against a straightforward implementation which doesn't have to be thread-safe, rather than a requirement to test suspendable functions in the first place.
So there are several aspects to this issue.
-
Currently, the Lincheck verifiers do not support blocking operations (unless they are marked as
suspend
and suspend only due to coroutine's suspension). For example, if some operation is parked (viaLockSupport.park
), but never unparked, this situation may block the verifier indefinitely. This problem is tracked by #167 and should be resolved in #143. -
The Lincheck has not been yet tested to support Virtual Threads. Here is a tracking issue for this: #261.
-
Lastly, currently the Lincheck is unable to track custom coroutines launched in the user's code (or the coroutines scheduled on non-Lincheck threads). This is why the code like
runInterruptible(Dispatchers.IO) { ... }
inside operations might not work as expected. This problem is tracked by #260 and is currently postponed for the reasons given in the issue.