package redis import ( "git.aionnect.com/aionnect/go-common/utils" "git.aionnect.com/aionnect/go-common/utils/logger" "os" "os/signal" "strings" "syscall" "time" ) // Redis队列实现 // 注意! Redis Cluster不支持同时操作多个健的命令,例如BRPOPLPUSH",故本类与Redis Cluster不兼容 type Queue struct { MainListName string // 主列表名称 BackupListName string // 备份列表名称 BlockedTimeout int // 阻塞超时 RecycleTimeout time.Duration // 回收超时 IsIdempotent bool // 是否幂等 recycleTicker *time.Ticker // 回收定时器 isRing bool // 是否环形队列 hub *Hub // Redis连接适配器对象 l *logger.Logger // 日志记录器实例 } // 构造Redis队列新实例 func NewRedisQueue(mainListName, backupListName string, blockedTimeout int, recycleTimeout time.Duration) *Queue { mainListName = strings.TrimSpace(mainListName) backupListName = strings.TrimSpace(backupListName) if mainListName == "" { mainListName = "defaultList" } if backupListName == "" { backupListName = mainListName } if blockedTimeout <= 0 { blockedTimeout = 500 } if recycleTimeout <= 0 { recycleTimeout = 5 * time.Second } q := &Queue{ hub: NewHub(), MainListName: mainListName, BackupListName: backupListName, BlockedTimeout: blockedTimeout, RecycleTimeout: recycleTimeout, } q.l = q.hub.LOG() q.isRing = q.MainListName == q.BackupListName // 自动关闭处理 q.closeWait() return q } // Clean() 清理Redis列表,通常情况应该用不上 func (q *Queue) Clean() { _, err := q.hub.Do("DEL", q.MainListName) if nil != err { q.l.Errorf("Clean() delete %s failed: %s", q.MainListName, err.Error()) } if !q.isRing { _, err = q.hub.Do("DEL", q.BackupListName) if nil != err { q.l.Errorf("Clean() delete %s failed: %s", q.BackupListName, err.Error()) } } } // Pop() 从Redis列表中读取数据 // 为避免消费者崩溃或意外错误丢数据,被读取(主列表队尾)的数据同时插入(备份列表队头)备份列表,当成功处理完成时,再从备份列表移除 // 备份列表和主列表可以是同一个,即构造一个环形队列,但注意在环形队列时,消费异常时的阻塞行为是有差异的,且数据积压少并有多个消费者时会触发重复处理 func (q *Queue) Pop(fn func(interface{}) error) { for { reply, err := q.hub.Do("BRPOPLPUSH", q.MainListName, q.BackupListName, q.BlockedTimeout) if nil != err { q.l.Errorf("Pop() pop from %s and push to %s failed: %s", q.MainListName, q.BackupListName, err.Error()) if err == ErrRedisConnNil { // Redis未连接时无阻塞,等待几秒再重试,以免死循环 time.Sleep(5 * time.Second) } continue } if nil != fn { err = fn(reply) if nil != err { // 传人的逻辑处理函数返回错误时,不继续执行队列数据清理 continue } } // 环形队列时,为避免忽略掉新增的的相同内容更新事件,仅移除最新的一条与已处理值相等的内容 // 当无新增的相同内容时,即刚刚RPOPLPUSH从队尾"备份"到队头的 // 当有新增的相同内容时,虽然最新增加的一条会被移除,但队列中还是会有更早的"备份"记录 pipeline := q.hub.Pipeline() if q.isRing { if q.IsIdempotent { // 当消息体是幂等时,即反复操作得到的结果一致时,当成功操作一次后,也即可清除主队列中相同元素,下同 pipeline.Send("LREM", q.BackupListName, 0, reply) // isRing时BackupListName = MainListName } else { pipeline.Send("LREM", q.BackupListName, 1, reply) } } else { // 非环形队列时,移除备份列表中所有与已处理值相等的内容 pipeline.Send("LREM", q.BackupListName, 0, reply) if q.IsIdempotent { pipeline.Send("LREM", q.MainListName, 0, reply) } } _, err = pipeline.Execute() if nil != err { q.l.Errorf("Pop() remove from %s failed: %s", q.BackupListName, err.Error()) } } } // Push() 添加数据到Redis列表 func (q *Queue) Push(obj interface{}) { _, err := q.hub.Do("LPUSH", q.MainListName, obj) // 主列表队头插入 if nil != err { q.l.Errorf("Push() push to %s failed: %s", q.MainListName, err.Error()) } } // recycle() 定时将备份队列中最早的一条内容,回收到主队列重试处理 // 没有写到构造函数里,是为了可以灵活分离和分配生产者、消费者、回收者 func (q *Queue) Recycle() { if q.isRing { // 环形队列不需要做回收处理 return } go func(q *Queue) { defer utils.DefaultGoroutineRecover(nil, `Redis队列回收`) q.recycleTicker = time.NewTicker(q.RecycleTimeout) for { select { case <-q.recycleTicker.C: _, err := q.hub.Do("RPOPLPUSH", q.BackupListName, q.MainListName) if nil != err { q.l.Errorf("recycle() pop from %s and push to %s failed: %s", q.BackupListName, q.MainListName, err.Error()) } } } }(q) } // 收到系统退出消息时自动关闭Redis队列相关资源 func (q *Queue) closeWait() { go func(q *Queue) { defer utils.DefaultGoroutineRecover(nil, `Redis队列关闭`) var c chan os.Signal var s os.Signal c = make(chan os.Signal, 1) signal.Notify(c, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT, syscall.SIGKILL) for { s = <-c switch s { case syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT, syscall.SIGKILL: if !q.isRing && nil != q.recycleTicker { q.recycleTicker.Stop() } err := q.hub.Close() if nil != err { q.l.Errorf("closeWait() close redis queue connection failed %s", err.Error()) } return default: return } } }(q) }