Driver drops onNext after publishing results on another thread (e.g. boundedElastic)
AlexeiKhilchuk opened this issue · 2 comments
Hello. I'm facing the issue when DatabaseClient do not respond on requests on load that causes stream to hang. It reproduces with the test that is provided below. Higher chance to reproduce occurs when I'm trying to publish results of database client on boundedElastic().This stream never completes. Am I doing something wrong here or it is a bug? :)
Notes:
- That it also reproduces without publishing on another scheduler.
- I tried to use jasync-sql driver for this test and issue is not reproducible.
I'm ready to provide additional info for you if needed ASAP.
Thanks in advance!
gradle dependencies:
implementation("org.springframework.boot", "spring-boot-starter-data-r2dbc")
implementation("dev.miku", "r2dbc-mysql", "0.8.2.RELEASE")
implementation("io.r2dbc:r2dbc-pool:0.8.6.RELEASE")
MySQL version: MySQL Server 8.0.23 (Docker image)
R2DBC Config:
spring:
r2dbc:
url: r2dbcs:mysql://localhost:3308:test_db?serverTimezone=Europe/Moscow&useUnicode=yes&characterEncoding=UTF-8
username: test_user
password: password
pool:
enabled: true
max-size: 128
initial-size: 128
max-idle-time: PT1M
validation-query: "SELECT 1"
Test code:
@ExtendWith(SpringExtension::class)
@SpringBootTest
class DebugTest @Autowired constructor(
private val databaseClient: DatabaseClient,
) {
@Test
fun `db test`() {
val publishScheduler = Schedulers.newParallel("my-parallel", 512)
//Clear table
databaseClient
.delete()
.from(ColorEntity::class.java)
.then()
.block()
//Insert entities into the table
Flux.range(0, 50000)
.publishOn(publishScheduler)
.map { ColorEntity(UUID.fromString("1-1-1-1-$it"), "q", "s", "s", "s") }
.flatMap { entity ->
databaseClient
.insert()
.into(ColorEntity::class.java)
.using(entity)
.then()
.thenReturn(entity)
.publishOn(Schedulers.boundedElastic())
}
.collectList()
.block()
}
}
@Table("color")
data class ColorEntity(
@Id
val id: UUID,
val name: String,
val hexCode: String,
val manufactureCode: String,
val imageUrl: String
)
Looking at the test it's not clear whether you're using a connection pool or not. Fan out to 512 concurrent threads can easily overload the target server or hit other system limitations. Care to attach a debug log as file?
Hey, thanks for your response. Attaching log file under debug level. As I see in DefaultDatabaseClient bean implementation it uses r2dbc pool from connection factory that is enabled in spring r2dbc config.
log.txt