spring-projects/spring-data-r2dbc

Spring Data R2DBC ReactiveCrudRepository.save() method is not working even with method / class level @Transactional annotation

daggerok opened this issue · 14 comments

Hello,

Please, let me know if spring-data-r2dbc transactions by default are read only?
For some reasons, data cannot be saved when I do it using ReactiveCrudRepository like so:

    @Transactional
    @ResponseBody
    @PostMapping("/sessions")
    public Mono<Session> save(@RequestBody Session session) {
        return Mono.just(session)
                   .filter(s -> Objects.nonNull(s.getName()))
                   .filter(s -> Objects.nonNull(s.getSpeakers()))
                   .map(s -> Objects.isNull(s.getId()) ? s.setId(UUID.randomUUID()) : s)
                   .flatMap(sessionRepository::save);
    }

I have prepared a repo, and that par is present, but commented here. Cannot understand, why it cannot be committed... but anyway, data can be successfully saved in database if I'm using DatabaseClient for that:

    @ResponseBody
    @PostMapping("/sessions")
    public Mono<Integer> save(@RequestBody Session session) {
        return Mono.just(session)
                   .filter(s -> Objects.nonNull(s.getName()))
                   .filter(s -> Objects.nonNull(s.getSpeakers()))
                   .map(s -> Objects.isNull(s.getId()) ? s.setId(UUID.randomUUID()) : s)
                   .flatMap(s -> client.execute("INSERT INTO sessions (id, name, speakers) VALUES ($1, $2, $3)")
                                       .bind("$1", s.getId())
                                       .bind("$2", s.getName())
                                       .bind("$3", s.getSpeakers())
                                       .fetch().rowsUpdated());
    }

full code located here

versions

  • spring-boot 2.2.0.RELEASE
  • spring-boot-starter-data-r2dbc: 0.1.0.BUILD-SNAPSHOT
  • r2dbc-postgresql: 0.8.0.RC2

Is it bug or I missed something?
Thanks!


Regards,
Maksim

R2DBC transactions use regular database transactions underneath.

or some reasons, data cannot be saved when I do it using ReactiveCrudRepository like so:

Care to elaborate on cannot? Do you see an error or what is the result of an action invoking save?

Hello @mp911de ,
No errors. Everything is seems like saved and I can receive response successfully, but in fact, db has no new record and next get query will respond just like nothing has been saved.

I also tried this:

    @Inject ReactiveTransactionManager rtm;

    /**
     * Also doesn't work...
     */
    @ResponseBody
    @Transactional
    @PostMapping("/sessions")
    public Mono<Void> save(@RequestBody Session session) {
        TransactionalOperator rtx = TransactionalOperator.create(rtm);
        return rtx.execute(status -> Mono.just(session)
                                         .filter(s -> Objects.nonNull(s.getName()))
                                         .filter(s -> Objects.nonNull(s.getSpeakers()))
                                         .map(s -> Objects.isNull(s.getId()) ? s.setId(UUID.randomUUID()) : s)
                                         .flatMap(sessionRepository::save)
                                         .then())
                  .then();
    }

and got "same success" result in response...
And only when I changed my post endpoint to this one:

    /**
     * Failed with: NoSuchBeanDefinitionException:
     * No qualifying bean of type 'org.springframework.transaction.PlatformTransactionManager' available
     */
    @ResponseBody
    @Transactional
    @PostMapping("/sessions")
    public Disposable save(@RequestBody Session session) {
        return Mono.just(session)
                   .filter(s -> Objects.nonNull(s.getName()))
                   .filter(s -> Objects.nonNull(s.getSpeakers()))
                   .map(s -> Objects.isNull(s.getId()) ? s.setId(UUID.randomUUID()) : s)
                   .flatMap(sessionRepository::save)
                   .subscribe(log::info);
    }

ie subscribing on result after save in a pipeline processing flow and returning disposable, I can get error related to not configured PlatformTransactionManager bean:

