Browse Source

redis queue

marion 4 years ago
parent
commit
9e4068ef14

+ 34 - 0
utils/redis/redis_cluster_pool_test.go

@@ -0,0 +1,34 @@
+package redis
+
+import (
+	rd "github.com/gomodule/redigo/redis"
+	"github.com/spf13/viper"
+	"testing"
+)
+
+func TestRedisClusterPool(t *testing.T) {
+	viper.SetDefault("redis.host", "127.0.0.1:6379")
+	//viper.SetDefault("redis.host", "119.29.80.118:6379")
+	//viper.SetDefault("redis.nodes", []string{"10.10.10.98:6379", "10.10.10.68:6379", "10.10.10.95:6379"})
+	hub := NewHub()
+	for i := 0; i < 500; i++ {
+		_, err := hub.Do("LPUSH", "test_key", []byte("test"))
+		if err != nil {
+			println("push error:", err.Error())
+		}
+	}
+
+	for i := 0; i < 100; i++ {
+		go func() {
+			data, err := rd.Bytes(hub.Do("RPOP", "test_key"))
+			if err != nil {
+				println("pop error:", err.Error())
+			} else {
+				println("pop received:", string(data))
+			}
+		}()
+	}
+
+	quit := make(chan bool)
+	<-quit
+}

+ 1 - 1
utils/redis/redis_conn.go

