/mybatis-redis

基于redis的mybatis二级缓存插件

Primary LanguageJava

###基于redis的mybatis二级缓存插件


####简介 使用redis作为mybatis的缓存介质,通过使用mybatis的拦截器,在SQL执行的过程中,检查各个Mapper的二级缓存中过期的部分,匹配成功后删除过期的缓存,保证缓存的实时正确性。

####使用

#####添加redis的配置文件 下面是一份properties文件,将它放置在src包下即可,我的FileUtile类就可以找到。这份是一个包含整个所可以配置的东西,根据自己项目的需求抽取对应的配置整理成properties文件即可。

#redis的基础配置
redis.base.ip=127.0.0.1
redis.base.port=6379
redis.base.auth=xujianguo
#redis的对象池配置
#可用连接实例的最大数目,默认值为8
redis.pool.maxActive=1024
#控制一个pool最多有多少个状态为idle(空闲的)的jedis实例,默认值也是8
redis.pool.maxIdle=200
#等待可用连接的最大时间,单位毫秒,默认值为-1,表示永不超时。如果超过等待时间,则直接抛出JedisConnectionException
redis.pool.maxWait=10000
#连接超时时间
redis.pool.timeout=10000
#是否开启shard分布式模式
redis.shard.enable=false
#主机one的配置
redis.shard.one.ip=127.0.0.1
redis.shard.one.port=6379
redis.shard.one.auth=xujianguo
#主机two的配置
redis.shard.two.ip=127.0.0.1
redis.shard.two.port=6378
redis.shard.two.auth=xujianguo
#线程数控制
redis.limiter.limit=100000

#####二级缓存的关联配置 解决Mapper之间二级缓存的过期文件,就是通过一份关联配置来实现的,observers配置的是一个观察者,观察着statement的变化,statement的缓存更新了,就通过观察者,观察者立刻更新缓存,从而保证获取到是最新的数据。

<?xml version="1.0" encoding="UTF-8"?>
<dependencies>
   <statements>
       <statement id="com.xujianguo.dao.PersonDao.findById">
          <observer id="com.xujianguo.dao.UserDao.findOne"/>
       </statement>
   </statements>
</dependencies>

这个是上面xml的dtd文件,大家可以根据这个进行配置xml元素

<?xml version="1.0" encoding="UTF-8" ?>
<!ELEMENT dependencies (statements?)>
<!ELEMENT statements (statement*)>
<!ELEMENT statement (observer+)>
<!ELEMENT statement EMPTY>
<!ATTLIST statement id CDATA #REQUIRED>
<!ELEMENT observer EMPTY>
<!ATTLIST observer id CDATA #REQUIRED>

#####使用指定二级缓存 首先在mybatis的核心xml中配置好下面的全局设置,目的是开启mybatis的二级缓存

	<!-- 指定Mybatis使用log4j -->
	<settings>
		<setting name="cacheEnabled" value="true"/>
	</settings>

在mapper中配置二级缓存,执行我们的RedisCache作为缓存的实现类

<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper
PUBLIC "-//ibatis.apache.org//DTD Mapper 3.0//EN"
"http://ibatis.apache.org/dtd/ibatis-3-mapper.dtd">
<mapper namespace="com.xujianguo.dao.PersonDao">
	<cache eviction="LRU" type="com.xujianguo.cache.RedisCache"/>

	<select id="findById" parameterType="int" resultMap="PersonMap" useCache="true">
		select * from person where id=#{id}
	</select>
</mapper>

这样就可以实现我们的最终目的了。

####设计及实现

#####拦截器中实现缓存更新 我们在CacheHandlerIntercept这个核心拦截器中拦截了四类method,query/update/commit/rollback,分别对应查询/更新/提交/回滚四类操作,下面这幅图展示了四类操作是如何对缓存进行更新的。 mybatis-redis-1

#####基于Redis的二级缓存实现 **RedisCache:**实现了Cache接口,提供基本的获取缓存/移除缓存等方法,内置了一个RedisPool的连接池,用于获取跟Redis进行通讯的连接。

