Browse Source

redisc update

marion 4 years ago
parent
commit
341b1012c5
2 changed files with 106 additions and 92 deletions
  1. 1 1
      go.mod
  2. 105 91
      utils/redis/redis_conn.go

+ 1 - 1
go.mod

@@ -5,7 +5,6 @@ go 1.14
 require (
 	github.com/Shopify/sarama v1.19.0
 	github.com/bsm/sarama-cluster v2.1.15+incompatible
-	github.com/chasex/redis-go-cluster v1.0.1-0.20161207023922-222d81891f1d
 	github.com/dgrijalva/jwt-go v3.2.0+incompatible
 	github.com/eapache/go-resiliency v1.2.0 // indirect
 	github.com/fastly/go-utils v0.0.0-20180712184237-d95a45783239 // indirect
@@ -17,6 +16,7 @@ require (
 	github.com/lestrrat-go/file-rotatelogs v2.3.0+incompatible
 	github.com/lestrrat-go/strftime v1.0.1 // indirect
 	github.com/mailru/easyjson v0.7.1
+	github.com/mna/redisc v1.1.7
 	github.com/nats-io/nats-server/v2 v2.1.6 // indirect
 	github.com/nats-io/nats.go v1.9.2
 	github.com/onsi/ginkgo v1.12.0 // indirect

+ 105 - 91
utils/redis/redis_conn.go

@@ -1,14 +1,16 @@
 package redis
 
 import (
+	"git.aionnect.com/aionnect/go-common/utils"
 	"git.aionnect.com/aionnect/go-common/utils/logger"
-	redisc "github.com/chasex/redis-go-cluster"
 	"github.com/gomodule/redigo/redis"
+	"github.com/mna/redisc"
 	"github.com/spf13/viper"
 	"os"
 	"os/signal"
 	"strings"
 	"sync"
+	"syscall"
 	"time"
 )
 
@@ -30,28 +32,44 @@ func NewHub() *Hub {
 	}
 }
 
-func (h *Hub) conn() error {
-	//var err error
+func (h *Hub) createPool(addr string, opts ...redis.DialOption) (*redis.Pool, error) {
 	maxIdle := viper.GetInt("redis.max_idle")
 	maxActive := viper.GetInt("redis.max_active")
 	idleTimeout := viper.GetDuration("redis.timeout")
 
+	return &redis.Pool{
+		MaxIdle:     maxIdle,
+		MaxActive:   maxActive,
+		IdleTimeout: idleTimeout,
+		Dial: func() (redis.Conn, error) {
+			if conn, err := redis.Dial("tcp", addr, opts...); err != nil {
+				h.LOG().Warnf("Get Redis connection failed: %s", err.Error())
+				return nil, err
+			} else {
+				return conn, nil
+			}
+		},
+		TestOnBorrow: func(c redis.Conn, t time.Time) error {
+			_, err := c.Do("PING")
+			return err
+		},
+	}, nil
+}
+
+func (h *Hub) conn() error {
+	dialOpts := []redis.DialOption{
+		redis.DialConnectTimeout(5 * time.Second),
+		redis.DialReadTimeout(50 * time.Millisecond),
+		redis.DialWriteTimeout(50 * time.Millisecond),
+	}
+
 	nodes := viper.GetStringSlice("redis.nodes")
 	if nil != nodes && len(nodes) > 0 { // 集群
-		//var cluster *redisc.Cluster
-		cluster, err := redisc.NewCluster(&redisc.Options{
-			StartNodes:   nodes,
-			ConnTimeout:  5 * time.Second,
-			ReadTimeout:  50 * time.Millisecond,
-			WriteTimeout: 50 * time.Millisecond,
-			KeepAlive:    maxActive,
-			AliveTime:    idleTimeout,
-		})
-		if nil != err {
-			return err
+		cluster := &redisc.Cluster{
+			StartupNodes: nodes,
+			DialOptions:  dialOpts,
+			CreatePool:   h.createPool,
 		}
-
-		h.LOG().Info("Redis cluster connected")
 		h.cluster = cluster
 	} else { // 单点
 		var pool *redis.Pool
@@ -61,66 +79,62 @@ func (h *Hub) conn() error {
 		if len(strings.TrimSpace(password)) > 0 {
 			opts = append(opts, redis.DialPassword(strings.TrimSpace(password)))
 		}
+		pool, _ = h.createPool(host, dialOpts...)
+		h.pool = pool
+	}
 
-		pool = &redis.Pool{
-			MaxIdle:     maxIdle,
-			MaxActive:   maxActive,
-			IdleTimeout: idleTimeout,
-			Dial: func() (redis.Conn, error) {
-				if conn, err := redis.Dial("tcp", host, opts...); err != nil {
-					h.LOG().Warnf("Get Redis connection failed: %s", err.Error())
-					return nil, err
-				} else {
-					return conn, nil
-				}
-			},
-			TestOnBorrow: func(c redis.Conn, t time.Time) error {
-				_, err := c.Do("PING")
-				return err
-			},
-		}
-
-		// 连接测试
-		conn := pool.Get()
-		defer func(conn redis.Conn) {
-			_ = conn.Close()
-		}(conn)
-		if reply, err := redis.String(conn.Do("PING")); err != nil || reply != "PONG" {
-
-			if err != nil {
-				h.LOG().Warnf("Can not connect to redis: %s", err.Error())
-			} else {
-				h.LOG().Warnf("Can not connect to redis: %s", reply)
-			}
+	return nil
+}
 
-			return err
+func (h *Hub) test() error {
+	// 连接测试
+	conn, err := h.Get()
+	if nil != err {
+		return err
+	}
+	defer func(conn redis.Conn) {
+		_ = conn.Close()
+	}(conn)
+
+	if reply, err := redis.String(conn.Do("PING")); err != nil || reply != "PONG" {
+		if err != nil {
+			h.LOG().Warnf("Can not connect to redis: %s", err.Error())
+		} else {
+			h.LOG().Warnf("Can not connect to redis: %s", reply)
 		}
-
-		h.LOG().Info("Redis connected")
-		h.pool = pool
+		return err
 	}
+	h.LOG().Info("Redis connected")
+	return nil
+}
 
-	go func(h2 *Hub) {
-		defer func(h3 *Hub) {
-			if err := h3.Close(); nil != err {
-				h3.LOG().Error("Close redis connection failed:", err.Error())
+func (h *Hub) closeWait() {
+	go func(h *Hub) {
+		defer utils.DefaultGoroutineRecover(nil, `Redis连接池关闭`)
+		var c chan os.Signal
+		var s os.Signal
+		c = make(chan os.Signal, 1)
+		signal.Notify(c, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT, syscall.SIGKILL)
+		for {
+			s = <-c
+			switch s {
+			case syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT, syscall.SIGKILL:
+				if err := h.Close(); nil != err {
+					h.LOG().Error("Close redis connection failed:", err.Error())
+				}
+				h.LOG().Info("Redis connection closed")
+				return
+			default:
+				return
 			}
-			h3.LOG().Info("Redis connection closed")
-		}(h2)
-		quit := make(chan os.Signal)
-		signal.Notify(quit, os.Interrupt)
-		<-quit
+		}
 	}(h)
-	return nil
 }
 
 func (h *Hub) Close() error {
-	if nil == h {
-		return nil
-	}
 	var err error
 	if nil != h.cluster {
-		h.cluster.Close()
+		err = h.cluster.Close()
 	} else if nil != h.pool {
 		err = h.pool.Close()
 	}
@@ -128,34 +142,22 @@ func (h *Hub) Close() error {
 }
 
 func (h *Hub) Do(cmd string, args ...interface{}) (interface{}, error) {
-	if nil == h {
-		return nil, nil
-	}
-	var err error
-	h.once.Do(func() {
-		e := h.conn()
-		if nil != e {
-			err = e
-		}
-	})
+	conn, err := h.Get()
 	if nil != err {
 		return nil, err
 	}
+	defer func(conn redis.Conn) {
+		_ = conn.Close()
+	}(conn)
 
-	if nil != h.cluster {
-		return h.cluster.Do(cmd, args...)
-	} else if nil != h.pool {
-		conn := h.pool.Get()
-		defer func(conn redis.Conn) {
-			_ = conn.Close()
-		}(conn)
-		do, err := conn.Do(cmd, args...)
+	if nil != conn {
+		return nil, ErrRedisConnNil
+	} else {
+		reply, err := conn.Do(cmd, args...)
 		if nil != err {
 			return nil, err
 		}
-		return do, nil
-	} else {
-		return nil, ErrRedisConnNil
+		return reply, nil
 	}
 }
 
@@ -170,12 +172,24 @@ func (err ErrRedis) Error() string {
 }
 
 func (h *Hub) Get() (redis.Conn, error) {
-	if nil == h.pool {
-		err := h.conn()
-		if nil != err {
-			return nil, err
+	var err error
+	h.once.Do(func() {
+		e := h.conn()
+		if nil != e {
+			err = e
+		} else {
+			h.closeWait()
 		}
+	})
+	if nil != err {
+		return nil, err
+	}
+
+	if nil != h.pool {
+		return h.pool.Get(), nil
+	} else if nil != h.cluster {
+		return h.cluster.Get(), nil
+	} else {
+		return nil, ErrRedisConnNil
 	}
-	conn := h.pool.Get()
-	return conn, nil
 }