package redis import ( "git.aionnect.com/aionnect/go-common/utils" "git.aionnect.com/aionnect/go-common/utils/logger" "github.com/gomodule/redigo/redis" "github.com/spf13/viper" "os" "os/signal" "strings" "sync" "syscall" "time" ) // 自定义错误 const ( ErrRedisConnNil = ErrRedis("redis conn nil") ) type ErrRedis string func (err ErrRedis) Error() string { return string(err) } // 抽象的Redis操作适配器接口定义 type IRedisAdapter interface { Close() error Do(commandName string, args ...interface{}) (interface{}, error) Pipeline() IRedisPipeline } type IRedisPipeline interface { Send(commandName string, args ...interface{}) IRedisPipeline Execute() ([]interface{}, error) } type redisCmd struct { commandName string args []interface{} } // Redis连接池适配器 type Hub struct { LOG func() *logger.Logger once sync.Once adapter IRedisAdapter } // 获取Redis连接池适配器的新实例 func NewHub() *Hub { viper.SetDefault("redis.max_idle", 300) viper.SetDefault("redis.max_active", 1000) viper.SetDefault("redis.timeout", 5*time.Second) return &Hub{ LOG: func() *logger.Logger { return logger.New() }, } } // 建立与Redis的连接 func (h *Hub) conn() error { var err error nodes := viper.GetStringSlice("redis.nodes") if nil != nodes && len(nodes) > 0 { // 集群 masterName := viper.GetString("redis.master") if len(strings.TrimSpace(masterName)) > 0 { // redis sentinel集群 h.adapter, err = NewSentinelAdapter(nodes, masterName) // 采用sentinel适配器 } else { // redis cluster集群 //h.adapter, err = NewGoRedisClusterAdapter(nodes) // 采用redis-go-cluster适配器 h.adapter, err = NewRediscAdapter(nodes) // 采用redisc适配器 } } else { // 单点 host := viper.GetString("redis.host") // 采用redigo适配器 h.adapter, err = NewRedigoAdapter(host) } if nil != err { h.LOG().Errorf("Get Redis connection pool failed: %s", err.Error()) return err } return h.ping() } // 连接测试 func (h *Hub) ping() error { if reply, err := redis.String(h.adapter.Do("PING")); err != nil || reply != "PONG" { if err != nil { h.LOG().Warnf("Can not connect to redis: %s", err.Error()) } else { h.LOG().Warnf("Can not connect to redis: %s", reply) } return err } h.LOG().Info("Redis connected") return nil } // 监听系统退出信号量,自动关闭Redis连接 func (h *Hub) closeWait() { go func(h *Hub) { defer utils.DefaultGoroutineRecover(nil, `Redis连接池关闭`) var c chan os.Signal var s os.Signal c = make(chan os.Signal, 1) signal.Notify(c, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT, syscall.SIGKILL) for { s = <-c switch s { case syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT, syscall.SIGKILL: if err := h.Close(); nil != err { h.LOG().Error("Close redis connection failed:", err.Error()) } h.LOG().Info("Redis connection closed") return default: return } } }(h) } // 关闭Redis连接池适配器 func (h *Hub) Close() error { return h.adapter.Close() } // 执行Redis命令 func (h *Hub) Do(commandName string, args ...interface{}) (interface{}, error) { var err error h.once.Do(func() { e := h.conn() if nil != e { err = e } else { h.closeWait() } }) if nil != err { return nil, err } return h.adapter.Do(commandName, args...) } // 返回命令管道操作对象 func (h *Hub) Pipeline() IRedisPipeline { return h.adapter.Pipeline() }