package redis import ( "git.aionnect.com/aionnect/go-common/utils/logger" redisc "github.com/chasex/redis-go-cluster" "github.com/gomodule/redigo/redis" "github.com/spf13/viper" "os" "os/signal" "strings" "sync" "time" ) type Hub struct { LOG func() *logger.Logger pool *redis.Pool cluster *redisc.Cluster once sync.Once } func NewHub() *Hub { viper.SetDefault("redis.max_idle", 300) viper.SetDefault("redis.max_active", 1000) viper.SetDefault("redis.timeout", time.Duration(60000)) return &Hub{ LOG: func() *logger.Logger { return logger.New() }, } } func (h *Hub) conn() error { //var err error maxIdle := viper.GetInt("redis.max_idle") maxActive := viper.GetInt("redis.max_active") idleTimeout := viper.GetDuration("redis.timeout") nodes := viper.GetStringSlice("redis.nodes") if nil != nodes && len(nodes) > 0 { // 集群 //var cluster *redisc.Cluster cluster, err := redisc.NewCluster(&redisc.Options{ StartNodes: nodes, ConnTimeout: 5 * time.Second, ReadTimeout: 50 * time.Millisecond, WriteTimeout: 50 * time.Millisecond, KeepAlive: maxActive, AliveTime: idleTimeout, }) if nil != err { return err } h.LOG().Info("Redis cluster connected") h.cluster = cluster } else { // 单点 var pool *redis.Pool host := viper.GetString("redis.host") opts := []redis.DialOption{redis.DialConnectTimeout(5 * time.Second)} password := viper.GetString("redis.password") if len(strings.TrimSpace(password)) > 0 { opts = append(opts, redis.DialPassword(strings.TrimSpace(password))) } pool = &redis.Pool{ MaxIdle: maxIdle, MaxActive: maxActive, IdleTimeout: idleTimeout, Dial: func() (redis.Conn, error) { if conn, err := redis.Dial("tcp", host, opts...); err != nil { h.LOG().Warnf("Get Redis connection failed: %s", err.Error()) return nil, err } else { return conn, nil } }, TestOnBorrow: func(c redis.Conn, t time.Time) error { _, err := c.Do("PING") return err }, } // 连接测试 conn := pool.Get() defer func(conn redis.Conn) { _ = conn.Close() }(conn) if reply, err := redis.String(conn.Do("PING")); err != nil || reply != "PONG" { if err != nil { h.LOG().Warnf("Can not connect to redis: %s", err.Error()) } else { h.LOG().Warnf("Can not connect to redis: %s", reply) } return err } h.LOG().Info("Redis connected") h.pool = pool } go func(h2 *Hub) { defer func(h3 *Hub) { if err := h3.Close(); nil != err { h3.LOG().Error("Close redis connection failed:", err.Error()) } h3.LOG().Info("Redis connection closed") }(h2) quit := make(chan os.Signal) signal.Notify(quit, os.Interrupt) <-quit }(h) return nil } func (h *Hub) Close() error { if nil == h { return nil } var err error if nil != h.cluster { h.cluster.Close() } else if nil != h.pool { err = h.pool.Close() } return err } func (h *Hub) Do(cmd string, args ...interface{}) (interface{}, error) { if nil == h { return nil, nil } var err error h.once.Do(func() { e := h.conn() if nil != e { err = e } }) if nil != err { return nil, err } if nil != h.cluster { return h.cluster.Do(cmd, args...) } else if nil != h.pool { conn := h.pool.Get() defer func(conn redis.Conn) { _ = conn.Close() }(conn) do, err := conn.Do(cmd, args...) if nil != err { return nil, err } return do, nil } else { return nil, ErrRedisConnNil } } const ( ErrRedisConnNil = ErrRedis("redis conn nil") ) type ErrRedis string func (err ErrRedis) Error() string { return string(err) } func (h *Hub) Get() (redis.Conn, error) { if nil == h.pool { err := h.conn() if nil != err { return nil, err } } conn := h.pool.Get() return conn, nil }