POST http://127.0.0.1:8087/sessions

HTTP/1.1 500 Internal Server Error
Content-Type: application/json
Content-Length: 241

{
  "timestamp": "2019-10-23T23:32:01.798+0000",
  "path": "/sessions",
  "status": 500,
  "error": "Internal Server Error",
  "message": "No qualifying bean of type 'org.springframework.transaction.PlatformTransactionManager' available",
  "requestId": "3b9cd0fe"
}

server logs:

2019-10-24 02:32:01.806 ERROR 84269 --- [ctor-http-nio-2] a.w.r.e.AbstractErrorWebExceptionHandler : [3b9cd0fe] 500 Server Error for HTTP POST "/sessions"

org.springframework.beans.factory.NoSuchBeanDefinitionException: No qualifying bean of type 'org.springframework.transaction.PlatformTransactionManager' available
	at org.springframework.beans.factory.support.DefaultListableBeanFactory.getBean(DefaultListableBeanFactory.java:351) ~[spring-beans-5.2.0.RELEASE.jar:5.2.0.RELEASE]
	Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Error has been observed at the following site(s):
	|_ checkpoint ⇢ org.springframework.boot.actuate.metrics.web.reactive.server.MetricsWebFilter [DefaultWebFilterChain]
	|_ checkpoint ⇢ HTTP POST "/sessions" [ExceptionHandlingWebHandler]
