|
@@ -0,0 +1,170 @@
|
|
|
+package redis
|
|
|
+
|
|
|
+import (
|
|
|
+ "errors"
|
|
|
+ "git.aionnect.com/aionnect/go-common/utils/logger"
|
|
|
+ redisc "github.com/chasex/redis-go-cluster"
|
|
|
+ "github.com/gomodule/redigo/redis"
|
|
|
+ "github.com/spf13/viper"
|
|
|
+ "os"
|
|
|
+ "os/signal"
|
|
|
+ "strings"
|
|
|
+ "sync"
|
|
|
+ "time"
|
|
|
+)
|
|
|
+
|
|
|
+type Hub struct {
|
|
|
+ LOG func() *logger.Logger
|
|
|
+ pool *redis.Pool
|
|
|
+ cluster redisc.Cluster
|
|
|
+ once sync.Once
|
|
|
+}
|
|
|
+
|
|
|
+func NewHub() *Hub {
|
|
|
+ viper.SetDefault("redis.max_idle", 300)
|
|
|
+ viper.SetDefault("redis.max_active", 1000)
|
|
|
+ viper.SetDefault("redis.timeout", time.Duration(60000))
|
|
|
+ return &Hub{
|
|
|
+ LOG: func() *logger.Logger {
|
|
|
+ return logger.New()
|
|
|
+ },
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+func (h *Hub) conn() error {
|
|
|
+ //var err error
|
|
|
+ maxIdle := viper.GetInt("redis.max_idle")
|
|
|
+ maxActive := viper.GetInt("redis.max_active")
|
|
|
+ idleTimeout := viper.GetDuration("redis.timeout")
|
|
|
+
|
|
|
+ nodes := viper.GetStringSlice("redis.nodes")
|
|
|
+ if nil != nodes && len(nodes) > 0 { // 集群
|
|
|
+ //var cluster *redisc.Cluster
|
|
|
+ cluster, err := redisc.NewCluster(&redisc.Options{
|
|
|
+ StartNodes: nodes,
|
|
|
+ ConnTimeout: 5 * time.Second,
|
|
|
+ ReadTimeout: 50 * time.Millisecond,
|
|
|
+ WriteTimeout: 50 * time.Millisecond,
|
|
|
+ KeepAlive: maxActive,
|
|
|
+ AliveTime: idleTimeout,
|
|
|
+ })
|
|
|
+ if nil != err {
|
|
|
+ return err
|
|
|
+ }
|
|
|
+
|
|
|
+ h.LOG().Info("Redis cluster connected")
|
|
|
+ h.cluster = cluster
|
|
|
+ } else { // 单点
|
|
|
+ var pool *redis.Pool
|
|
|
+ host := viper.GetString("redis.host")
|
|
|
+ opts := []redis.DialOption{redis.DialConnectTimeout(5 * time.Second)}
|
|
|
+ password := viper.GetString("redis.password")
|
|
|
+ if len(strings.TrimSpace(password)) > 0 {
|
|
|
+ opts = append(opts, redis.DialPassword(strings.TrimSpace(password)))
|
|
|
+ }
|
|
|
+
|
|
|
+ pool = &redis.Pool{
|
|
|
+ MaxIdle: maxIdle,
|
|
|
+ MaxActive: maxActive,
|
|
|
+ IdleTimeout: idleTimeout,
|
|
|
+ Dial: func() (redis.Conn, error) {
|
|
|
+ if conn, err := redis.Dial("tcp", host, opts...); err != nil {
|
|
|
+ h.LOG().Warnf("Get Redis connection failed: %s", err.Error())
|
|
|
+ return nil, err
|
|
|
+ } else {
|
|
|
+ return conn, nil
|
|
|
+ }
|
|
|
+ },
|
|
|
+ TestOnBorrow: func(c redis.Conn, t time.Time) error {
|
|
|
+ _, err := c.Do("PING")
|
|
|
+ return err
|
|
|
+ },
|
|
|
+ }
|
|
|
+
|
|
|
+ // 连接测试
|
|
|
+ conn := pool.Get()
|
|
|
+ defer func(conn redis.Conn) {
|
|
|
+ _ = conn.Close()
|
|
|
+ }(conn)
|
|
|
+ if reply, err := redis.String(conn.Do("PING")); err != nil || reply != "PONG" {
|
|
|
+
|
|
|
+ if err != nil {
|
|
|
+ h.LOG().Warnf("Can not connect to redis: %s", err.Error())
|
|
|
+ } else {
|
|
|
+ h.LOG().Warnf("Can not connect to redis: %s", reply)
|
|
|
+ }
|
|
|
+
|
|
|
+ return err
|
|
|
+ }
|
|
|
+
|
|
|
+ h.LOG().Info("Redis connected")
|
|
|
+ h.pool = pool
|
|
|
+ }
|
|
|
+
|
|
|
+ go func(h2 *Hub) {
|
|
|
+ defer func(h3 *Hub) {
|
|
|
+ if err := h3.Close(); nil != err {
|
|
|
+ h3.LOG().Error("Close redis connection failed:", err.Error())
|
|
|
+ }
|
|
|
+ h3.LOG().Info("Redis connection closed")
|
|
|
+ }(h2)
|
|
|
+ quit := make(chan os.Signal)
|
|
|
+ signal.Notify(quit, os.Interrupt)
|
|
|
+ <-quit
|
|
|
+ }(h)
|
|
|
+ return nil
|
|
|
+}
|
|
|
+
|
|
|
+func (h *Hub) Close() error {
|
|
|
+ if nil == h {
|
|
|
+ return nil
|
|
|
+ }
|
|
|
+ var err error
|
|
|
+ if nil != h.pool {
|
|
|
+ err = h.pool.Close()
|
|
|
+ }
|
|
|
+ return err
|
|
|
+}
|
|
|
+
|
|
|
+func (h *Hub) Do(cmd string, args ...interface{}) (interface{}, error) {
|
|
|
+ if nil == h {
|
|
|
+ return nil, nil
|
|
|
+ }
|
|
|
+ var err error
|
|
|
+ h.once.Do(func() {
|
|
|
+ e := h.conn()
|
|
|
+ if nil != e {
|
|
|
+ err = e
|
|
|
+ }
|
|
|
+ })
|
|
|
+ if nil != err {
|
|
|
+ return nil, err
|
|
|
+ }
|
|
|
+
|
|
|
+ if nil != h.cluster {
|
|
|
+ return h.cluster.Do(cmd, args...)
|
|
|
+ } else if h.pool != nil {
|
|
|
+ conn := h.pool.Get()
|
|
|
+ defer func(conn redis.Conn) {
|
|
|
+ _ = conn.Close()
|
|
|
+ }(conn)
|
|
|
+ do, err := conn.Do(cmd, args...)
|
|
|
+ if err != nil {
|
|
|
+ return nil, err
|
|
|
+ }
|
|
|
+ return do, nil
|
|
|
+ } else {
|
|
|
+ return nil, errors.New("redis conn nil")
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+func (h *Hub) Get() (redis.Conn, error) {
|
|
|
+ if h.pool == nil {
|
|
|
+ err := h.conn()
|
|
|
+ if err != nil {
|
|
|
+ return nil, err
|
|
|
+ }
|
|
|
+ }
|
|
|
+ conn := h.pool.Get()
|
|
|
+ return conn, nil
|
|
|
+}
|