Browse Source

redis queue增加幂等处理

marion 4 years ago
parent
commit
4c7a1a3985
2 changed files with 7 additions and 3 deletions
  1. 5 2
      utils/redis/redis_queue.go
  2. 2 1
      utils/redis/redis_queue_test.go

+ 5 - 2
utils/redis/redis_queue.go

@@ -72,7 +72,7 @@ func (q *Queue) Clean() {
 // Pop() 从Redis列表中读取数据
 // 为避免消费者崩溃或意外错误丢数据,被读取(主列表队尾)的数据同时插入(备份列表队头)备份列表,当成功处理完成时,再从备份列表移除
 // 备份列表和主列表可以是同一个,即构造一个环形队列,但注意在环形队列时,消费异常时的阻塞行为是有差异的,且数据积压少并有多个消费者时会触发重复处理
-func (q *Queue) Pop(fn func(interface{})) {
+func (q *Queue) Pop(fn func(interface{}) error) {
 	for {
 		reply, err := q.hub.Do("BRPOPLPUSH", q.MainListName, q.BackupListName, q.BlockedTimeout)
 		if nil != err {
@@ -83,7 +83,10 @@ func (q *Queue) Pop(fn func(interface{})) {
 			continue
 		}
 		if nil != fn {
-			fn(reply)
+			err = fn(reply)
+			if nil != err { // 传人的逻辑处理函数返回错误时,不继续执行队列数据清理
+				continue
+			}
 		}
 		// 环形队列时,为避免忽略掉新增的的相同内容更新事件,仅移除最新的一条与已处理值相等的内容
 		// 当无新增的相同内容时,即刚刚RPOPLPUSH从队尾"备份"到队头的

+ 2 - 1
utils/redis/redis_queue_test.go

@@ -23,9 +23,10 @@ func TestRedisQueue(t *testing.T) {
 	// 消费
 	for i := 0; i < 4; i++ {
 		go func(idx int, q *Queue) {
-			q.Pop(func(reply interface{}) {
+			q.Pop(func(reply interface{}) error {
 				content, _ := redis.String(reply, nil)
 				fmt.Printf("Receiver %d get:%+v\n", idx, content)
+				return nil
 			})
 		}(i, q)
 	}