// Package redis provides Hub, a lazily initialised adapter that hands out
// connections from either a single-node Redis pool or a Redis Cluster,
// depending on the viper configuration.
package redis

import (
	"os"
	"os/signal"
	"strings"
	"sync"
	"syscall"
	"time"

	"git.aionnect.com/aionnect/go-common/utils"
	"git.aionnect.com/aionnect/go-common/utils/logger"
	"github.com/gomodule/redigo/redis"
	"github.com/mna/redisc"
	"github.com/spf13/viper"
)

// Custom errors.
const (
	// ErrRedisConnNil is returned when neither a pool nor a cluster is
	// available to hand out a connection.
	ErrRedisConnNil = ErrRedis("redis conn nil")
)

// ErrRedis is a string-backed error type, which lets errors be declared as
// constants.
type ErrRedis string

// Error implements the error interface.
func (err ErrRedis) Error() string {
	return string(err)
}

// Hub is a Redis connection pool adapter. Exactly one of pool (single node)
// or cluster is populated, on the first call to Get.
type Hub struct {
	// LOG returns the logger used for connection diagnostics.
	LOG     func() *logger.Logger
	pool    *redis.Pool     // set in single-node mode
	cluster *redisc.Cluster // set when "redis.nodes" is configured
	once    sync.Once       // guards the lazy initialisation performed by Get
}

// NewHub returns a new Hub and registers the default pool settings with
// viper. No connection is made until the first call to Get or Do.
func NewHub() *Hub {
	viper.SetDefault("redis.max_idle", 300)
	viper.SetDefault("redis.max_active", 1000)
	// The previous default, time.Duration(60000), is 60µs: used as the pool's
	// IdleTimeout it expired idle connections almost immediately.
	// NOTE(review): 60s assumes the original value was meant as milliseconds.
	viper.SetDefault("redis.timeout", 60*time.Second)
	return &Hub{
		LOG: func() *logger.Logger {
			return logger.New()
		},
	}
}

// createPool builds a connection pool for addr using the "redis.max_idle",
// "redis.max_active" and "redis.timeout" settings. Its signature matches the
// CreatePool field of redisc.Cluster so it can serve both modes. The returned
// error is always nil.
func (h *Hub) createPool(addr string, opts ...redis.DialOption) (*redis.Pool, error) {
	pool := &redis.Pool{
		MaxIdle:     viper.GetInt("redis.max_idle"),
		MaxActive:   viper.GetInt("redis.max_active"),
		IdleTimeout: viper.GetDuration("redis.timeout"),
		Dial: func() (redis.Conn, error) {
			conn, err := redis.Dial("tcp", addr, opts...)
			if err != nil {
				h.LOG().Warnf("Get Redis connection failed: %s", err.Error())
				return nil, err
			}
			return conn, nil
		},
		// Every borrowed connection is validated with a PING before use.
		TestOnBorrow: func(c redis.Conn, t time.Time) error {
			_, err := c.Do("PING")
			return err
		},
	}
	return pool, nil
}

// conn configures the cluster (when "redis.nodes" is non-empty) or the
// single-node pool (from "redis.host" / "redis.password"), then verifies
// connectivity with a PING.
func (h *Hub) conn() error {
	dialOpts := []redis.DialOption{
		redis.DialConnectTimeout(5 * time.Second),
		// Read/write timeouts are intentionally left disabled:
		//redis.DialReadTimeout(5 * time.Second),
		//redis.DialWriteTimeout(5 * time.Second),
	}

	if nodes := viper.GetStringSlice("redis.nodes"); len(nodes) > 0 {
		// Cluster mode.
		// NOTE(review): "redis.password" is not applied here — confirm
		// whether the cluster is expected to run without AUTH.
		h.cluster = &redisc.Cluster{
			StartupNodes: nodes,
			DialOptions:  dialOpts,
			CreatePool:   h.createPool,
		}
	} else {
		// Single-node mode.
		if password := strings.TrimSpace(viper.GetString("redis.password")); password != "" {
			dialOpts = append(dialOpts, redis.DialPassword(password))
		}
		pool, err := h.createPool(viper.GetString("redis.host"), dialOpts...)
		if err != nil {
			return err
		}
		h.pool = pool
	}

	return h.ping()
}