Stack trace:
		at org.springframework.beans.factory.support.DefaultListableBeanFactory.getBean(DefaultListableBeanFactory.java:351) ~[spring-beans-5.2.0.RELEASE.jar:5.2.0.RELEASE]
		at org.springframework.beans.factory.support.DefaultListableBeanFactory.getBean(DefaultListableBeanFactory.java:342) ~[spring-beans-5.2.0.RELEASE.jar:5.2.0.RELEASE]
		at org.springframework.transaction.interceptor.TransactionAspectSupport.determineTransactionManager(TransactionAspectSupport.java:468) ~[spring-tx-5.2.0.RELEASE.jar:5.2.0.RELEASE]
		at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:342) ~[spring-tx-5.2.0.RELEASE.jar:5.2.0.RELEASE]
		at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:99) ~[spring-tx-5.2.0.RELEASE.jar:5.2.0.RELEASE]
		at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) ~[spring-aop-5.2.0.RELEASE.jar:5.2.0.RELEASE]
		at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:747) ~[spring-aop-5.2.0.RELEASE.jar:5.2.0.RELEASE]
		at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:689) ~[spring-aop-5.2.0.RELEASE.jar:5.2.0.RELEASE]
		at com.github.daggerok.hero.rsocket.RSocketSessions$$EnhancerBySpringCGLIB$$978e4fe7.save(<generated>) ~[classes/:na]
		at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_222]
		at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_222]
		at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_222]
		at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_222]
		at org.springframework.web.reactive.result.method.InvocableHandlerMethod.lambda$invoke$0(InvocableHandlerMethod.java:146) ~[spring-webflux-5.2.0.RELEASE.jar:5.2.0.RELEASE]
		at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:118) ~[reactor-core-3.3.0.RELEASE.jar:3.3.0.RELEASE]
		at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1592) ~[reactor-core-3.3.0.RELEASE.jar:3.3.0.RELEASE]
		at reactor.core.publisher.MonoZip$ZipCoordinator.signal(MonoZip.java:247) ~[reactor-core-3.3.0.RELEASE.jar:3.3.0.RELEASE]
		at reactor.core.publisher.MonoZip$ZipInner.onNext(MonoZip.java:329) ~[reactor-core-3.3.0.RELEASE.jar:3.3.0.RELEASE]
		at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onNext(MonoPeekTerminal.java:173) ~[reactor-core-3.3.0.RELEASE.jar:3.3.0.RELEASE]
		at reactor.core.publisher.FluxDefaultIfEmpty$DefaultIfEmptySubscriber.onNext(FluxDefaultIfEmpty.java:92) ~[reactor-core-3.3.0.RELEASE.jar:3.3.0.RELEASE]
		at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:67) ~[reactor-core-3.3.0.RELEASE.jar:3.3.0.RELEASE]
		at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:73) ~[reactor-core-3.3.0.RELEASE.jar:3.3.0.RELEASE]
		at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:121) ~[reactor-core-3.3.0.RELEASE.jar:3.3.0.RELEASE]
		at reactor.core.publisher.FluxContextStart$ContextStartSubscriber.onNext(FluxContextStart.java:103) ~[reactor-core-3.3.0.RELEASE.jar:3.3.0.RELEASE]
		at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onNext(FluxMapFuseable.java:287) ~[reactor-core-3.3.0.RELEASE.jar:3.3.0.RELEASE]
		at reactor.core.publisher.FluxFilterFuseable$FilterFuseableConditionalSubscriber.onNext(FluxFilterFuseable.java:330) ~[reactor-core-3.3.0.RELEASE.jar:3.3.0.RELEASE]
		at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1592) ~[reactor-core-3.3.0.RELEASE.jar:3.3.0.RELEASE]
		at reactor.core.publisher.MonoCollectList$MonoCollectListSubscriber.onComplete(MonoCollectList.java:121) ~[reactor-core-3.3.0.RELEASE.jar:3.3.0.RELEASE]
		at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:136) ~[reactor-core-3.3.0.RELEASE.jar:3.3.0.RELEASE]
		at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:252) ~[reactor-core-3.3.0.RELEASE.jar:3.3.0.RELEASE]
		at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:136) ~[reactor-core-3.3.0.RELEASE.jar:3.3.0.RELEASE]
		at reactor.netty.channel.FluxReceive.terminateReceiver(FluxReceive.java:397) ~[reactor-netty-0.9.0.RELEASE.jar:0.9.0.RELEASE]
		at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:197) ~[reactor-netty-0.9.0.RELEASE.jar:0.9.0.RELEASE]
		at reactor.netty.channel.FluxReceive.onInboundComplete(FluxReceive.java:345) ~[reactor-netty-0.9.0.RELEASE.jar:0.9.0.RELEASE]
		at reactor.netty.channel.ChannelOperations.onInboundComplete(ChannelOperations.java:363) ~[reactor-netty-0.9.0.RELEASE.jar:0.9.0.RELEASE]
		at reactor.netty.http.server.HttpServerOperations.onInboundNext(HttpServerOperations.java:453) ~[reactor-netty-0.9.0.RELEASE.jar:0.9.0.RELEASE]
		at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:91) ~[reactor-netty-0.9.0.RELEASE.jar:0.9.0.RELEASE]
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374) ~[netty-transport-4.1.42.Final.jar:4.1.42.Final]
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360) ~[netty-transport-4.1.42.Final.jar:4.1.42.Final]
		at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352) ~[netty-transport-4.1.42.Final.jar:4.1.42.Final]
		at reactor.netty.http.server.HttpTrafficHandler.channelRead(HttpTrafficHandler.java:208) ~[reactor-netty-0.9.0.RELEASE.jar:0.9.0.RELEASE]
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374) ~[netty-transport-4.1.42.Final.jar:4.1.42.Final]
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360) ~[netty-transport-4.1.42.Final.jar:4.1.42.Final]
		at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352) ~[netty-transport-4.1.42.Final.jar:4.1.42.Final]
		at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:438) ~[netty-transport-4.1.42.Final.jar:4.1.42.Final]
		at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:328) ~[netty-codec-4.1.42.Final.jar:4.1.42.Final]
		at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:302) ~[netty-codec-4.1.42.Final.jar:4.1.42.Final]
		at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:253) ~[netty-transport-4.1.42.Final.jar:4.1.42.Final]
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374) ~[netty-transport-4.1.42.Final.jar:4.1.42.Final]
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360) ~[netty-transport-4.1.42.Final.jar:4.1.42.Final]
		at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352) ~[netty-transport-4.1.42.Final.jar:4.1.42.Final]
		at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1422) ~[netty-transport-4.1.42.Final.jar:4.1.42.Final]
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374) ~[netty-transport-4.1.42.Final.jar:4.1.42.Final]
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360) ~[netty-transport-4.1.42.Final.jar:4.1.42.Final]
		at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:931) ~[netty-transport-4.1.42.Final.jar:4.1.42.Final]
		at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163) ~[netty-transport-4.1.42.Final.jar:4.1.42.Final]
		at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:700) ~[netty-transport-4.1.42.Final.jar:4.1.42.Final]
		at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:635) ~[netty-transport-4.1.42.Final.jar:4.1.42.Final]
		at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:552) ~[netty-transport-4.1.42.Final.jar:4.1.42.Final]
		at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:514) ~[netty-transport-4.1.42.Final.jar:4.1.42.Final]
		at io.netty.util.concurrent.SingleThreadEventExecutor$6.run(SingleThreadEventExecutor.java:1044) ~[netty-common-4.1.42.Final.jar:4.1.42.Final]
		at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.42.Final.jar:4.1.42.Final]
		at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.42.Final.jar:4.1.42.Final]
		at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_222]