@@ -52,7 +52,7 @@ type Hub struct {
 func NewHub() *Hub {
 	viper.SetDefault("redis.max_idle", 300)
 	viper.SetDefault("redis.max_active", 1000)
-	viper.SetDefault("redis.timeout", time.Duration(60000))
+	viper.SetDefault("redis.timeout", 5*time.Second)
 	return &Hub{
 		LOG: func() *logger.Logger {
 			return logger.New()

+ 157 - 0
utils/redis/redis_queue.go

@@ -0,0 +1,157 @@
+package redis
+
+import (
+	"git.aionnect.com/aionnect/go-common/utils"
+	"git.aionnect.com/aionnect/go-common/utils/logger"
+	"os"
+	"os/signal"
+	"strings"
+	"syscall"
+	"time"
+)
+
+// Redis队列实现
+// 注意! Redis Cluster不支持同时操作多个健的命令,例如BRPOPLPUSH",故本类与Redis Cluster不兼容
+type Queue struct {
+	MainListName   string         // 主列表名称
+	BackupListName string         // 备份列表名称
+	BlockedTimeout int            // 阻塞超时
+	RecycleTimeout time.Duration  // 回收超时
+	recycleTicker  *time.Ticker   // 回收定时器
+	isRing         bool           // 是否环形队列
+	hub            *Hub           // Redis连接适配器对象
+	l              *logger.Logger // 日志记录器实例
+}
+
+// 构造Redis队列新实例
+func NewRedisQueue(mainListName, backupListName string, blockedTimeout int, recycleTimeout time.Duration) *Queue {
+	mainListName = strings.TrimSpace(mainListName)
+	backupListName = strings.TrimSpace(backupListName)
+	if mainListName == "" {
+		mainListName = "defaultList"
+	}
+	if backupListName == "" {
+		backupListName = mainListName
+	}
+	if blockedTimeout <= 0 {
+		blockedTimeout = 500
+	}
+	if recycleTimeout <= 0 {
+		recycleTimeout = 5 * time.Second
+	}
+	q := &Queue{
+		hub:            NewHub(),
+		MainListName:   mainListName,
+		BackupListName: backupListName,
+		BlockedTimeout: blockedTimeout,
+		RecycleTimeout: recycleTimeout,
+	}
+	q.l = q.hub.LOG()
+	q.isRing = q.MainListName == q.BackupListName
+
+	// 自动关闭处理
+	q.closeWait()
+	return q
+}
+
+// Clean() 清理Redis列表,通常情况应该用不上
+func (q *Queue) Clean() {
+	_, err := q.hub.Do("DEL", q.MainListName)
+	if nil != err {
+		q.l.Errorf("Clean() delete %s failed: %s", q.MainListName, err.Error())
+	}
+	if !q.isRing {
+		_, err = q.hub.Do("DEL", q.BackupListName)
+		if nil != err {
+			q.l.Errorf("Clean() delete %s failed: %s", q.BackupListName, err.Error())
+		}
+	}
+}
+
+// Pop() 从Redis列表中读取数据
+// 为避免消费者崩溃或意外错误丢数据,被读取(主列表队尾)的数据同时插入(备份列表队头)备份列表,当成功处理完成时,再从备份列表移除
+// 备份列表和主列表可以是同一个,即构造一个环形队列,但注意在环形队列时,消费异常时的阻塞行为是有差异的,且数据积压少并有多个消费者时会触发重复处理
+func (q *Queue) Pop(fn func(interface{})) {
+	for {
+		reply, err := q.hub.Do("BRPOPLPUSH", q.MainListName, q.BackupListName, q.BlockedTimeout)
+		if nil != err {
+			q.l.Errorf("Pop() pop from %s and push to %s failed: %s", q.MainListName, q.BackupListName, err.Error())
+			if err == ErrRedisConnNil { // Redis未连接时无阻塞,等待几秒再重试,以免死循环
+				time.Sleep(5 * time.Second)
+			}
+			continue
+		}
+		if nil != fn {
+			fn(reply)
+		}
+		// 环形队列时,为避免忽略掉新增的的相同内容更新事件,仅移除最新的一条与已处理值相等的内容
+		// 当无新增的相同内容时,即刚刚RPOPLPUSH从队尾"备份"到队头的
+		// 当有新增的相同内容时,虽然最新增加的一条会被移除,但队列中还是会有更早的"备份"记录
+		if q.isRing {
+			_, err = q.hub.Do("LREM", q.BackupListName, 1, reply)
+		} else { // 非环形队列时,移除备份列表中所有与已处理值相等的内容
+			_, err = q.hub.Do("LREM", q.BackupListName, 0, reply)
+		}
+		if nil != err {
+			q.l.Errorf("Pop() remove from %s failed: %s", q.BackupListName, err.Error())
+		}
+	}
+}
+
+// Push() 添加数据到Redis列表
+func (q *Queue) Push(obj interface{}) {
+	_, err := q.hub.Do("LPUSH", q.MainListName, obj) // 主列表队头插入
+	if nil != err {
+		q.l.Errorf("Push() push to %s failed: %s", q.MainListName, err.Error())
+	}
+}
+
+// recycle() 定时将备份队列中最早的一条内容,回收到主队列重试处理
+// 没有写到构造函数里,是为了可以灵活分离和分配生产者、消费者、回收者
+func (q *Queue) Recycle() {
+	if q.isRing { // 环形队列不需要做回收处理
+		return
+	}
+
+	go func(q *Queue) {
+		defer utils.DefaultGoroutineRecover(nil, `Redis队列回收`)
+
+		q.recycleTicker = time.NewTicker(q.RecycleTimeout)
+		for {
+			select {
+			case <-q.recycleTicker.C:
+				_, err := q.hub.Do("RPOPLPUSH", q.BackupListName, q.MainListName)
+				if nil != err {
+					q.l.Errorf("recycle() pop from %s and push to %s failed: %s", q.BackupListName, q.MainListName, err.Error())
+				}
+			}
+		}
+	}(q)
+}
+
+// 收到系统退出消息时自动关闭Redis队列相关资源
+func (q *Queue) closeWait() {
+	go func(q *Queue) {
+		defer utils.DefaultGoroutineRecover(nil, `Redis队列关闭`)
+		var c chan os.Signal
+		var s os.Signal
+		c = make(chan os.Signal, 1)
+		signal.Notify(c, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT, syscall.SIGKILL)
+		for {
+			s = <-c
+			switch s {
+			case syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT, syscall.SIGKILL:
+				if !q.isRing && nil != q.recycleTicker {
+					q.recycleTicker.Stop()
+				}
+				err := q.hub.Close()
+				if nil != err {
+					q.l.Errorf("closeWait() close redis queue connection failed %s", err.Error())
+				}
+				return
+			default:
+				return
+			}
+		}
+	}(q)
+}

+ 41 - 0
utils/redis/redis_queue_test.go

@@ -0,0 +1,41 @@
+package redis
+
+import (
+	"fmt"
+	"github.com/gomodule/redigo/redis"
+	"github.com/spf13/viper"
+	"testing"
+	"time"
+)
+
+func TestRedisQueue(t *testing.T) {
+	// 配置
+	viper.SetDefault("redis.host", "127.0.0.1:6379") // single node or master/slave
+	//viper.SetDefault("redis.nodes", []string{"192.168.101.68:6379"}) //  cluster or master/slave with sentinel
+	//viper.SetDefault("redis.master", "mymaster") // master name config for sentinel
+
+	// 初始化
+	q := NewRedisQueue("myList", "backList", 3000, 5*time.Second)
+	q.Clean()
+	q.Recycle()
+
+	// 消费
+	for i := 0; i < 4; i++ {
+		go func(idx int, q *Queue) {
+			q.Pop(func(reply interface{}) {
+				content, _ := redis.String(reply, nil)
+				fmt.Printf("Receiver %d get:%+v\n", idx, content)
+			})
+		}(i, q)
+	}
+	// 生产
+	go func(q *Queue) {
+		for i := 0; i < 20; i++ {
+			q.Push(fmt.Sprintf("Message %d", i+1))
+			time.Sleep(1 * time.Second)
+		}
+	}(q)
+
+	quit := make(chan bool)
+	<-quit
+}

+ 74 - 0
utils/redis/redis_test.go

@@ -0,0 +1,74 @@
+package redis
+
+import (
+	"fmt"
+	"github.com/gomodule/redigo/redis"
+	"github.com/spf13/viper"
+	"math/rand"
+	"strconv"
+	"testing"
+)
+
+func TestRedisRepository(t *testing.T) {
+	r := NewTestRepository()
+	for i := 0; i < 10; i++ {
+		err := r.Save2("test", []byte(strconv.Itoa(rand.Intn(100))), 0)
+		if nil != err {
+			println("Redis error:", err)
+		}
+	}
+}
+
+// Redis操作类示例
+type TestRepository struct {
+	hub *Hub
+}
+
+func NewTestRepository() *TestRepository {
+	viper.SetDefault("redis.host", "127.0.0.1:6379")
+	hub := NewHub()
+	return &TestRepository{hub: hub}
+}
+
+func (r *TestRepository) Save2(key string, value []byte, expire int) error {
+	var err error
+	var reply string
+	if expire > 0 {
+		reply, err = redis.String(r.hub.Do("SET", key, value, expire))
+	} else {
+		reply, err = redis.String(r.hub.Do("SET", key, value))
+	}
+
+	if nil != err {
+		return err
+	} else if reply != "" {
+		println(reply)
+	}
+	return nil
+}
+
+func (r *TestRepository) Save(key string, value []byte) error {
+	fmt.Printf("input: %s\n", string(value))
+
+	var err error
+	var i interface{}
+	i, err = r.hub.Do("SET", key, value)
+	if nil != err {
+		fmt.Printf("%T\n", err)
+		return err
+	}
+	fmt.Printf("%T\n", i)
+	println(fmt.Sprintf("%s", i))
+	return nil
+}
+
+func (r *TestRepository) Test(key string, value []byte) error {
+	i, err := r.hub.Do("HSET", key, []byte("code"), value)
+	if nil != err {
+		fmt.Printf("%T\n", err)
+		return err
+	}
+	fmt.Printf("%T\n", i)
+	println(fmt.Sprintf("%d", i))
+	return nil
+}