Browse Source

redis queue增加幂等处理

marion 4 years ago
parent
commit
6fa266b2dd
3 changed files with 19 additions and 2 deletions
  1. 5 0
      utils/redis/redis_conn.go
  2. 13 2
      utils/redis/redis_queue.go
  3. 1 0
      utils/redis/redis_queue_test.go

+ 5 - 0
utils/redis/redis_conn.go

@@ -143,3 +143,8 @@ func (h *Hub) Do(commandName string, args ...interface{}) (interface{}, error) {
 
 	return h.adapter.Do(commandName, args...)
 }
+
+// 返回命令管道操作对象
+func (h *Hub) Pipeline() IRedisPipeline {
+	return h.adapter.Pipeline()
+}

+ 13 - 2
utils/redis/redis_queue.go

@@ -17,6 +17,7 @@ type Queue struct {
 	BackupListName string         // 备份列表名称
 	BlockedTimeout int            // 阻塞超时
 	RecycleTimeout time.Duration  // 回收超时
+	IsIdempotent   bool           // 是否幂等
 	recycleTicker  *time.Ticker   // 回收定时器
 	isRing         bool           // 是否环形队列
 	hub            *Hub           // Redis连接适配器对象
@@ -87,11 +88,21 @@ func (q *Queue) Pop(fn func(interface{})) {
 		// 环形队列时,为避免忽略掉新增的的相同内容更新事件,仅移除最新的一条与已处理值相等的内容
 		// 当无新增的相同内容时,即刚刚RPOPLPUSH从队尾"备份"到队头的
 		// 当有新增的相同内容时,虽然最新增加的一条会被移除,但队列中还是会有更早的"备份"记录
+		pipeline := q.hub.Pipeline()
 		if q.isRing {
-			_, err = q.hub.Do("LREM", q.BackupListName, 1, reply)
+			if q.IsIdempotent { // 当消息体是幂等时,即反复操作得到的结果一致时,当成功操作一次后,也即可清除主队列中相同元素,下同
+				pipeline.Send("LREM", q.BackupListName, 0, reply) // isRing时BackupListName = MainListName
+			} else {
+				pipeline.Send("LREM", q.BackupListName, 1, reply)
+			}
 		} else { // 非环形队列时,移除备份列表中所有与已处理值相等的内容
-			_, err = q.hub.Do("LREM", q.BackupListName, 0, reply)
+			pipeline.Send("LREM", q.BackupListName, 0, reply)
+			if q.IsIdempotent {
+				pipeline.Send("LREM", q.MainListName, 0, reply)
+			}
 		}
+		_, err = pipeline.Execute()
+
 		if nil != err {
 			q.l.Errorf("Pop() remove from %s failed: %s", q.BackupListName, err.Error())
 		}

+ 1 - 0
utils/redis/redis_queue_test.go

@@ -16,6 +16,7 @@ func TestRedisQueue(t *testing.T) {
 
 	// 初始化
 	q := NewRedisQueue("myList", "backList", 3000, 5*time.Second)
+	q.IsIdempotent = true
 	q.Clean()
 	q.Recycle()