mirromutth/r2dbc-mysql

Driver drops onNext after publishing results on another thread (e.g. boundedElastic)

AlexeiKhilchuk opened this issue · 2 comments

Hello. I'm facing the issue when DatabaseClient do not respond on requests on load that causes stream to hang. It reproduces with the test that is provided below. Higher chance to reproduce occurs when I'm trying to publish results of database client on boundedElastic().This stream never completes. Am I doing something wrong here or it is a bug? :)

Notes:

  • That it also reproduces without publishing on another scheduler.
  • I tried to use jasync-sql driver for this test and issue is not reproducible.

I'm ready to provide additional info for you if needed ASAP.
Thanks in advance!

gradle dependencies:

implementation("org.springframework.boot", "spring-boot-starter-data-r2dbc")
implementation("dev.miku", "r2dbc-mysql", "0.8.2.RELEASE")
implementation("io.r2dbc:r2dbc-pool:0.8.6.RELEASE")

MySQL version: MySQL Server 8.0.23 (Docker image)

R2DBC Config:

spring:
  r2dbc:
    url: r2dbcs:mysql://localhost:3308:test_db?serverTimezone=Europe/Moscow&useUnicode=yes&characterEncoding=UTF-8
    username: test_user
    password: password
    pool:
      enabled: true
      max-size: 128
      initial-size: 128
      max-idle-time: PT1M
      validation-query: "SELECT 1"

Test code:

@ExtendWith(SpringExtension::class)
@SpringBootTest
class DebugTest @Autowired constructor(
    private val databaseClient: DatabaseClient,
)  {

    @Test
    fun `db test`() {
        val publishScheduler = Schedulers.newParallel("my-parallel", 512)
        
        //Clear table
        databaseClient
            .delete()
            .from(ColorEntity::class.java)
            .then()
            .block()

        //Insert entities into the table
        Flux.range(0, 50000)
            .publishOn(publishScheduler)
            .map { ColorEntity(UUID.fromString("1-1-1-1-$it"), "q", "s", "s", "s") }
            .flatMap { entity ->
                databaseClient
                    .insert()
                    .into(ColorEntity::class.java)
                    .using(entity)
                    .then()
                    .thenReturn(entity)
                    .publishOn(Schedulers.boundedElastic()) 
            }
            .collectList()
            .block()
    }
}

@Table("color")
data class ColorEntity(
    @Id
    val id: UUID,
    val name: String,
    val hexCode: String,
    val manufactureCode: String,
    val imageUrl: String
)

Looking at the test it's not clear whether you're using a connection pool or not. Fan out to 512 concurrent threads can easily overload the target server or hit other system limitations. Care to attach a debug log as file?

Hey, thanks for your response. Attaching log file under debug level. As I see in DefaultDatabaseClient bean implementation it uses r2dbc pool from connection factory that is enabled in spring r2dbc config.
log.txt