基于Golang实现的延时队列
延时队列是指在指定的时间点进行消息消费,具体消费逻辑由用户自己来实现。
传统解决方法一般采用cron来实现,但有以下缺点:
- 轮训效率太低,每次都需要扫库。
- 如果扫库频率太高,则后端数据库压力过大,如果频率太低,则存在有效性时间差较大的问题
- 用户网上购买时后,如果收货后,15天内未对交易进行评论,则系统进行默认5星评论。
- 订单超过30分钟未支付,则系统进行自动取消
主要使用到的两个数据结构是 环形队列 和 集合。其中环形队列是由数组来实现。
currentSlot 表示当前操作的环位置,这里是数组的索引值
timer 定时器,默认每秒移动一个slot
系统主要由三部分组成,分别为slot、Elements和Element。
Slots 代表一个环, 由多个slot组成,每个slot对应一个Elements
Elements slot对应的值
Element 组成Elements集合的元素
每个环节点slot就是一个数据集合 Elements,这个集合内的数据则表示当前时间点需要进行消费的信息集合,有可能是下次循环到这个节点的时间进行消费。
环与集合的关系
slots[0] = Elements
slots[1] = Elements
slots[...] = ...
一个Elements是由一个或多个 Element 元素组成,每个 Element 元素都有一个 cycleNum 字段,用来表示此元素是立即消费还是以后消费,其值也可以理解成环的循环周期。 如果cycleNum字段值为0,则表示立即消费,如果cycleNum=2则表示还需要两个环周期才能消费,每次循环都进行 cycleNum-- 操作,直到为0时结束。
集合与元素的关系
Elements = {Element、Element、Element}
所以整个延时队列看起来是这个样子:
slots[0] = []*Elements{*Element{}, *Element{}, *Element{}...}
slots[1] = []*Elements{*Element{}, *Element{}, *Element{}...}
slots[2] = []*Elements{*Element{}, *Element{}, *Element{}...}
...
系统会有一个定时器timer,每1秒(可通过delayqueue.WithFrequency 函数调整)会移动一个slot, 此时currentSlot的值加1,表示下一个节点位置。
然后遍历当前环点中的所有元素,如果当前元素生命周期cycleNum=0,则立即消费,否则将cycleNum--, 直到循环完集合中的所有元素。
同时每次添加新元素时,都要以当前时间所在的slot位置为起点,假如当前时间为 00:05:10, 在第 310 (560+10) 个slot, 这时添加一个元素时间为 00:02:50,
由于每秒移动一个slot, 而新添加元素时间slot为179(260+50), 则将这个元素放在当前位置后往数的第179个slot, 即这个环的第 310+179=489个slot中。
如果添加的时间大于当前时间的多个环周期时,只需要将环周期对应的slot个数减去即可,环的周期数使用 cycleNum 值来表示。
package main
import (
"fmt"
"time"
"github.com/cfanbo/delayqueue"
)
func consume(entry delayqueue.Entry) {
fmt.Println("当前:", time.Now().Format("2006-01-02 15:04:05"))
fmt.Println("消费:", entry.ConsumeTime().Format("2006-01-02 15:04:05"))
fmt.Println("消费内容", entry.Body())
fmt.Println("=======================")
}
func main() {
q := delayqueue.New()
q.Put(time.Now().Add(time.Second*2), "2秒后")
q.Put(time.Now().Add(time.Second*15), "15秒后")
q.Put(time.Now().Add(time.Second*8), "8秒后")
q.Put(time.Now().Add(time.Second*43), "43秒后")
q.Put(time.Now().Add(time.Second*50), "50秒后")
q.Put(time.Now().Add(time.Second*28), "28秒后")
q.Run(consume)
}
支持用户自定义间隔时间,如每分钟,每小时,只要是time.NewTicker()支持的 time.Duration 类型即可。
调用方法如下:
// 在New() 函数里调用 WithFrequency() 函数即可
q := delayqueue.New(delayqueue.WithFrequency(time.Minute))
q.Put(time.Now().Add(time.Minute * 2), "2分钟后消费此内容")
- 系统支持频率周期类型是time.Duration, 如 time.Second、time.Minute 和 time.Hour。
- 队列暂不支持数据持久化,所以若停止服务或者退出重启,则队列数据将全部丢失。
建议在数据消费后对其状态进行变更存储,以便在下次服务启动成功后,立即将需要处理的数据写入延时队列。