don't know if it helps to understand what is missing...

Thanks

Thanks a lot. Calling R2dbcRepository.save(…) with an object that has an Id assigned issues an update. Without further hints, we don't have a way to know whether the object is new or whether it already exists. Let your Session object implement Persistable.

Reactive transactions work only on methods returning Publisher types (Mono, Flux, Publisher, Flowable). Disposable isn't qualified as reactive type as there's no way how to associate it with a Reactor Context therefore the transaction aspect doesn't use the reactive transaction approach and falls back to PlatformTransactionManager.

Hello @mp911de
Thanks for help!
I will refactor my Session Pojo to implement Persistable<UUID> and change edpoint to return normal publisher and let you know if it will help me solve a problem


Regards

Hello again!

Your recommendation does helped me to save my Session POJO, awesome!

    @ResponseBody
    @PostMapping("/sessions")
    public Mono<Session> save(@RequestBody Session session) {
        return Mono.just(session)
                   .filter(s -> Objects.nonNull(s.getName()))
                   .filter(s -> Objects.nonNull(s.getSpeakers()))
                   .flatMap(sessionRepository::save);
    }

Anyway, one problem is still left for me:

executeMany; SQL [INSERT INTO sessions (name, speakers) VALUES ($1, $2)]; null value in
column \"id\" violates not-null constraint; nested exception is
io.r2dbc.postgresql.ExceptionFactory$PostgresqlDataIntegrityViolationException: [23502]
null value in column \"id\" violates not-null constraint

This is happening if my id UUID doesn't provided (because otherwise it wont be result to true in isNew method anymore..

At the moment I can see two choices here:

  1. I could tweak a little bit isNew method in my Session POJO, like so:
@Data
@Table("sessions")
@NoArgsConstructor
public class Session implements Persistable<UUID> {

    @Id
    private UUID id;
    private String name, speakers;

    @Override
    public boolean isNew() {
        boolean result = Objects.isNull(id);
        this.id = result ? UUID.randomUUID() : this.id;
        return result;
    }
}
  1. Or I MUST relay on postgres UUID auto generation functionality in my DDL scripts, like so:
CREATE EXTENSION IF NOT EXISTS "uuid-ossp"; -- required by uuid_generate_v1

DROP TABLE IF EXISTS sessions;
CREATE TABLE sessions (
    -- id UUID PRIMARY KEY,
    id UUID NOT NULL DEFAULT uuid_generate_v1(),
    name VARCHAR NOT NULL,
    speakers VARCHAR NOT NULL,
    CONSTRAINT sessions_pk PRIMARY KEY ( id )
);

