JetBrains/lincheck

Testing blocking operations

adamw opened this issue · 10 comments

adamw commented

The following is more of a question, than a bug report: if there's a better place to ask questions, please excuse me and point me in the right direction :)

I'm trying to use lincheck to test some blocking code, using Java 21 & Loom under the hood. The first step to make lincheck work at all was to bump the version of ASM to 9.6, but with this out of the way, things seemed promising: the examples from the documentation worked fine.

However, lincheck doesn't support testing (interruptible) blocking code out of the box. It does support suspension, though, so I'm trying to use that. Here's my attempt at testing Java's SynchronousQueue:

import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.runInterruptible
import org.jetbrains.kotlinx.lincheck.annotations.Operation
import org.jetbrains.kotlinx.lincheck.annotations.Param
import org.jetbrains.kotlinx.lincheck.check
import org.jetbrains.kotlinx.lincheck.paramgen.IntGen
import org.jetbrains.kotlinx.lincheck.strategy.stress.StressOptions
import org.junit.*
import java.util.concurrent.SynchronousQueue

@Param(name = "value", gen = IntGen::class, conf = "1:9")
class BasicQueueTest {
  private val c = SynchronousQueue<Int>()

  @Operation(blocking = true, allowExtraSuspension = true)
  suspend fun send(@Param(name = "value") v: Int) = runInterruptible(Dispatchers.IO) { c.put(v) }

  @Operation(blocking = true, allowExtraSuspension = true)
  suspend fun receive() = runInterruptible(Dispatchers.IO) { c.take() }

  @Test
  fun stressTest () = StressOptions().check(this::class)
}

When I run it, I get quite surprising results. The test either hangs, throws an exception due to the use of Thread.stop (which is fixed in this PR), or produces output such as:

Test BasicQueueTest.stressTest failed: org.jetbrains.kotlinx.lincheck.LincheckAssertionError:
= Invalid execution results =
| ---------------------------------------------------- |
|         Thread 1         |         Thread 2          |
| ---------------------------------------------------- |
| receive(): SUSPENDED + 6 | send(6): SUSPENDED + void |
| ---------------------------------------------------- |
, took 1.194s
    at org.jetbrains.kotlinx.lincheck.LinChecker.check(LinChecker.kt:38)
    at org.jetbrains.kotlinx.lincheck.LinChecker$Companion.check(LinChecker.kt:144)
    at org.jetbrains.kotlinx.lincheck.LinCheckerKt.check(LinChecker.kt:159)
    at org.jetbrains.kotlinx.lincheck.LinCheckerKt.check(LinChecker.kt:168)

This prompts the questions:

  • what's the + in the results? The execution looks correct to me
  • is my attempt at testing blocking code valid at all?

Thanks!

adamw commented

@ndkoval sorry for the direct mention, but maybe you would have some insights if my attempts to use lincheck are correct at all? :)

eupp commented

Hi @adamw !

  • SUSPENDED + x notation means that the suspending function was suspended during the execution, but then resumed and returned the value x as a result
  • Regarding your test: I believe that you should not call runInterruptible(Dispatchers.IO) yourself. Internally, the framework takes care of suspending functions and sets up its own runner. You can take a look at RendezvousChannelTest as a reference lincheck test for suspending operations.

adamw commented

@eupp thanks for the answer :)

As for the first point - do you know then why the test fails? There doesn't seem to be anything wrong with suspending and returning 6 (or void) per se.

As for the second: I don't really have suspendable functions. What I'm trying to do is use lincheck to test thread-blocking code (using Loom). So I was trying to convert a thread-blocking function into a suspendable one using runInterruptible.

However, I think the problem here might be that with suspendable functions, you know that by the time a continuation is returned, the function is blocked. With the approach from my example, that's not necessarily the case: we always return a continuation, while the computation might still be running. So every invocation will be considered suspended at some point, even if no suspension ever happened. I'm not sure if it's possible to fix this, as in Java we don't have access to the continuations. Maybe I could use Thread.getState, though I'm not sure if this is its intended usage, and quite probably this would mean rolling a custom solution, instead of using lincheck.
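The Thread.getState idea mentioned above could be sketched roughly as follows. This is only an illustration using plain JDK calls, not anything lincheck provides; the helper name awaitBlocked is hypothetical. The thread state is a racy heuristic: a thread can be WAITING for reasons unrelated to the queue, and SynchronousQueue may spin briefly (state RUNNABLE) before it parks.

```kotlin
import java.util.concurrent.SynchronousQueue

// Hypothetical helper: polls Thread.getState until the thread is parked
// (WAITING or TIMED_WAITING), it terminates, or the timeout elapses.
fun awaitBlocked(t: Thread, timeoutMs: Long = 2_000): Boolean {
    val deadline = System.nanoTime() + timeoutMs * 1_000_000
    while (System.nanoTime() < deadline) {
        val state = t.state
        if (state == Thread.State.WAITING || state == Thread.State.TIMED_WAITING) return true
        if (state == Thread.State.TERMINATED) return false
        Thread.sleep(1)
    }
    return false
}

fun main() {
    val queue = SynchronousQueue<Int>()
    var received: Int? = null
    val receiver = Thread { received = queue.take() }
    receiver.start()

    // With no sender around, the receiver should end up parked inside take().
    println("receiver blocked: ${awaitBlocked(receiver)}")

    queue.put(6)        // hands the element to the waiting receiver
    receiver.join()     // join() makes the write to `received` visible here
    println("received: $received")
}
```

Detecting "blocked" this way would still have to be wired into a custom test harness, which is the cost the comment above anticipates.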

eupp commented

> As for the first point - do you know then why the test fails? There doesn't seem to be anything wrong with suspending and returning 6 (or void) per se.

By default, Lincheck checks the data structure for linearizability. Given the results of a parallel execution, like the ones in your example:

| ---------------------------------------------------- |
|         Thread 1         |         Thread 2          |
| ---------------------------------------------------- |
| receive(): SUSPENDED + 6 | send(6): SUSPENDED + void |
| ---------------------------------------------------- |

it would try to find a linearization, that is, a sequential order of the same operations that produces the same results.
In your example there are two candidate linearizations: receive() ; send(6) and send(6) ; receive().
Since the Invalid execution results error appears, the framework could not reproduce the same results with either candidate.
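To make the search concrete, here is a deliberately simplified sketch of a brute-force linearization check. It is not Lincheck's verifier: the names Observed and findLinearization are made up for illustration, a plain non-blocking kotlin.collections.ArrayDeque stands in for the sequential specification, and suspension is not modelled at all.

```kotlin
// One operation of the scenario plus the result observed in the parallel run.
data class Observed(val name: String, val run: (ArrayDeque<Int>) -> Any?, val result: Any?)

// All orderings of the list (only sensible for tiny scenarios).
fun <T> permutations(items: List<T>): List<List<T>> =
    if (items.size <= 1) listOf(items)
    else items.flatMap { head -> permutations(items - head).map { listOf(head) + it } }

// Returns a sequential order that reproduces every observed result, or null if none exists.
fun findLinearization(ops: List<Observed>): List<String>? =
    permutations(ops).firstOrNull { order ->
        val spec = ArrayDeque<Int>()            // fresh sequential state per candidate
        order.all { it.run(spec) == it.result } // replay and compare results one by one
    }?.map { it.name }

fun main() {
    val ops = listOf(
        Observed("poll()", { q -> q.removeFirstOrNull() }, 6),
        Observed("offer(6)", { q -> q.addLast(6) }, Unit),
    )
    println(findLinearization(ops))   // [offer(6), poll()]
}
```

With a buffered queue, the order offer(6) ; poll() explains the observed results. For a rendezvous structure neither order completes unless the specification itself models suspension and resumption, which is why the sequential replay matters so much in this test.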

I still believe the issue is due to runInterruptible(Dispatchers.IO).
When executing the tests, lincheck creates its own internal thread for each thread of the generated scenario.
It does not know anything about any external threads (like IO threads from Dispatchers.IO) and has no control over them.

It could be the case that when the scenario is executed in parallel and then sequentially (during the linearization search), the threads in Dispatchers.IO are scheduled differently, leading to different execution results, which lincheck treats as an error.

adamw commented

Thanks - when removing the runInterruptible, the whole test simply hangs (until there's a timeout and lincheck tries to run Thread.stop, which is not supported in JDK 21). Since SynchronousQueue is a rendezvous one, it's hard to linearize the operations, since two threads must meet for an exchange of values to happen. I guess it's the same with Kotlin's channels, but I'd guess that since the Kotlin implementation uses "true" continuations, that's somehow catered for.
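The rendezvous behaviour described above can be seen with plain JDK calls. The function names below are illustrative only; the sketch just shows that an element cannot be handed off unless a partner thread is waiting on the other side.

```kotlin
import java.util.concurrent.SynchronousQueue
import java.util.concurrent.TimeUnit

// Tries to send and then receive from a single thread, with no partner thread.
fun withoutPartner(): Pair<Boolean, Int?> {
    val queue = SynchronousQueue<Int>()
    val offered = queue.offer(1)                        // no receiver is waiting
    val polled = queue.poll(50, TimeUnit.MILLISECONDS)  // no sender is waiting
    return offered to polled
}

// put() completes only once another thread calls take(), and vice versa.
fun withPartner(): Int {
    val queue = SynchronousQueue<Int>()
    val sender = Thread { queue.put(6) }
    sender.start()
    val value = queue.take()
    sender.join()
    return value
}

fun main() {
    println(withoutPartner())   // (false, null)
    println(withPartner())      // 6
}
```

A purely sequential replay of send(6) ; receive() on one thread therefore cannot finish with the blocking put and take: the first call would wait forever for its partner.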

eupp commented

@adamw

I was able to run your test successfully after applying a few modifications to it:

@Param(name = "value", gen = IntGen::class, conf = "1:9")
class BasicQueueTest {
    private val c = SynchronousQueue<Int>()

    @Operation(blocking = true, allowExtraSuspension = true)
    suspend fun send(@Param(name = "value") v: Int) = runInterruptible { c.put(v) }

    @Operation(blocking = true, allowExtraSuspension = true)
    suspend fun receive() = runInterruptible { c.take() }

    @Test
    fun stressTest () = StressOptions()
        .addCustomScenario {
            parallel {
                thread {
                    actor(::receive)
                }
                thread {
                    actor(::send, 5)
                }
            }
        }
        .iterations(0)
        .invocationsPerIteration(1000)
        .sequentialSpecification(SequentialSynchronousQueue::class.java)
        .check(this::class)
}

@OptIn(InternalCoroutinesApi::class)
class SequentialSynchronousQueue : SequentialIntChannel(capacity = 0)

  • I used just runInterruptible without the IO dispatcher, so that no external IO threads are used to run continuations.
  • I provided a separate class that implements the "sequential specification" of a rendezvous SynchronousQueue:

.sequentialSpecification(SequentialSynchronousQueue::class.java)

With this option, Lincheck will use the class SequentialSynchronousQueue when searching for a linearization, instead of the original class BasicQueueTest (note that the two classes should have the same API for all methods marked as Operation).

You can find the definition of SequentialIntChannel here.
We use it inside our internal lincheck tests for testing channel-like data structures.

Perhaps, we should make SequentialIntChannel a part of public API, so that the users can also use it to test their data structures implementing the channel interface. @ndkoval what do you think?

@adamw for now, the simplest solution for you could be just copy-pasting the implementation of SequentialIntChannel into your tests.

adamw commented

@eupp Thanks a lot for the investigation :)

I copied your code, but when I change iterations to something more than 0, the test hangs (or I get an exception due to usage of Thread.stop, if I use a version w/o PR #214 applied).

I see how using the sequential specification might help, although it also uses suspendable functions, so I thought that's more of an additional validation to check against a straightforward implementation which doesn't have to be thread-safe, rather than a requirement to test suspendable functions in the first place.

eupp commented

So there are several aspects to this issue.

  1. Currently, the Lincheck verifiers do not support blocking operations (unless they are marked as suspend and block only through coroutine suspension). For example, if some operation is parked (via LockSupport.park) but never unparked, it may block the verifier indefinitely. This problem is tracked by #167 and should be resolved in #143.

  2. Lincheck has not yet been tested with virtual threads. Here is a tracking issue for this: #261.

  3. Lastly, Lincheck is currently unable to track custom coroutines launched in the user's code (or coroutines scheduled on non-Lincheck threads). This is why code like runInterruptible(Dispatchers.IO) { ... } inside operations might not work as expected. This problem is tracked by #260 and is currently postponed for the reasons given in the issue.
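The first point can be illustrated with a small JDK-only sketch. It is not Lincheck code, and the function name is hypothetical. It shows that a thread parked with LockSupport.park stays blocked until something unparks it, so any caller that waits for it without a timeout would hang as well.

```kotlin
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.locks.LockSupport

// Returns true if the parked thread was still alive after waiting `waitMs` for it.
fun parkedThreadStaysBlocked(waitMs: Long): Boolean {
    val released = AtomicBoolean(false)
    val parked = Thread {
        // park() may return spuriously, so the condition is re-checked in a loop.
        while (!released.get()) LockSupport.park()
    }
    parked.start()

    parked.join(waitMs)                 // a plain join() here would never return
    val stillBlocked = parked.isAlive

    // Clean up: release the thread so the example terminates.
    released.set(true)
    LockSupport.unpark(parked)
    parked.join()
    return stillBlocked
}

fun main() {
    println(parkedThreadStaysBlocked(200))   // true
}
```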

adamw commented

@eupp thanks for the answers :) Though @ndkoval just closed this as not planned, so I'm not sure how this relates to #143 then?

eupp commented

@adamw I think it's more of a "this issue boils down to several existing issues" than "not planned".