public class RedisCache implements Cache {
	private static Logger log = Logger.getLogger(RedisCache.class);
	//id属性只是一个名字标识,就像PerpetualCache里面的id被BaseExecutor设置为LocalCache
	private String id;
	//读写锁,这个必须提供,Executor调用Cache的时候就加锁
	private ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
	//对象池
	private RedisPool pool;

    /* others */
}

**RedisPool:**里面有Config和Limiter两个重要的类,Config主要是去加载properties文件去获取对应的连接池配置和其他重要配置,Limiter是一个线程控制器,防止高并发下机器瘫痪。其他就是获取连接和释放连接的基本方法了。

public class RedisPool {
	private static Logger log = Logger.getLogger(RedisPool.class);
	//单例模式
	private static RedisPool pool;
	//真实使用的池
	private Pool realPool;
	//线程控制器
	private Limiter limiter;
	//配置
	private Config config;

    /* others */
}

**Config:**首先遍历搜索properties文件,找到后就通过反射加载配置文件,原理是通过反射类的属性,然后去正则匹配对应的properties的key,接着反射创建类的实例,设置对应的value,这个是核心**。

public class Config {
	//基本配置
	private RedisBase base;
	//对象池的配置
	private RedisPool pool;
	//分片的配置
	private RedisShard shard;
	//线程控制参数
	private LimiterParam limiterParam;

	/**
	 * 加载对应的配置到对应的类中
	 */
	public Config() {
		PropertyParser parser = PropertyParser.getInstance();
		Properties pBase = parser.doFilter(new RegexFilter("^redis\\.base\\..*"));
		Properties pPool = parser.doFilter(new RegexFilter("^redis\\.pool\\..*"));
		Properties pShard = parser.doFilter(new RegexFilter("^redis\\.shard\\..*"));
		Properties pLimiter = parser.doFilter(new RegexFilter("^redis\\.limiter\\..*"));
		base = PropertyReflect.reflect(RedisBase.class, pBase);
		pool = PropertyReflect.reflect(RedisPool.class, pPool);
		shard = PropertyReflect.reflect(RedisShard.class, pShard);
		limiterParam = PropertyReflect.reflect(LimiterParam.class, pLimiter);
	}

    /* others */
}

**Limiter:**使用AQS框架的共享模式进行同步,设置一个limit值,达到超过limit值的时候进行阻塞,释放锁的时候limit值减1.

public class Limiter {
	//日志记录
	private static final Logger log = Logger.getLogger(Limiter.class);
	
	//同步器
	private final Sync sync;
	//内部计数器
	private final AtomicLong count;
	//限制值
	private volatile long limit;
	
	//初始化
	public Limiter(long limit) {
		this.limit = limit;
		count = new AtomicLong(0);
		sync = new Sync();
	}
	
	/**
	 * 使用AQS框架的共享模式进行同步
	 * @author xujianguo
	 * @email ray_xujianguo@yeah.net
	 * @time 2015年3月31日
	 */
	@SuppressWarnings("serial")
	private class Sync extends AbstractQueuedSynchronizer {
		public Sync() {}
		
		/**
		 * 超过limit值进行阻塞
		 */
		@Override
		protected int tryAcquireShared(int arg) {
			long newValue = count.incrementAndGet();
			if(newValue > limit) {
				count.decrementAndGet();
				return -1;
			}
			return 1;
		}
		
		/**
		 * 减少limit
		 */
		@Override
		protected boolean tryReleaseShared(int arg) {
			count.decrementAndGet();
			return true;
		}
	}
	
	/**
	 * 限制
	 * @throws InterruptedException
	 */
	public void limit() throws InterruptedException {
		sync.acquireSharedInterruptibly(0);
	}
	
	/**
	 * 解除限制
	 * @return
	 */
	public boolean unlimit() {
		return sync.releaseShared(0);
	}
	
	/**
	 * 重置计数器
	 */
	public void reset() {
		count.set(0);
	}
}