@mp911de, please, let me know if you have better workaround in your mind, or maybe I can implement some UUID generation mechanism using spring functionality?

Thank you!


Regards,
Maksim

There's nothing wrong in assigning an Id if the database does not generate it for you. If you're using repositories, then the implementation with Persistable is doing the job to indicate whether the object is new or whether the object should already exist.

In public Mono<Session> save(@RequestBody Session session) { you know, that you want to insert the object so you might want to use a @Transient property to keep a flag whether your object is new.

Closing this ticket as there is no general issue with Spring Data R2DBC.

@mp911de, thanks for help

Hello again, @mp911de
I cannot find any related to Persistable interface information in documentation... Can you point me if its located somewhere? Or if its missing maybe I could implement some documentation PR?
Recently oher my colleagues was facing with exact same problem, so I think we must improve here

I had this exact same issue! Thank you so much for sharing @daggerok and @mp911de ... I was wondering if one of you can explain to me why we do .flatMap(sessionRepository::save); and you can't just do a sessionRepository.save(session); inside a map? Why is it flat map? I just want to be able to fully explain my code.

@JesseRatt

You should use flatMap because sessionRepository.save() returns Mono<Session>, not Session

So to pipe correctly save result to next map operation as Session, not Mono<Session> you should flatMap it

Hello again!

Your recommendation does helped me to save my Session POJO, awesome!

    @ResponseBody
    @PostMapping("/sessions")
    public Mono<Session> save(@RequestBody Session session) {
        return Mono.just(session)
                   .filter(s -> Objects.nonNull(s.getName()))
                   .filter(s -> Objects.nonNull(s.getSpeakers()))
                   .flatMap(sessionRepository::save);
    }

Anyway, one problem is still left for me:

executeMany; SQL [INSERT INTO sessions (name, speakers) VALUES ($1, $2)]; null value in
column \"id\" violates not-null constraint; nested exception is
io.r2dbc.postgresql.ExceptionFactory$PostgresqlDataIntegrityViolationException: [23502]
null value in column \"id\" violates not-null constraint

This is happening if my id UUID doesn't provided (because otherwise it wont be result to true in isNew method anymore..

At the moment I can see two choices here:

1. I could tweak a little bit `isNew` method in my Session POJO, like so:
@Data
@Table("sessions")
@NoArgsConstructor
public class Session implements Persistable<UUID> {

    @Id
    private UUID id;
    private String name, speakers;

    @Override
    public boolean isNew() {
        boolean result = Objects.isNull(id);
        this.id = result ? UUID.randomUUID() : this.id;
        return result;
    }
}
2. Or I MUST relay on postgres UUID auto generation functionality in my DDL scripts, like so:
CREATE EXTENSION IF NOT EXISTS "uuid-ossp"; -- required by uuid_generate_v1

DROP TABLE IF EXISTS sessions;
CREATE TABLE sessions (
    -- id UUID PRIMARY KEY,
    id UUID NOT NULL DEFAULT uuid_generate_v1(),
    name VARCHAR NOT NULL,
    speakers VARCHAR NOT NULL,
    CONSTRAINT sessions_pk PRIMARY KEY ( id )
);

@mp911de, please, let me know if you have better workaround in your mind, or maybe I can implement some UUID generation mechanism using spring functionality?

Thank you!

Regards, Maksim

Fantastic, this worked. Thank you very much.

@Data
@Table("sessions")
@NoArgsConstructor
public class Session implements Persistable<UUID> {

    @Id
    private UUID id;
    private String name, speakers;

    @Override
    public boolean isNew() {
        boolean result = Objects.isNull(id);
        this.id = result ? UUID.randomUUID() : this.id;
        return result;
    }
}

How do I handle the case where I have a composite primary key, let's say, a combination of id and name?

@xbdhshshs Please follow up on Stack Overflow. As mentioned in the guidelines for contributing, we prefer to use GitHub issues only for bugs and enhancements.