
A delayed message queue implemented in Go on top of Redis, with reliable delivery based on an ACK mechanism.

Primary language: Go. License: Apache-2.0.

DelayQueue


DelayQueue is a message queue built on Redis that supports delayed and scheduled delivery.

DelayQueue supports an ACK/retry mechanism: if no confirmation is received within a set period, it re-delivers the message. As long as Redis doesn't crash, consumer crashes won't cause message loss.

DelayQueue works safely in a distributed environment: multiple machines can deliver messages to the same queue or consume messages from the same queue.

Install

DelayQueue requires a Go version with modules support. Run the following command in a project that has a go.mod:

go get github.com/hdt3213/delayqueue

Get Started

package main

import (
	"github.com/go-redis/redis/v8"
	"github.com/hdt3213/delayqueue"
	"strconv"
	"time"
)

func main() {
	redisCli := redis.NewClient(&redis.Options{
		Addr: "127.0.0.1:6379",
	})
	queue := delayqueue.NewQueue("example", redisCli, func(payload string) bool {
		// The callback returns true to confirm successful consumption.
		// If it returns false or does not return within the max consume duration, DelayQueue re-delivers the message.
		return true
	}).WithConcurrent(4) // set the number of concurrent consumers 
	// send delay message
	for i := 0; i < 10; i++ {
		err := queue.SendDelayMsg(strconv.Itoa(i), time.Hour, delayqueue.WithRetryCount(3))
		if err != nil {
			panic(err)
		}
	}
	// send schedule message
	for i := 0; i < 10; i++ {
		err := queue.SendScheduleMsg(strconv.Itoa(i), time.Now().Add(time.Hour))
		if err != nil {
			panic(err)
		}
	}
	// start consume
	done := queue.StartConsume()
	<-done
}

Options

WithLogger(logger *log.Logger)

WithLogger sets a custom logger for the queue.

WithConcurrent(c uint) 

WithConcurrent sets the number of concurrent consumers.

WithFetchInterval(d time.Duration)

WithFetchInterval sets the interval at which consumers fetch messages from Redis.

WithMaxConsumeDuration(d time.Duration)

WithMaxConsumeDuration sets the maximum time allowed for consuming a message.

If no acknowledgement is received within this duration after a message is delivered, DelayQueue will try to deliver the message again.

WithFetchLimit(limit uint)

WithFetchLimit limits the maximum number of unacked (in-process) messages.

UseHashTagKey()

UseHashTagKey adds hash tags to the Redis keys so that all keys of this queue are allocated to the same hash slot.

If you are using Codis, Aliyun Redis Cluster, or Tencent Cloud Redis Cluster, you should pass this option to NewQueue: NewQueue("test", redisCli, cb, UseHashTagKey()). This option cannot be changed after the DelayQueue has been created.

WARNING! Changing this option (adding or removing it) will leave DelayQueue unable to read existing data in Redis.

see more: https://redis.io/docs/reference/cluster-spec/#hash-tags
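To illustrate why hash tags keep a queue's keys together, here is a minimal sketch of the slot calculation described in the Redis Cluster specification: CRC16 (XMODEM variant) of the key, or of the text inside the first non-empty {...} if one is present, modulo 16384. The key names below are hypothetical and are not necessarily the names DelayQueue uses internally.

```go
package main

import (
	"fmt"
	"strings"
)

// crc16 computes CRC16/XMODEM (polynomial 0x1021, initial value 0),
// the variant named in the Redis Cluster specification.
func crc16(data []byte) uint16 {
	var crc uint16
	for _, b := range data {
		crc ^= uint16(b) << 8
		for i := 0; i < 8; i++ {
			if crc&0x8000 != 0 {
				crc = (crc << 1) ^ 0x1021
			} else {
				crc <<= 1
			}
		}
	}
	return crc
}

// hashSlot returns the cluster slot for key. If the key contains a
// non-empty hash tag "{...}", only the tag content is hashed.
func hashSlot(key string) uint16 {
	if start := strings.IndexByte(key, '{'); start >= 0 {
		if end := strings.IndexByte(key[start+1:], '}'); end > 0 {
			key = key[start+1 : start+1+end]
		}
	}
	return crc16([]byte(key)) % 16384
}

func main() {
	// Hypothetical key names: with a shared hash tag, both keys
	// are hashed on "example" and so map to the same slot.
	fmt.Println(hashSlot("{example}:pending"), hashSlot("{example}:ready"))
	// Without a hash tag, the whole key is hashed, so the slots
	// are computed independently of each other.
	fmt.Println(hashSlot("example:pending"), hashSlot("example:ready"))
}
```

Because scripts and multi-key commands in a cluster generally require all touched keys to live in one slot, a queue whose keys share a tag can be operated on atomically.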

WithDefaultRetryCount(count uint)

WithDefaultRetryCount sets the maximum number of retries. It applies to all messages in this queue.

Use WithRetryCount with DelayQueue.SendScheduleMsg or DelayQueue.SendDelayMsg to specify the retry count of a particular message.

More Details

The complete flow involves the following Redis data structures:

  • pending: A sorted set of messages pending delivery. The member is the message id; the score is the delivery Unix timestamp.
  • ready: A list of messages ready to be delivered. Workers fetch messages from here.
  • unack: A sorted set of messages waiting for an ack (confirmation of successful consumption), meaning the messages here are being processed. The member is the message id; the score is the Unix timestamp of the processing deadline.
  • retry: A list of messages whose processing exceeded the deadline and which are waiting for a retry.
  • garbage: A list of messages that have reached the max retry count and are waiting to be cleaned up.
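The movement of a message between these five structures can be sketched with a small in-memory model. This is only an illustration under simplifying assumptions, not the library's implementation: it uses Go maps and slices instead of Redis, takes the current time as an argument, and the type and method names (model, send, tick, fetch, ack) are hypothetical. Serving retry before ready is also an assumption made for brevity.

```go
package main

import (
	"fmt"
	"sort"
)

// model is a hypothetical in-memory stand-in for the five Redis structures.
type model struct {
	pending    map[string]int64 // message id -> delivery unix timestamp
	ready      []string         // ids ready for delivery
	unack      map[string]int64 // message id -> processing deadline
	retry      []string         // ids whose processing deadline passed
	garbage    []string         // ids that exhausted their retries
	retries    map[string]int   // remaining retries per id
	maxConsume int64            // seconds allowed for processing
}

func newModel(maxConsume int64) *model {
	return &model{
		pending: map[string]int64{}, unack: map[string]int64{},
		retries: map[string]int{}, maxConsume: maxConsume,
	}
}

// send schedules a message for delivery at deliverAt.
func (m *model) send(id string, deliverAt int64, retryCount int) {
	m.pending[id] = deliverAt
	m.retries[id] = retryCount
}

// due returns the ids in set whose score is <= now, in sorted order.
func due(set map[string]int64, now int64) []string {
	var ids []string
	for id, score := range set {
		if score <= now {
			ids = append(ids, id)
		}
	}
	sort.Strings(ids)
	return ids
}

// tick moves due messages: pending -> ready, and unack -> retry or garbage.
func (m *model) tick(now int64) {
	for _, id := range due(m.pending, now) {
		delete(m.pending, id)
		m.ready = append(m.ready, id)
	}
	for _, id := range due(m.unack, now) {
		delete(m.unack, id)
		if m.retries[id] > 0 {
			m.retries[id]--
			m.retry = append(m.retry, id)
		} else {
			m.garbage = append(m.garbage, id)
		}
	}
}

// fetch hands one message to a worker and records its deadline in unack.
func (m *model) fetch(now int64) (string, bool) {
	src := &m.retry
	if len(*src) == 0 {
		src = &m.ready
	}
	if len(*src) == 0 {
		return "", false
	}
	id := (*src)[0]
	*src = (*src)[1:]
	m.unack[id] = now + m.maxConsume
	return id, true
}

// ack confirms successful consumption and removes the message from unack.
func (m *model) ack(id string) {
	delete(m.unack, id)
	delete(m.retries, id)
}

func main() {
	m := newModel(60)
	m.send("a", 100, 1)
	m.tick(100)           // "a" becomes ready
	id, _ := m.fetch(100) // delivered with deadline 160, never acked
	m.tick(160)           // deadline passed: moved to retry
	m.fetch(160)          // redelivered with deadline 220
	m.tick(220)           // no retries left: moved to garbage
	fmt.Println(id, m.garbage)
}
```

In this model a message that is acked before its deadline simply disappears from unack, while an unacked one cycles through retry until its retry count is used up and then lands in garbage.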