spring-cloud/spring-cloud-sleuth

Exception configuration on rscoket spring koltin project. AssertingSpan

EgorOparin opened this issue · 5 comments

Describe the bug
Cannot cast 'org.springframework.cloud.sleuth.docs.AssertingSpanBuilder$1' to 'org.springframework.cloud.sleuth.docs.ImmutableAssertingSpan'

spring-boot- 2.7.5
spring-cloud - extra["springCloudVersion"] = "2021.0.2"
sleuth - implementation("org.springframework.cloud:spring-cloud-starter-sleuth:3.1.8")

kotlin - 1.7.22
reactive stack
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-reactor:1.6.4")
implementation("io.projectreactor.kotlin:reactor-kotlin-extensions:1.2.2")
implementation("org.springframework.boot:spring-boot-starter-webflux")
implementation("org.springframework.boot:spring-boot-starter-rsocket")
implementation("org.springframework.boot:spring-boot-starter-data-r2dbc")

Sample
rSocket controller

@MessageMapping("/deposits/update")
    suspend fun updateStatusDeposit(request: DepositsUpdateRequest){
        return withContext(tracer.asContextElement()) {
            depositDataService.updateDepositStatus(request)
        }
    }

TEST

@Test
    fun `update deposits`() {
        val userId = 1111L
        val id1 = UUID.randomUUID()
        val id2 = UUID.randomUUID()

        runBlocking {
            depositRepository.save(getDepositData(id = id1, product = "saving", userId = userId))
            depositRepository.save(getDepositData(id = id2, product = "saving", userId = userId))

            val result: Unit = requester
                .route("/deposits/update")
                .data(DepositsUpdateRequest(listOf(id1, id2), DepositStatus.MONEY_SEND))
                .retrieveAndAwait()
        }
    }

Bug be reproduced only if controller has payload.

full stack trace:

