atomikos/transactions-essentials

Shard switching using Spring's AbstractRoutingDataSource is not working

ra4java opened this issue · 0 comments

Describe the bug
We have 2 shard databases (postgres instances) with same table in both shards (CustomerDetails table with columns id, name, location )
if customerId hash is 1 then it saves customer details in shard1 (postgres instance 1)
if customerId hash is 2 then it saves customer details in shard2 (postgres instance 2)

We have connection pool with datasources for both shards.
For a getAllCustomers request, we first set shard1 as the datasource in the ThreadLocal context to fetch the data,
then we change the datasource to shard2 in the ThreadLocal context to fetch the data.
Spring uses AbstractRoutingDataSource's determineCurrentLookupKey to read the shardId from the ThreadLocal context and obtain the appropriate connection for that shard.

The setup is working fine when we run from Junit but the same setup is not working when we run the spring boot application with default tomcat server.
The issue in the Spring Boot application is as follows: when we begin a transaction, it first connects to shard1 and fetches the data properly. In the next step we switch the datasource to shard2 in the ThreadLocal context, but the transaction already has a connection established with shard1 and reuses that connection instead of getting a new connection for shard2.
As a result, only data from shard1 is returned and the data from shard2 cannot be fetched. The same happens for PUT request transactions.
Consequently, the transaction always rolls back and never commits.
In our case we don't have separate entity manager for each shard we have only one EntityManager as the tables in both the shards are same.

This use case currently works fine with Bitronix. It looks like a basic use case; can it be addressed in Atomikos?
Using TransactionsEssentials version: com.atomikos:transactions-spring-boot3-starter:6.0.0

Additional context

@configuration
@dependsOn("transactionManager")
@EnableJpaRepositories(entityManagerFactoryRef = "entityManagerFactory",
basePackages = {"com.example.demo.jpa"})
public class DataSourceConfig {

@Bean("DBProperties")
public Properties dbShardProperties() throws IOException {
	final Resource resource = new FileSystemResource("/opt/database.properties");
	return PropertiesLoaderUtils.loadProperties(resource);
}

	@Bean("dbShardDataSources")
public Map<Object, Object> dbShardDataSources() throws IOException, AtomikosSQLException {
	Properties props = dbShardProperties();
	int shardscount = Integer.parseInt(props.getProperty("shards.count", "5").trim());
	Map<Object, Object> dataSources = new HashMap<>();

	for (int i = 1; i <= shardscount; i++) {
		String url = props.getProperty(String.format("shard.%d.jdbc.url", i)).trim();
		String username = props.getProperty(String.format("shard.%d.username", i)).trim();
		String password = props.getProperty(String.format("shard.%d.password", i)).trim();

		AtomikosDataSourceBean dataSource = new AtomikosDataSourceBean();
		dataSource.setXaDataSourceClassName(PGXADataSource.class.getName());
		dataSource.setUniqueResourceName("jdbc/shard" + i);
		dataSource.setMaxIdleTime("3600"));
		Properties datasourceProps = new Properties();
		datasourceProps.put(PASSWORD, password);
		datasourceProps.setProperty(USER, username);
		datasourceProps.setProperty("url", url);
		dataSource.setXaProperties(datasourceProps);
		dataSource.setMaxPoolSize(15);
		dataSource.setTestQuery(props.getProperty(CONNECTION_TEST_QUERY,
				SELECT_1));
		dataSource.init();
		dataSources.put(i, dataSource);
	}

	return dataSources;
}

@Bean
@Primary
public ShardAwareDataSource dataSource(@Qualifier("dbShardDataSources") Map<Object, Object> dataSources) {
	ShardAwareDataSource resolver = new ShardAwareDataSource();
	resolver.setTargetDataSources(dataSources);
	resolver.setLenientFallback(false);
	return resolver;
}

@Primary
@Bean(name = "entityManagerFactory")
public LocalContainerEntityManagerFactoryBean entityManagerFactory(DataSource dataSource) {
    System.out.println("DSConfig: " + AtomikosJtaPlatform.isTransactionManagerSet());
    LocalContainerEntityManagerFactoryBean em  = new LocalContainerEntityManagerFactoryBean();
    em.setDataSource(dataSource);
    em.setPackagesToScan("com.example.demo.jpa");
    em.setPersistenceUnitName("shardDB");
    HibernateJpaVendorAdapter vendorAdapter = new HibernateJpaVendorAdapter();
    vendorAdapter.setDatabase(Database.POSTGRESQL);
    vendorAdapter.setDatabasePlatform(org.hibernate.dialect.PostgreSQLDialect.class.getName());
    em.setJpaVendorAdapter(vendorAdapter);
    em.setJpaProperties(CBIUtils.additionalProperties());

    return em;
}

}

/*
Class that is used to switch the shardId in the ThreadLocal context
*/
public class ShardAwareDataSource extends AbstractRoutingDataSource {
@OverRide
protected Object determineCurrentLookupKey() {
Object obj = TenantContext.getShardId();
if(obj == null)
obj = 1;
return obj;
}

}