Activiti工作流及同Spring boot生态结合的预研Demo。
https://medium.com/@igorkosandyak/spring-boot-with-hazelcast-b04d13927745
https://docs.hazelcast.org/docs/latest/manual/html-single/
https://www.baeldung.com/java-hazelcast
使用Spring的项目初始化工具(https://start.spring.io/)生成初始项目模板。
主要选择了: Spring Web、JPA、Cache、H2。
详细可以参照HELP.md。
<dependency> <!-- Hazelcat核心包依赖 -->
<groupId>com.hazelcast</groupId>
<artifactId>hazelcast</artifactId>
</dependency>
<dependency> <!-- Hazelcat客户端包依赖 -->
<groupId>com.hazelcast</groupId>
<artifactId>hazelcast-client</artifactId>
</dependency>
<dependency> <!-- Hazelcat同Spring集成包依赖 -->
<groupId>com.hazelcast</groupId>
<artifactId>hazelcast-spring</artifactId>
</dependency>
<dependency> <!-- Hazelcat支持Hibernate包依赖 -->
<groupId>com.hazelcast</groupId>
<artifactId>hazelcast-hibernate53</artifactId>
<version>1.3.2</version>
</dependency>
使用Docker镜像运行管理中心:
docker run -p 8080:8080 hazelcast/management-center
根据Hazelcast在项目中的实践,抽象出Hazelcast常用参数,提炼为HazelcastServerProperties、HazelcastClientProperties和HazelcastProperties使用Spring参数进行配置:
hazelcast:
pocServer01: # hazelcast服务器配置变量名字,支持定义多个
instance_name: pocServer01 # 缓存实例名称,内存标识用
group_name: pocHazelcastCache01 # 缓存网络Group名称
port: 55100 # 服务侦听端口
# interfaces: 10.10.1.*, 10.3.10.4-18 # 多网卡时候,标注服务侦听的网卡地址
# members: 10.10.1.2:55100, server-03:55100 # 集群部署各个节点地址,建议使用域名配合底层部署
management_center_enabled: true #接入管理中心的参数
management_center_url: http://localhost:8080/hazelcast-mancenter
management_center_update_interval: 5
pocClient01: # hazelcast客户端配置变量名字,支持定义多个
instance_name: pocClient01 # 客户端缓存实例名称,内存标识用
group_name: pocHazelcastCache01 # 缓存网络Group名称,同服务端保持一致,标识客户端接入服务端的标识
server_address: 127.0.0.1:55100 #多个地址,支持逗号分隔
通过上面设置的服务器参数,构建HazelcastServerConfiguration实现服务器的动态配置。
根据项目需要,可以支持配置多个服务器缓存实例(实际场景应用需求不大)。通常一个应用服务只配置一个缓存服务。
package com.github.dragonetail.hazelcast.config;
import com.github.dragonetail.hazelcast.common.hazelcast.HazelcastServerProperties;
import com.hazelcast.config.Config;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cache.CacheManager;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class HazelcastServerConfiguration {
@Autowired
private HazelcastProperties hazelcastProperties;
@Bean
public CacheManager cacheManager() {
final CacheManager cacheManager =
new com.hazelcast.spring.cache.HazelcastCacheManager(this.hazelcastInstance());
return cacheManager;
}
@Bean(name = "hazelcastInstance", destroyMethod = "shutdown")
public HazelcastInstance hazelcastInstance() {
final Config config = new Config();
HazelcastServerProperties pocServer01 = this.hazelcastProperties.getPocServer01();
config.setInstanceName(pocServer01.getInstanceName());
if (pocServer01.isManagementCenterEnabled()) {
config.getManagementCenterConfig()
.setEnabled(pocServer01.isManagementCenterEnabled())
.setUpdateInterval(pocServer01.getManagementCenterUpdateInterval())
.setUrl(pocServer01.getManagementCenterUrl());
}
config.setNetworkConfig(pocServer01.buildNetworkConfig());
config.getGroupConfig()
.setName(pocServer01.getGroupName());
//.setGroupPassword(pocServer01.getGroupPassword());
return Hazelcast.newHazelcastInstance(config);
}
}
通常情况,一个Spring的应用服务只部署一个缓存服务。
上面配置定义的缓存实例同时作为Spring全局的Cache实现,上面的CacheManager显示地声明了这个配置。
@Bean
public CacheManager cacheManager() {
final CacheManager cacheManager =
new com.hazelcast.spring.cache.HazelcastCacheManager(this.hazelcastInstance());
return cacheManager;
}
通过配置Hibernate的Spring JPA参数,可以明确声明使用Hazelcast作为Hibernate的二级缓存实现。
spring:
application:
name: employee-service
datasource:
url: jdbc:h2:mem:example-app;DB_CLOSE_DELAY=-1;DB_CLOSE_ON_EXIT=FALSE
username: sa
password: h2
driverClassName: org.h2.Driver
jpa:
database-platform: org.hibernate.dialect.H2Dialect
hibernate:
ddl-auto: update
properties:
hibernate:
show_sql: true
format_sql: true
use_sql_comments: true
cache:
use_query_cache: true
use_second_level_cache: true
hazelcast:
instance_name: pocServer01 # 内存中查找对应的Hazelcast实例的名称
# native_client_instance_name: pocClient01 # 客户端的有情况,内存中对应实例名称
region:
factory_class: com.hazelcast.hibernate.HazelcastCacheRegionFactory # Hibernate实体对应缓存分区构造方法,可以选择使用HazelcastLocalCacheRegionFactory,详细参见后文解释
上面配置定义的缓存实例同时作为Spring全局的Cache实现,上面的CacheManager显示地声明了这个配置。
package com.github.dragonetail.hazelcast.config;
import com.github.dragonetail.hazelcast.common.hazelcast.HazelcastClientProperties;
import com.hazelcast.client.HazelcastClient;
import com.hazelcast.client.config.ClientConfig;
import com.hazelcast.core.HazelcastInstance;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.DependsOn;
@Configuration
public class HazelcastClientConfiguration {
@Autowired
private HazelcastProperties hazelcastProperties;
@Bean(name = "pocClient01HazelcastInstance", destroyMethod = "shutdown")
@DependsOn("hazelcastInstance")
public HazelcastInstance pocClient01HazelcastInstance() {
HazelcastClientProperties pocClient01 = this.hazelcastProperties.getPocClient01();
final ClientConfig config = new ClientConfig();
config.setInstanceName(pocClient01.getInstanceName());
config.setNetworkConfig(pocClient01.buildNetworkConfig());
config.getGroupConfig()//
.setName(pocClient01.getGroupName());
//.setGroupPassword(pocClient01.getGroupPassword());
return HazelcastClient.newHazelcastClient(config);
}
}
1、使用Hazelcast服务器配置访问缓存,参见HazelcastMapServerPocController。
可以通过get和getMap接口查看缓存中的数据。
这种模式等价于EHCache的经典用法,JVM内嵌缓存服务实现。
同时对于集群环境,支持backup-count和async-backup-count,以及read-backup-data等集群备份和加速访问机制。
2、缓存客户端访问缓存,参见HazelcastMapClientPocController。
访问hazelcast-map-client-poc-controller的get和put接口,可以操作缓存中的数据,由于这个client和上面服务器使用的一个缓存,因此访问的是同一数据内容。
这种模式等价于Redis等C/S模式的缓存访问实现,同时Hazelcast还支持near-cache本地加速缓存模式。
3、Spring缓存集成,参见SpringCachePocController。
访问spring-cache-poc-controller的getTime和clear接口,可以操作缓存中的数据。
由于对应缓存的配置在HazelcastServerBeansConfiguration中,是配置60秒超时。
@Bean(name = "springCachePoc01")
public Map<String, String> springCachePoc01() {
Config config = hazelcastInstance.getConfig();
// Spring Cache Poc
String name = "pocServer01.springCachePoc01";
config.getMapConfig(name)
.setTimeToLiveSeconds(60)
.setMaxIdleSeconds(60); // 1分钟
return hazelcastInstance.getMap(name);
}
因此在60秒内,重复访问getTime接口会返回相同的结果内容。
这在某些业务场景是非常有实用价值的,能够极大增强系统的抗压能力,例如机票查询结果。
4、****** Spring缓存集成,参见SpringCachePocController。
访问spring-cache-poc-controller的getTime和clear接口,可以操作缓存中的数据。
由于对应缓存的配置在HazelcastServerBeansConfiguration中,是配置60秒超时。
@Bean(name = "springCachePoc01")
public Map<String, String> springCachePoc01() {
Config config = hazelcastInstance.getConfig();
// Spring Cache Poc
String name = "pocServer01.springCachePoc01";
config.getMapConfig(name)
.setTimeToLiveSeconds(60)
.setMaxIdleSeconds(60); // 1分钟
return hazelcastInstance.getMap(name);
}
因此在60秒内,重复访问getTime接口会返回相同的结果内容。
这在某些业务场景是非常有实用价值的,能够极大增强系统的抗压能力,例如机票查询结果。
5、HIberante二级缓存,参见PersonController。
在Person的Model上设置了二级缓存之后,后续通过ID访问实体,会减少SQL的访问次数。
@Cache(region="model.Person", usage = CacheConcurrencyStrategy.READ_WRITE)
@Data
@Entity
public class Person implements Serializable {
private static final long serialVersionUID = 1964134334737487195L;
有关Hibernate的二级缓存,可以参考:
https://dzone.com/articles/pitfalls-hibernate-second-0
https://www.baeldung.com/hibernate-second-level-cache
https://github.com/hazelcast/hazelcast-hibernate5
https://github.com/hazelcast/hazelcast-hibernate/blob/master/README.md
缓存的级别有:
- NONE: 不使用二级缓存。
- READ_ONLY:数据库数据不会更改,简单高效。对于固定数据字典类数据可以使用。
- NONSTRICT_READ_WRITE: 数据事务提交后,会更新缓存,期间会出现数据不一致可能。对于变更很少,数据一致要求不高的,例如数据字典、基础参考类数据可以使用。
- READ_WRITE: 会在缓存级别上追加软锁,控制在事务变更数据的时候让缓存访问直接访问数据库。分布式情况缓存不一定能够完整保证,普通事务要求不是非常严格的读多写少的数据可以使用。Hazelcast支持集群情况的变更通知机制,一定程度能够保证集群中数据的一致性。
- TRANSACTIONAL:对分布式集群实施强事务一致性约束,目前Hazelcast不支持。
6、使用缓存IMap手动实现数据的缓存,参见EmployeeController和EmployeeService。
主要展示了再Hazelcast的缓存对象的不同属性上的查找方法,可以设置属性查找索引,通过缓存没有找到再到数据库中查找。
这种实现模式适合特别复杂的对象,需要根据属性进行多级查询的缓存场景,可以有效减少缓存Map的数量。
如果不涉及多属性多级缓存,只是基础的ID缓存的话,建议直接使用Spring的Cachable、CacheEvict、CachePut注解组直接在Repository或者Service方法上即可完成缓存对象的管理实现。
7、ID生成,参见IdGeneratorController。
Hazelcast提供了用时间、节点、序列在集群上提供全局唯一ID的实现,能够实现基本有序的序列,序列中会跳号。
相关提供了:
- generate: 生成全局Long的ID。伪顺序长整数唯一序列,常用业务ID使用
- generateCode: 根据全局Long的ID,生成伪乱序编码(目前为12字符),常用作业务代码、编码使用,已考虑HBase等数据分区。
- generateNonceToken: 根据随机函数生成一次性Token码(32字符),常用做Token。
8、MapStore的使用,参见DeviceStatusController。
Hazelcast提供了MapStore,可以实现缓存到DB(可以是任何持久化后端)的机制。
@Bean(name = "deviceStatusCacheMap")
public Map<String, DeviceStatus> deviceStatusCacheMap() {
Config config = hazelcastInstance.getConfig();
// MapStore Poc
MapConfig mapConfig = config.getMapConfig("pocServer01.deviceStatus")
.setTimeToLiveSeconds(120) // 测试用120,应该足够大,比如3600秒
.setMaxIdleSeconds(120)
.setEvictionPolicy(EvictionPolicy.LRU);// 1小时
final MapStoreConfig mapStoreConfig = mapConfig.getMapStoreConfig();
mapStoreConfig.setImplementation(deviceStatusMapStore)
.setEnabled(true)
.setWriteDelaySeconds(60)
.setWriteBatchSize(1000)
.setWriteCoalescing(true);
config.addMapConfig(mapConfig);
return hazelcastInstance.getMap("pocServer01.deviceStatus");
}
配置参数提供了延迟写,批量写,合并写,透传读的能力。
@Slf4j
@Component
public class DeviceStatusMapStore extends MapStoreAdapter<String, DeviceStatus> {
@Autowired
private DeviceStatusRepository deviceStatusRepository;
@Override
public DeviceStatus load(final String no) {
Assert.notNull(no, "No should not be null.");
Optional<DeviceStatus> optionalDeviceStatus = deviceStatusRepository.findByNo(no);
if(optionalDeviceStatus.isPresent()){
return optionalDeviceStatus.get();
}
return null;
}
@Override
public void delete(String no) {
Assert.notNull(no, "No should not be null.");
deviceStatusRepository.deleteByNo(no);
}
@Override
public void store(String no, DeviceStatus deviceStatus) {
Assert.notNull(no, "No should not be null.");
Assert.notNull(deviceStatus, "deviceStatus should not be null.");
DeviceStatus deviceStatusForSave = deviceStatusRepository.findByNo(no).orElse(deviceStatus);
deviceStatusForSave.setValue(deviceStatus.getValue());
deviceStatusForSave.setLastUpdated(deviceStatus.getLastUpdated());
deviceStatusRepository.save(deviceStatusForSave);
}
@Override
public void storeAll(final Map<String, DeviceStatus> map) {
final List<DeviceStatus> dataList = new ArrayList<>();
map.forEach((no, deviceStatus) -> {
DeviceStatus deviceStatusForSave = deviceStatusRepository.findByNo(no).orElse(deviceStatus);
deviceStatusForSave.setValue(deviceStatus.getValue());
deviceStatusForSave.setLastUpdated(deviceStatus.getLastUpdated());
dataList.add(deviceStatusForSave);
});
this.deviceStatusRepository.saveAll(dataList);
}
}
目前IoT的监控数据指标的部分,非常适合这个场景,实现会更加简单可控。
9、队列的使用,参见QueueTaskController。
Hazelcast提供了可持久化的队列实现,可以实现缓存到DB(可以是任何持久化后端)的机制。
目前IoT的任务下发工作场景比较适合这个机制。
本来想直接使用Queue的Key作为数据库Model的ID,这个目前不知道为什么在往队列添加数据的时候,保存到数据时底层会触发HIbernate的Query的Cache,需要把JPA中的参数【use_query_cache: false】才能好用,另外一个规避方案就是不使用Queue中提供的Key作为ID,使用数据中默认的ID(注: 已经更改成Key作为唯一列,ID为DB自动生成的模式,结果也是不行,问题应该是出在了Hazelcast在一个Thread中两次调用嵌套的问题。。。。)。
因此建议这个模式之前使用Hazelcast的Queue,需要【use_query_cache: false】才行。
这个在DeviceStatus使用MapStore的时候没有问题,估计应该是Queue对应的一个Bug吧。
10、调度的使用,参见EchoTaskSchduler。
@Slf4j
@Component
public class EchoTaskSchduler {
@Autowired
private EchoTask echoTask;
@Autowired
private IScheduledExecutorService scheduledExecutorService;
private IScheduledFuture<?> scheduledFuture;
@PostConstruct
public void init() {
scheduledFuture= scheduledExecutorService.scheduleAtFixedRate(
TaskUtils.named("echoTask", echoTask), 10, 30, TimeUnit.SECONDS);
log.info("EchoTask has been started.");
}
@PreDestroy
public void shutdown() {
scheduledFuture.cancel(true);
scheduledFuture.dispose();
log.info("EchoTask has been shutdown.");
}
}
调度固定间隔的任务,如果任务执行过长,下一次激活时发现任务还在执行,会自动跳过任务触发,保证任务不累积调度。
10、其他说明,以上所有测试可以配合使用http://localhost:5000/console 进行数据库确认,另外,日志中数据库SQL日志已经打开。