Caused by: java.lang.ClassCastException: class org.springframework.cloud.sleuth.docs.AssertingSpanBuilder$1 cannot be cast to class org.springframework.cloud.sleuth.docs.ImmutableAssertingSpan (org.springframework.cloud.sleuth.docs.AssertingSpanBuilder$1 and org.springframework.cloud.sleuth.docs.ImmutableAssertingSpan are in unnamed module of loader 'app')
	at org.springframework.cloud.sleuth.docs.AssertingSpan.continueSpan(AssertingSpan.java:166) ~[spring-cloud-sleuth-api-3.1.2.jar:3.1.2]
	at org.springframework.cloud.sleuth.instrument.tx.TraceReactiveTransactionManager.lambda$null$0(TraceReactiveTransactionManager.java:76) ~[spring-cloud-sleuth-instrumentation-3.1.2.jar:3.1.2]
	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:113) ~[reactor-core-3.4.24.jar:3.4.24]
	at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816) ~[reactor-core-3.4.24.jar:3.4.24]
	at reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:249) ~[reactor-core-3.4.24.jar:3.4.24]
	at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816) ~[reactor-core-3.4.24.jar:3.4.24]
	at reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:249) ~[reactor-core-3.4.24.jar:3.4.24]
	at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816) ~[reactor-core-3.4.24.jar:3.4.24]
	at reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:249) ~[reactor-core-3.4.24.jar:3.4.24]
	at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79) ~[reactor-core-3.4.24.jar:3.4.24]
	at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.complete(MonoIgnoreThen.java:292) ~[reactor-core-3.4.24.jar:3.4.24]
	at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onNext(MonoIgnoreThen.java:187) ~[reactor-core-3.4.24.jar:3.4.24]
	at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:236) ~[reactor-core-3.4.24.jar:3.4.24]
	at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onComplete(MonoIgnoreThen.java:203) ~[reactor-core-3.4.24.jar:3.4.24]
	at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onComplete(MonoPeekTerminal.java:299) ~[reactor-core-3.4.24.jar:3.4.24]
	at reactor.core.publisher.MonoIgnoreElements$IgnoreElementsSubscriber.onComplete(MonoIgnoreElements.java:89) ~[reactor-core-3.4.24.jar:3.4.24]
	at reactor.core.publisher.MonoIgnoreElements$IgnoreElementsSubscriber.onComplete(MonoIgnoreElements.java:89) ~[reactor-core-3.4.24.jar:3.4.24]
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onComplete(Operators.java:2058) ~[reactor-core-3.4.24.jar:3.4.24]
	at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1817) ~[reactor-core-3.4.24.jar:3.4.24]
	at reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:249) ~[reactor-core-3.4.24.jar:3.4.24]
	at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79) ~[reactor-core-3.4.24.jar:3.4.24]
	at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.complete(MonoIgnoreThen.java:292) ~[reactor-core-3.4.24.jar:3.4.24]
	at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onNext(MonoIgnoreThen.java:187) ~[reactor-core-3.4.24.jar:3.4.24]
	at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:236) ~[reactor-core-3.4.24.jar:3.4.24]
	at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onComplete(MonoIgnoreThen.java:203) ~[reactor-core-3.4.24.jar:3.4.24]
	at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onComplete(MonoPeekTerminal.java:299) ~[reactor-core-3.4.24.jar:3.4.24]
	at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onComplete(MonoIgnoreThen.java:209) ~[reactor-core-3.4.24.jar:3.4.24]
	at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:238) ~[reactor-core-3.4.24.jar:3.4.24]
	at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onComplete(MonoIgnoreThen.java:203) ~[reactor-core-3.4.24.jar:3.4.24]
	at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onComplete(MonoPeekTerminal.java:299) ~[reactor-core-3.4.24.jar:3.4.24]
	at reactor.core.publisher.FluxPeekFuseable$PeekConditionalSubscriber.onComplete(FluxPeekFuseable.java:940) ~[reactor-core-3.4.24.jar:3.4.24]
	at reactor.core.publisher.MonoIgnoreElements$IgnoreElementsSubscriber.onComplete(MonoIgnoreElements.java:89) ~[reactor-core-3.4.24.jar:3.4.24]
	at io.r2dbc.postgresql.util.FluxDiscardOnCancel$FluxDiscardOnCancelSubscriber.onComplete(FluxDiscardOnCancel.java:99) ~[r2dbc-postgresql-0.8.13.RELEASE.jar:0.8.13.RELEASE]
	at reactor.core.publisher.Operators.complete(Operators.java:137) ~[reactor-core-3.4.24.jar:3.4.24]
	at reactor.core.publisher.FluxEmpty.subscribe(FluxEmpty.java:42) ~[reactor-core-3.4.24.jar:3.4.24]
	at reactor.core.publisher.FluxDefer.subscribe(FluxDefer.java:54) ~[reactor-core-3.4.24.jar:3.4.24]
	at io.r2dbc.postgresql.util.FluxDiscardOnCancel.subscribe(FluxDiscardOnCancel.java:49) ~[r2dbc-postgresql-0.8.13.RELEASE.jar:0.8.13.RELEASE]
	at reactor.core.publisher.Mono.subscribe(Mono.java:4455) ~[reactor-core-3.4.24.jar:3.4.24]
	at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:263) ~[reactor-core-3.4.24.jar:3.4.24]
	at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onComplete(MonoIgnoreThen.java:203) ~[reactor-core-3.4.24.jar:3.4.24]
	at reactor.core.publisher.MonoIgnoreElements$IgnoreElementsSubscriber.onComplete(MonoIgnoreElements.java:89) ~[reactor-core-3.4.24.jar:3.4.24]
	at io.r2dbc.postgresql.util.FluxDiscardOnCancel$FluxDiscardOnCancelSubscriber.onComplete(FluxDiscardOnCancel.java:99) ~[r2dbc-postgresql-0.8.13.RELEASE.jar:0.8.13.RELEASE]
	at reactor.core.publisher.MonoIgnoreElements$IgnoreElementsSubscriber.onComplete(MonoIgnoreElements.java:89) ~[reactor-core-3.4.24.jar:3.4.24]
	at io.r2dbc.postgresql.util.FluxDiscardOnCancel$FluxDiscardOnCancelSubscriber.onComplete(FluxDiscardOnCancel.java:99) ~[r2dbc-postgresql-0.8.13.RELEASE.jar:0.8.13.RELEASE]
	at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:260) ~[reactor-core-3.4.24.jar:3.4.24]
	at reactor.core.publisher.FluxHandle$HandleSubscriber.onComplete(FluxHandle.java:220) ~[reactor-core-3.4.24.jar:3.4.24]
	at io.r2dbc.postgresql.util.FluxDiscardOnCancel$FluxDiscardOnCancelSubscriber.onComplete(FluxDiscardOnCancel.java:99) ~[r2dbc-postgresql-0.8.13.RELEASE.jar:0.8.13.RELEASE]
	at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onComplete(FluxContextWrite.java:126) ~[reactor-core-3.4.24.jar:3.4.24]
	at reactor.core.publisher.FluxCreate$BaseSink.complete(FluxCreate.java:460) ~[reactor-core-3.4.24.jar:3.4.24]
	at reactor.core.publisher.FluxCreate$BufferAsyncSink.drain(FluxCreate.java:805) ~[reactor-core-3.4.24.jar:3.4.24]
	at reactor.core.publisher.FluxCreate$BufferAsyncSink.complete(FluxCreate.java:753) ~[reactor-core-3.4.24.jar:3.4.24]
	at reactor.core.publisher.FluxCreate$SerializedFluxSink.drainLoop(FluxCreate.java:247) ~[reactor-core-3.4.24.jar:3.4.24]
	at reactor.core.publisher.FluxCreate$SerializedFluxSink.drain(FluxCreate.java:213) ~[reactor-core-3.4.24.jar:3.4.24]
	at reactor.core.publisher.FluxCreate$SerializedFluxSink.complete(FluxCreate.java:204) ~[reactor-core-3.4.24.jar:3.4.24]
	at io.r2dbc.postgresql.client.ReactorNettyClient$Conversation.complete(ReactorNettyClient.java:638) ~[r2dbc-postgresql-0.8.13.RELEASE.jar:0.8.13.RELEASE]
	at io.r2dbc.postgresql.client.ReactorNettyClient$BackendMessageSubscriber.emit(ReactorNettyClient.java:904) ~[r2dbc-postgresql-0.8.13.RELEASE.jar:0.8.13.RELEASE]
	at io.r2dbc.postgresql.client.ReactorNettyClient$BackendMessageSubscriber.onNext(ReactorNettyClient.java:780) ~[r2dbc-postgresql-0.8.13.RELEASE.jar:0.8.13.RELEASE]
	at io.r2dbc.postgresql.client.ReactorNettyClient$BackendMessageSubscriber.onNext(ReactorNettyClient.java:686) ~[r2dbc-postgresql-0.8.13.RELEASE.jar:0.8.13.RELEASE]
	at reactor.core.publisher.FluxHandle$HandleSubscriber.onNext(FluxHandle.java:126) ~[reactor-core-3.4.24.jar:3.4.24]
	at reactor.core.publisher.FluxPeekFuseable$PeekConditionalSubscriber.onNext(FluxPeekFuseable.java:854) ~[reactor-core-3.4.24.jar:3.4.24]
	at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onNext(FluxMap.java:224) ~[reactor-core-3.4.24.jar:3.4.24]
	at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onNext(FluxMap.java:224) ~[reactor-core-3.4.24.jar:3.4.24]
	at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:279) ~[reactor-netty-core-1.0.24.jar:1.0.24]
	at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:388) ~[reactor-netty-core-1.0.24.jar:1.0.24]
	at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:404) ~[reactor-netty-core-1.0.24.jar:1.0.24]
	at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:113) ~[reactor-netty-core-1.0.24.jar:1.0.24]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[netty-transport-4.1.84.Final.jar:4.1.84.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.84.Final.jar:4.1.84.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.84.Final.jar:4.1.84.Final]
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:336) ~[netty-codec-4.1.84.Final.jar:4.1.84.Final]
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:308) ~[netty-codec-4.1.84.Final.jar:4.1.84.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[netty-transport-4.1.84.Final.jar:4.1.84.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.84.Final.jar:4.1.84.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.84.Final.jar:4.1.84.Final]
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[netty-transport-4.1.84.Final.jar:4.1.84.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440) ~[netty-transport-4.1.84.Final.jar:4.1.84.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.84.Final.jar:4.1.84.Final]
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[netty-transport-4.1.84.Final.jar:4.1.84.Final]
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) ~[netty-transport-4.1.84.Final.jar:4.1.84.Final]
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788) ~[netty-transport-4.1.84.Final.jar:4.1.84.Final]
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724) ~[netty-transport-4.1.84.Final.jar:4.1.84.Final]
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650) ~[netty-transport-4.1.84.Final.jar:4.1.84.Final]
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562) ~[netty-transport-4.1.84.Final.jar:4.1.84.Final]
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997) ~[netty-common-4.1.84.Final.jar:4.1.84.Final]
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.84.Final.jar:4.1.84.Final]
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.84.Final.jar:4.1.84.Final]
	at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]

