redis_queue.go 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157
  1. package redis
  2. import (
  3. "git.aionnect.com/aionnect/go-common/utils"
  4. "git.aionnect.com/aionnect/go-common/utils/logger"
  5. "os"
  6. "os/signal"
  7. "strings"
  8. "syscall"
  9. "time"
  10. )
  11. // Redis队列实现
  12. // 注意! Redis Cluster不支持同时操作多个健的命令,例如BRPOPLPUSH",故本类与Redis Cluster不兼容
  13. type Queue struct {
  14. MainListName string // 主列表名称
  15. BackupListName string // 备份列表名称
  16. BlockedTimeout int // 阻塞超时
  17. RecycleTimeout time.Duration // 回收超时
  18. recycleTicker *time.Ticker // 回收定时器
  19. isRing bool // 是否环形队列
  20. hub *Hub // Redis连接适配器对象
  21. l *logger.Logger // 日志记录器实例
  22. }
  23. // 构造Redis队列新实例
  24. func NewRedisQueue(mainListName, backupListName string, blockedTimeout int, recycleTimeout time.Duration) *Queue {
  25. mainListName = strings.TrimSpace(mainListName)
  26. backupListName = strings.TrimSpace(backupListName)
  27. if mainListName == "" {
  28. mainListName = "defaultList"
  29. }
  30. if backupListName == "" {
  31. backupListName = mainListName
  32. }
  33. if blockedTimeout <= 0 {
  34. blockedTimeout = 500
  35. }
  36. if recycleTimeout <= 0 {
  37. recycleTimeout = 5 * time.Second
  38. }
  39. q := &Queue{
  40. hub: NewHub(),
  41. MainListName: mainListName,
  42. BackupListName: backupListName,
  43. BlockedTimeout: blockedTimeout,
  44. RecycleTimeout: recycleTimeout,
  45. }
  46. q.l = q.hub.LOG()
  47. q.isRing = q.MainListName == q.BackupListName
  48. // 自动关闭处理
  49. q.closeWait()
  50. return q
  51. }
  52. // Clean() 清理Redis列表,通常情况应该用不上
  53. func (q *Queue) Clean() {
  54. _, err := q.hub.Do("DEL", q.MainListName)
  55. if nil != err {
  56. q.l.Errorf("Clean() delete %s failed: %s", q.MainListName, err.Error())
  57. }
  58. if !q.isRing {
  59. _, err = q.hub.Do("DEL", q.BackupListName)
  60. if nil != err {
  61. q.l.Errorf("Clean() delete %s failed: %s", q.BackupListName, err.Error())
  62. }
  63. }
  64. }
  65. // Pop() 从Redis列表中读取数据
  66. // 为避免消费者崩溃或意外错误丢数据,被读取(主列表队尾)的数据同时插入(备份列表队头)备份列表,当成功处理完成时,再从备份列表移除
  67. // 备份列表和主列表可以是同一个,即构造一个环形队列,但注意在环形队列时,消费异常时的阻塞行为是有差异的,且数据积压少并有多个消费者时会触发重复处理
  68. func (q *Queue) Pop(fn func(interface{})) {
  69. for {
  70. reply, err := q.hub.Do("BRPOPLPUSH", q.MainListName, q.BackupListName, q.BlockedTimeout)
  71. if nil != err {
  72. q.l.Errorf("Pop() pop from %s and push to %s failed: %s", q.MainListName, q.BackupListName, err.Error())
  73. if err == ErrRedisConnNil { // Redis未连接时无阻塞,等待几秒再重试,以免死循环
  74. time.Sleep(5 * time.Second)
  75. }
  76. continue
  77. }
  78. if nil != fn {
  79. fn(reply)
  80. }
  81. // 环形队列时,为避免忽略掉新增的的相同内容更新事件,仅移除最新的一条与已处理值相等的内容
  82. // 当无新增的相同内容时,即刚刚RPOPLPUSH从队尾"备份"到队头的
  83. // 当有新增的相同内容时,虽然最新增加的一条会被移除,但队列中还是会有更早的"备份"记录
  84. if q.isRing {
  85. _, err = q.hub.Do("LREM", q.BackupListName, 1, reply)
  86. } else { // 非环形队列时,移除备份列表中所有与已处理值相等的内容
  87. _, err = q.hub.Do("LREM", q.BackupListName, 0, reply)
  88. }
  89. if nil != err {
  90. q.l.Errorf("Pop() remove from %s failed: %s", q.BackupListName, err.Error())
  91. }
  92. }
  93. }
  94. // Push() 添加数据到Redis列表
  95. func (q *Queue) Push(obj interface{}) {
  96. _, err := q.hub.Do("LPUSH", q.MainListName, obj) // 主列表队头插入
  97. if nil != err {
  98. q.l.Errorf("Push() push to %s failed: %s", q.MainListName, err.Error())
  99. }
  100. }
  101. // recycle() 定时将备份队列中最早的一条内容,回收到主队列重试处理
  102. // 没有写到构造函数里,是为了可以灵活分离和分配生产者、消费者、回收者
  103. func (q *Queue) Recycle() {
  104. if q.isRing { // 环形队列不需要做回收处理
  105. return
  106. }
  107. go func(q *Queue) {
  108. defer utils.DefaultGoroutineRecover(nil, `Redis队列回收`)
  109. q.recycleTicker = time.NewTicker(q.RecycleTimeout)
  110. for {
  111. select {
  112. case <-q.recycleTicker.C:
  113. _, err := q.hub.Do("RPOPLPUSH", q.BackupListName, q.MainListName)
  114. if nil != err {
  115. q.l.Errorf("recycle() pop from %s and push to %s failed: %s", q.BackupListName, q.MainListName, err.Error())
  116. }
  117. }
  118. }
  119. }(q)
  120. }
  121. // 收到系统退出消息时自动关闭Redis队列相关资源
  122. func (q *Queue) closeWait() {
  123. go func(q *Queue) {
  124. defer utils.DefaultGoroutineRecover(nil, `Redis队列关闭`)
  125. var c chan os.Signal
  126. var s os.Signal
  127. c = make(chan os.Signal, 1)
  128. signal.Notify(c, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT, syscall.SIGKILL)
  129. for {
  130. s = <-c
  131. switch s {
  132. case syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT, syscall.SIGKILL:
  133. if !q.isRing && nil != q.recycleTicker {
  134. q.recycleTicker.Stop()
  135. }
  136. err := q.hub.Close()
  137. if nil != err {
  138. q.l.Errorf("closeWait() close redis queue connection failed %s", err.Error())
  139. }
  140. return
  141. default:
  142. return
  143. }
  144. }
  145. }(q)
  146. }