// ping checks connectivity by issuing PING on a freshly acquired connection.
// It returns an error when the command fails or the reply is not "PONG".
func (h *Hub) ping() error {
	// ping runs inside the sync.Once function started by Get, so it must not
	// call Get itself: re-entering Once.Do from within its own function
	// deadlocks. acquire skips the initialisation step.
	conn, err := h.acquire()
	if err != nil {
		return err
	}
	defer func() { _ = conn.Close() }()

	reply, err := redis.String(conn.Do("PING"))
	if err != nil {
		h.LOG().Warnf("Can not connect to redis: %s", err.Error())
		return err
	}
	if reply != "PONG" {
		h.LOG().Warnf("Can not connect to redis: %s", reply)
		// Previously this path returned a nil error and so reported success.
		return ErrRedis("unexpected PING reply: " + reply)
	}

	h.LOG().Info("Redis connected")
	return nil
}

// closeWait starts a goroutine that waits for a termination signal
// (SIGHUP, SIGINT, SIGTERM or SIGQUIT) and then closes the hub.
func (h *Hub) closeWait() {
	go func() {
		defer utils.DefaultGoroutineRecover(nil, `Redis连接池关闭`)

		// SIGKILL is deliberately not listed: it cannot be caught.
		c := make(chan os.Signal, 1)
		signal.Notify(c, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
		// Unregister once done so later signals are not swallowed by a
		// channel nobody reads any more.
		defer signal.Stop(c)

		<-c
		if err := h.Close(); err != nil {
			h.LOG().Error("Close redis connection failed:", err.Error())
			return
		}
		h.LOG().Info("Redis connection closed")
	}()
}

// Close releases the cluster or the single-node pool, whichever is
// configured. It returns nil when nothing has been initialised.
func (h *Hub) Close() error {
	switch {
	case h.cluster != nil:
		return h.cluster.Close()
	case h.pool != nil:
		return h.pool.Close()
	default:
		return nil
	}
}

// Do executes a single Redis command on a connection obtained from Get and
// returns that connection afterwards. The reply is nil whenever an error is
// returned.
func (h *Hub) Do(cmd string, args ...interface{}) (interface{}, error) {
	conn, err := h.Get()
	if err != nil {
		return nil, err
	}
	// Check before deferring Close so a nil connection cannot panic.
	if conn == nil {
		return nil, ErrRedisConnNil
	}
	defer func() { _ = conn.Close() }()

	reply, err := conn.Do(cmd, args...)
	if err != nil {
		return nil, err
	}
	return reply, nil
}

// Get returns a connection from the pool or cluster, initialising the hub on
// first use. The caller must Close the returned connection.
//
// An initialisation failure is reported only to the call that triggered it;
// later calls hand out connections from whatever pool or cluster was
// configured. The signal-driven shutdown is installed only when
// initialisation succeeds.
func (h *Hub) Get() (redis.Conn, error) {
	var initErr error
	// One-time lazy initialisation of the Redis pool.
	h.once.Do(func() {
		if initErr = h.conn(); initErr == nil {
			h.closeWait()
		}
	})
	if initErr != nil {
		return nil, initErr
	}
	return h.acquire()
}

// acquire hands out a connection from the configured pool or cluster without
// triggering initialisation. It returns ErrRedisConnNil when neither is set.
func (h *Hub) acquire() (redis.Conn, error) {
	switch {
	case h.pool != nil:
		return h.pool.Get(), nil
	case h.cluster != nil:
		return h.cluster.Get(), nil
	default:
		return nil, ErrRedisConnNil
	}
}