Problem here
org.springframework.cloud.sleuth.instrument.tx.TraceReactiveTransactionManager

@Override
	public Mono<ReactiveTransaction> getReactiveTransaction(TransactionDefinition definition)
			throws TransactionException {
		return Mono.deferContextual(contextView -> this.delegate.getReactiveTransaction(definition).map(tx -> {
			Span span = AssertingSpan.continueSpan(SleuthTxSpan.TX_SPAN, SleuthTxSpan.TX_SPAN.wrap(span(contextView)));
			if (tx.isNewTransaction() || span == null) {

AssertingSpan.kt

static AssertingSpan continueSpan(DocumentedSpan documentedSpan, Span span) {
		AssertingSpan assertingSpan = of(documentedSpan, span);
		if (assertingSpan == null) {
			return null;
		}
		((ImmutableAssertingSpan) assertingSpan).isStarted = true;
		return assertingSpan;
	}

What I don't understand is how did a SpanBuilder end up under the Span key in Reactor Context. Can someone create a simple project that replicates this issue? Preferably without Kotlin and with Java and Maven?

If you would like us to look at this issue, please provide the requested information. If the information is not provided within the next 7 days this issue will be closed.

Closing due to lack of requested feedback. If you would like us to look at this issue, please provide the requested information and we will re-open the issue.