spring-projects/spring-data-examples

The transaction does not seem to take effect in this case

funky-eyes opened this issue · 13 comments

public interface DistributedLockRepository extends ReactiveCrudRepository<DistributedLock, String> {

    @Query("SELECT * FROM distributed_lock WHERE lock_key = :#{[0]} for update ")
    Mono<DistributedLock> findByLockKey(String lockKey);

}
    @Override
    public boolean acquireLock(DistributedLockDO distributedLockDO) {
        try {
            return Boolean.TRUE.equals(distributedLockRepository.findByLockKey(distributedLockDO.getLockKey())
                .publishOn(Schedulers.boundedElastic()).map(distributedLock -> {
                    if (distributedLock != null && StringUtils.isNotBlank(distributedLock.getLockValue())
                        && !StringUtils.equals(distributedLock.getLockValue(), distributedLockDO.getLockValue())
                        && System.currentTimeMillis() < distributedLock.getExpireTime()) {
                        return false;
                    }
                    distributedLockDO.setExpireTime(distributedLockDO.getExpireTime() + System.currentTimeMillis());
                    if (distributedLock != null) {
                        if (!StringUtils.equals(distributedLock.getLockValue(), distributedLockDO.getLockValue())) {
                            distributedLock.setLockValue(distributedLockDO.getLockValue());
                        }
                        distributedLock.setNewLock(false);
                        return distributedLockRepository.save(distributedLock).block() != null;
                    }
                    distributedLock = new DistributedLock();
                    distributedLockDOToEntity.copy(distributedLockDO, distributedLock, null);
                    return distributedLockRepository.save(distributedLock).block() != null;
                }).as(operator::transactional).block());
        } catch (R2dbcDataIntegrityViolationException e) {
            // being scrambled by other threads to succeed
            return false;
        }
    }
Caused by: io.r2dbc.spi.R2dbcTimeoutException: Lock wait timeout exceeded; try restarting transaction
	at dev.miku.r2dbc.mysql.ExceptionFactory.createException(ExceptionFactory.java:69)
	Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Error has been observed at the following site(s):
	*__checkpoint ⇢ SQL "UPDATE distributed_lock SET lock_value = ?, expire = ? WHERE distributed_lock.lock_key = ?" [DatabaseClient]
Original Stack Trace:
		at dev.miku.r2dbc.mysql.ExceptionFactory.createException(ExceptionFactory.java:69)
		at dev.miku.r2dbc.mysql.TextQueryHandler.accept(QueryFlow.java:317)
		at dev.miku.r2dbc.mysql.TextQueryHandler.accept(QueryFlow.java:292)
		at reactor.core.publisher.FluxHandleFuseable$HandleFuseableSubscriber.onNext(FluxHandleFuseable.java:176)
		at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onNext(FluxContextWrite.java:107)
		at dev.miku.r2dbc.mysql.util.DiscardOnCancelSubscriber.onNext(DiscardOnCancelSubscriber.java:70)
		at reactor.core.publisher.FluxPeekFuseable$PeekConditionalSubscriber.onNext(FluxPeekFuseable.java:854)
		at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onNext(MonoFlatMapMany.java:250)
		at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:200)
		at reactor.core.publisher.FluxHandle$HandleSubscriber.onNext(FluxHandle.java:126)
		at reactor.core.publisher.FluxPeekFuseable$PeekConditionalSubscriber.onNext(FluxPeekFuseable.java:854)
		at reactor.core.publisher.EmitterProcessor.drain(EmitterProcessor.java:491)
		at reactor.core.publisher.EmitterProcessor.tryEmitNext(EmitterProcessor.java:299)
		at reactor.core.publisher.InternalManySink.emitNext(InternalManySink.java:27)
		at reactor.core.publisher.EmitterProcessor.onNext(EmitterProcessor.java:265)
		at dev.miku.r2dbc.mysql.client.ReactorNettyClient$ResponseSink.next(ReactorNettyClient.java:340)
		at dev.miku.r2dbc.mysql.client.ReactorNettyClient.lambda$new$0(ReactorNettyClient.java:103)
		at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:185)
		at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:279)
		at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:388)
		at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:404)
		at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:93)
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
		at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
		at dev.miku.r2dbc.mysql.client.MessageDuplexCodec.handleDecoded(MessageDuplexCodec.java:187)
		at dev.miku.r2dbc.mysql.client.MessageDuplexCodec.channelRead(MessageDuplexCodec.java:95)
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
		at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
		at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:327)
		at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:299)
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
		at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
		at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1372)
		at io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1235)
		at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1284)
		at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:510)
		at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:449)
		at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:279)
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
		at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
		at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
		at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
		at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
		at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722)
		at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658)
		at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584)
		at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496)
		at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:995)
		at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
		at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
		at java.base/java.lang.Thread.run(Thread.java:829)

When using the above code, the transaction does not take effect: querying the row first and then updating it fails because the UPDATE cannot obtain an exclusive (X) lock on the database row.

UPDATE distributed_lock SET lock_value = ?, expire = ? WHERE distributed_lock.lock_key = ? doesn't seem to use database-specific locks. Lock wait timeout exceeded; try restarting transaction indicates that another transaction is ongoing and that the current connection didn't get a lock to update the row.

Have you checked what transaction is holding the lock?
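One way to check, as a rough sketch (the InnodbTrxDiagnostics class below is made up for illustration and only assumes MySQL plus the org.springframework.r2dbc.core.DatabaseClient already configured in this thread): information_schema.innodb_trx lists the open InnoDB transactions together with the statement each one is currently running, which usually points at the lock holder.

import java.util.Map;

import org.springframework.r2dbc.core.DatabaseClient;

import reactor.core.publisher.Flux;

public class InnodbTrxDiagnostics {

    private final DatabaseClient databaseClient;

    public InnodbTrxDiagnostics(DatabaseClient databaseClient) {
        this.databaseClient = databaseClient;
    }

    /**
     * Emits one row per open InnoDB transaction: its id, state, start time, thread id and current statement.
     */
    public Flux<Map<String, Object>> openTransactions() {
        return databaseClient
            .sql("SELECT trx_id, trx_state, trx_started, trx_mysql_thread_id, trx_query "
                + "FROM information_schema.innodb_trx")
            .fetch()
            .all();
    }
}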

The lock is held by distributedLockRepository.findByLockKey(distributedLockDO.getLockKey()), which means its transaction does not stay open through to the UPDATE statement, or it is not the same transaction the UPDATE runs in.

The SELECT ... FOR UPDATE and the UPDATE are not joined in one transaction, which causes the UPDATE to time out while waiting for the exclusive (X) lock on the row. How do I keep the SELECT ... FOR UPDATE and the UPDATE in the same transaction? @mp911de

Make sure that you have a single connection factory to obtain connections from. Any routing might cause interferences.
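As a minimal wiring sketch of that point (the class and bean names below are illustrative): the R2dbcTransactionManager, the TransactionalOperator and the repositories must all be created from the same ConnectionFactory bean, otherwise the connection bound to the transaction is not the one the repository statements run on.

import io.r2dbc.spi.ConnectionFactory;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.r2dbc.connection.R2dbcTransactionManager;
import org.springframework.transaction.ReactiveTransactionManager;
import org.springframework.transaction.reactive.TransactionalOperator;

@Configuration
public class SingleConnectionFactoryTxConfig {

    // Transaction manager built from the one ConnectionFactory the repositories also use.
    @Bean
    public ReactiveTransactionManager reactiveTransactionManager(ConnectionFactory connectionFactory) {
        return new R2dbcTransactionManager(connectionFactory);
    }

    // TransactionalOperator bound to that same transaction manager.
    @Bean
    public TransactionalOperator transactionalOperator(ReactiveTransactionManager reactiveTransactionManager) {
        return TransactionalOperator.create(reactiveTransactionManager);
    }
}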

You can try this example; the transaction does not take effect.

@ConditionalOnBean(DatabaseClient.class)
@Component
public class R2dbcDistributedLockerDAO implements DistributedLocker {
    private static final Logger LOGGER = LoggerFactory.getLogger(R2dbcDistributedLockerDAO.class);

    private final BeanCopier distributedLockDOToEntity = BeanCopier.create(DistributedLockDO.class, DistributedLock.class, false);

    @Resource
    private DistributedLockRepository distributedLockRepository;

    @Resource
    private TransactionalOperator operator;

    /**
     * Instantiates a new R2dbc distributed locker DAO.
     */
    public R2dbcDistributedLockerDAO() {
    }

    @Override
    public boolean acquireLock(DistributedLockDO distributedLockDO) {
        try {
            return Boolean.TRUE.equals(distributedLockRepository.findByLockKey(distributedLockDO.getLockKey())
                .publishOn(Schedulers.boundedElastic()).map(distributedLock -> {
                    if (distributedLock != null && StringUtils.isNotBlank(distributedLock.getLockValue())
                        && !StringUtils.equals(distributedLock.getLockValue(), distributedLockDO.getLockValue())
                        && System.currentTimeMillis() < distributedLock.getExpireTime()) {
                        return false;
                    }
                    distributedLockDO.setExpireTime(distributedLockDO.getExpireTime() + System.currentTimeMillis());
                    if (distributedLock != null) {
                        if (!StringUtils.equals(distributedLock.getLockValue(), distributedLockDO.getLockValue())) {
                            distributedLock.setLockValue(distributedLockDO.getLockValue());
                        }
                        distributedLock.setNewLock(false);
                        return distributedLockRepository.save(distributedLock).block() != null;
                    }
                    distributedLock = new DistributedLock();
                    distributedLockDOToEntity.copy(distributedLockDO, distributedLock, null);
                    return distributedLockRepository.save(distributedLock).block() != null;
                }).as(operator::transactional).block());
        } catch (R2dbcDataIntegrityViolationException e) {
            // being scrambled by other threads to succeed
            return false;
        }
    }
}
@SpringBootApplication( exclude = R2dbcAutoConfiguration.class)
public class ServerApplication {
    public static void main(String[] args) throws IOException {
        // run the spring-boot application
        SpringApplication.run(ServerApplication.class, args);
    }
}
@Configuration
@Import(R2dbcDataAutoConfiguration.class)
public class R2dbcAutoConfiguration {
}
@Configuration
@EnableConfigurationProperties(R2dbcProperties.class)
@AutoConfigureBefore(R2dbcAutoConfiguration.class)
public class R2dbcConfiguration extends AbstractDataSourceProvider {

    @Bean
    public DatabaseClient databaseClient(ConnectionFactory connectionFactory) {
        R2dbcDialect dialect = DialectResolver.getDialect(connectionFactory);
        return DatabaseClient.builder().connectionFactory(connectionFactory)
            .bindMarkers(dialect.getBindMarkersFactory()).build();
    }

    @Bean
    public ReactiveTransactionManager reactiveTransactionManager(ConnectionFactory connectionFactory) {
        return new R2dbcTransactionManager(connectionFactory);
    }

    @Bean
    public R2dbcEntityTemplate r2dbcEntityTemplate(DatabaseClient databaseClient) {
        R2dbcDialect dialect = DialectResolver.getDialect(databaseClient.getConnectionFactory());
        return new R2dbcEntityTemplate(databaseClient, dialect);
    }

    @Bean
    public ConnectionPool connectionFactory(R2dbcProperties r2dbcProperties) {
        String url = getUrl();
        ConnectionInfo connectionInfo = URLParser.parser(url);
        String[] dbPeer = connectionInfo.getDbPeer().split(":");
        String host = dbPeer[0];
        int port = Integer.parseInt(dbPeer[1]);
        ConnectionFactoryOptions.Builder options = ConnectionFactoryOptions.builder()
            .option(DRIVER, getDBType().name().toLowerCase()).option(HOST, host).option(USER, getUser())
            .option(PORT, port).option(PASSWORD, getPassword()).option(DATABASE, connectionInfo.getDbInstance())
            .option(CONNECT_TIMEOUT, Duration.ofMillis(getMaxWait()));
        String paramUrl = url.substring(url.indexOf("?") + 1);
        if (StringUtils.isNotBlank(paramUrl)) {
            String useSSL = "useSSL";
            if (paramUrl.contains(useSSL)) {
                String[] params = paramUrl.split("&");
                for (String param : params) {
                    if (param.contains(useSSL)) {
                        options.option(SSL, Boolean.parseBoolean(param.split("=")[1]));
                        break;
                    }
                }
            }
        }
        ConnectionFactory connectionFactory = ConnectionFactories.get(options.build());
        R2dbcProperties.Pool pool = r2dbcProperties.getPool();
        PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
        ConnectionPoolConfiguration.Builder builder = ConnectionPoolConfiguration.builder(connectionFactory);
        map.from(Duration.ofMillis(getMaxWait())).to(builder::maxIdleTime);
        map.from(pool.getMaxAcquireTime()).to(builder::maxAcquireTime);
        map.from(pool.getMaxCreateConnectionTime()).to(builder::maxCreateConnectionTime);
        map.from(getMinConn()).to(builder::initialSize);
        map.from(getMaxConn()).to(builder::maxSize);
        map.from(pool.getValidationQuery()).whenHasText().to(builder::validationQuery);
        map.from(pool.getValidationDepth()).to(builder::validationDepth);
        return new ConnectionPool(builder.build());
    }

    @Bean
    public R2dbcMappingContext r2dbcMappingContext(ObjectProvider<NamingStrategy> namingStrategy,
        R2dbcCustomConversions r2dbcCustomConversions) {
        R2dbcMappingContext relationalMappingContext =
            new R2dbcMappingContext(namingStrategy.getIfAvailable(() -> NamingStrategy.INSTANCE));
        relationalMappingContext.setSimpleTypeHolder(r2dbcCustomConversions.getSimpleTypeHolder());
        return relationalMappingContext;
    }

    @Bean
    public MappingR2dbcConverter r2dbcConverter(R2dbcMappingContext mappingContext,
        R2dbcCustomConversions r2dbcCustomConversions) {
        return new MappingR2dbcConverter(mappingContext, r2dbcCustomConversions);
    }

    @Bean
    public R2dbcCustomConversions r2dbcCustomConversions(ConnectionFactory connectionFactory) {
        R2dbcDialect dialect = DialectResolver.getDialect(connectionFactory);
        List<Object> converters = new ArrayList<>(dialect.getConverters());
        converters.addAll(R2dbcCustomConversions.STORE_CONVERTERS);
        return new R2dbcCustomConversions(
            CustomConversions.StoreConversions.of(dialect.getSimpleTypeHolder(), converters), Collections.emptyList());
    }
}

apache/incubator-seata#4926

I changed the code to the following and the transaction took effect. I will continue to investigate and learn more tomorrow. Thanks for your help @mp911de

    @Transactional
    public Mono<Boolean> acquireLock(DistributedLockDO distributedLockDO) {
        try {
            return Mono.from(connectionFactory.create()).flatMap(connection -> Mono.from(connection.beginTransaction())
                .then(distributedLockRepository.findByLockKey(distributedLockDO.getLockKey())).map(distributedLock -> {
                    if (distributedLock != null && StringUtils.isNotBlank(distributedLock.getLockValue())
                        && !StringUtils.equals(distributedLock.getLockValue(), distributedLockDO.getLockValue())
                        && System.currentTimeMillis() < distributedLock.getExpireTime()) {
                        return Mono.just(false);
                    }
                    distributedLockDO.setExpireTime(distributedLockDO.getExpireTime() + System.currentTimeMillis());
                    if (distributedLock != null) {
                        if (!StringUtils.equals(distributedLock.getLockValue(), distributedLockDO.getLockValue())) {
                            distributedLock.setLockValue(distributedLockDO.getLockValue());
                        }
                        distributedLock.setNewLock(false);
                        return distributedLockRepository.save(distributedLock).then(Mono.just(true));
                    }
                    distributedLock = new DistributedLock();
                    distributedLockDOToEntity.copy(distributedLockDO, distributedLock, null);
                    return distributedLockRepository.save(distributedLock).then(Mono.just(true));
                }).flatMap(Mono::from)
                .delayUntil(bool -> bool ? connection.commitTransaction() : connection.rollbackTransaction())
                .doFinally(c -> connection.close()));
        } catch (R2dbcDataIntegrityViolationException e) {
            // being scrambled by other threads to succeed
            return Mono.just(false);
        }
    }

Changing the above code to use TransactionalOperator makes the transaction stop taking effect:

            return Boolean.TRUE
                .equals(distributedLockRepository.findByLockKey(distributedLockDO.getLockKey()).map(distributedLock -> {
                    if (distributedLock != null && StringUtils.isNotBlank(distributedLock.getLockValue())
                        && !StringUtils.equals(distributedLock.getLockValue(), distributedLockDO.getLockValue())
                        && System.currentTimeMillis() < distributedLock.getExpireTime()) {
                        return Mono.just(false);
                    }
                    distributedLockDO.setExpireTime(distributedLockDO.getExpireTime() + System.currentTimeMillis());
                    if (distributedLock != null) {
                        if (!StringUtils.equals(distributedLock.getLockValue(), distributedLockDO.getLockValue())) {
                            distributedLock.setLockValue(distributedLockDO.getLockValue());
                        }
                        distributedLock.setNewLock(false);
                        return distributedLockRepository.save(distributedLock).then(Mono.just(true));
                    }
                    distributedLock = new DistributedLock();
                    distributedLockDOToEntity.copy(distributedLockDO, distributedLock, null);
                    return distributedLockRepository.save(distributedLock).then(Mono.just(true));
                }).flatMap(Mono::from).as(operator::transactional).block());
 Lock wait timeout exceeded; try restarting transaction
	at dev.miku.r2dbc.mysql.ExceptionFactory.createException(ExceptionFactory.java:69)
	Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Error has been observed at the following site(s):
	*__checkpoint ⇢ SQL "UPDATE distributed_lock SET lock_value = ?, expire = ? WHERE distributed_lock.lock_key = ?" [DatabaseClient]
Original Stack Trace:
		at dev.miku.r2dbc.mysql.ExceptionFactory.createException(ExceptionFactory.java:69)
		at dev.miku.r2dbc.mysql.TextQueryHandler.accept(QueryFlow.java:317)
		at dev.miku.r2dbc.mysql.TextQueryHandler.accept(QueryFlow.java:292)
		at reactor.core.publisher.FluxHandleFuseable$HandleFuseableSubscriber.onNext(FluxHandleFuseable.java:176)
		at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onNext(FluxContextWrite.java:107)

The transactions in both examples above still do not take effect; the only difference is that they no longer output 'Lock wait timeout exceeded'.
Can you give me an example of SELECT ... FOR UPDATE combined with an update/save?
I need the FOR UPDATE to hold the exclusive (X) lock on the database row so that my subsequent update is correct.
@mp911de

Can anyone tell me what to do for this application scenario?

help

return distributedLockRepository.save(distributedLock).block() does not participate within a transaction because there's no context propagation. I also wonder why you use R2DBC if your calling code is blocking. Either rewrite everything to return Mono/Flux or use JDBC. Never call .block as that is the source of your transaction context issues.
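Put differently, as a sketch (ReactiveDistributedLocker is a made-up name for illustration): the locker contract itself should return a reactive type, so the caller subscribes and the transaction context can propagate through the whole pipeline instead of being cut off by block():

import reactor.core.publisher.Mono;

public interface ReactiveDistributedLocker {

    // Returns the outcome reactively instead of blocking for a boolean; the caller composes and subscribes.
    Mono<Boolean> acquireLock(DistributedLockDO distributedLockDO);
}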

Within a transaction there is inherently the possibility of relying on earlier query results, so how can you guarantee the data is not read dirty if the query is not in the same transaction as the write? Can you tell me how to have both the query and the insert in a single transaction with R2DBC?
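A minimal non-blocking sketch of that, reusing the repository, TransactionalOperator and BeanCopier from earlier in this thread and assuming the DistributedLock entity exposes the corresponding setters (the expire-time handling is simplified for illustration): the SELECT ... FOR UPDATE and the subsequent save/insert are composed into one pipeline, wrapped once with operator::transactional, and nothing calls block(), so both statements run on the same transactional connection.

    public Mono<Boolean> acquireLock(DistributedLockDO distributedLockDO) {
        long expireAt = distributedLockDO.getExpireTime() + System.currentTimeMillis();

        Mono<Boolean> acquire = distributedLockRepository.findByLockKey(distributedLockDO.getLockKey())
            // Row exists and the FOR UPDATE lock is now held by this transaction.
            .flatMap(existing -> {
                if (StringUtils.isNotBlank(existing.getLockValue())
                    && !StringUtils.equals(existing.getLockValue(), distributedLockDO.getLockValue())
                    && System.currentTimeMillis() < existing.getExpireTime()) {
                    // Someone else still holds an unexpired lock.
                    return Mono.just(false);
                }
                existing.setLockValue(distributedLockDO.getLockValue());
                existing.setExpireTime(expireAt);
                existing.setNewLock(false);
                return distributedLockRepository.save(existing).thenReturn(true);
            })
            // No row yet: insert a new lock record inside the same transaction.
            .switchIfEmpty(Mono.defer(() -> {
                DistributedLock created = new DistributedLock();
                distributedLockDOToEntity.copy(distributedLockDO, created, null);
                created.setExpireTime(expireAt);
                return distributedLockRepository.save(created).thenReturn(true);
            }));

        return acquire
            // One transaction around both the SELECT ... FOR UPDATE and the save/insert.
            .as(operator::transactional)
            // A concurrent insert beat us to the row; report the lock as not acquired.
            .onErrorReturn(R2dbcDataIntegrityViolationException.class, false);
    }

The caller then composes the returned Mono rather than blocking inside the pipeline.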