Browse Source

redisc update

marion 4 years ago
parent
commit
a06ab6a2f2

+ 72 - 0
utils/redis/redigo_adapter.go

@@ -0,0 +1,72 @@
+package redis
+
+import (
+	"github.com/gomodule/redigo/redis"
+	"github.com/spf13/viper"
+	"strings"
+	"time"
+)
+
+// redigo适配器
+type RedigoAdapter struct {
+	pool *redis.Pool
+}
+
+// 返回redigo适配器新实例
+func NewRedigoAdapter(addr string) (IRedisAdapter, error) {
+	opts := []redis.DialOption{redis.DialConnectTimeout(5 * time.Second)}
+	password := viper.GetString("redis.password")
+	if len(strings.TrimSpace(password)) > 0 {
+		opts = append(opts, redis.DialPassword(strings.TrimSpace(password)))
+	}
+	pool, err := createPool(addr, opts...)
+	if nil != err {
+		return nil, err
+	}
+	return &RedigoAdapter{
+		pool: pool,
+	}, nil
+}
+
+// 创建redis连接池
+func createPool(addr string, opts ...redis.DialOption) (*redis.Pool, error) {
+	maxIdle := viper.GetInt("redis.max_idle")
+	maxActive := viper.GetInt("redis.max_active")
+	idleTimeout := viper.GetDuration("redis.timeout")
+
+	return &redis.Pool{
+		MaxIdle:     maxIdle,
+		MaxActive:   maxActive,
+		IdleTimeout: idleTimeout,
+		Dial: func() (redis.Conn, error) {
+			if conn, err := redis.Dial("tcp", addr, opts...); nil != err {
+				return nil, err
+			} else {
+				return conn, nil
+			}
+		},
+		TestOnBorrow: func(c redis.Conn, t time.Time) error {
+			_, err := c.Do("PING")
+			return err
+		},
+	}, nil
+}
+
+// 关闭Redis连接
+func (a *RedigoAdapter) Close() error {
+	return a.pool.Close()
+}
+
+// 执行Redis命令
+func (a *RedigoAdapter) Do(commandName string, args ...interface{}) (interface{}, error) {
+	conn := a.pool.Get()
+	defer func(conn redis.Conn) {
+		_ = conn.Close()
+	}(conn)
+
+	if nil == conn {
+		return nil, ErrRedisConnNil
+	} else {
+		return conn.Do(commandName, args...)
+	}
+}

+ 20 - 100
utils/redis/redis_conn.go

@@ -4,11 +4,9 @@ import (
 	"git.aionnect.com/aionnect/go-common/utils"
 	"git.aionnect.com/aionnect/go-common/utils/logger"
 	"github.com/gomodule/redigo/redis"
-	"github.com/mna/redisc"
 	"github.com/spf13/viper"
 	"os"
 	"os/signal"
-	"strings"
 	"sync"
 	"syscall"
 	"time"
@@ -25,12 +23,17 @@ func (err ErrRedis) Error() string {
 	return string(err)
 }
 
+// 抽象的Redis操作适配器接口定义
+type IRedisAdapter interface {
+	Close() error
+	Do(commandName string, args ...interface{}) (reply interface{}, err error)
+}
+
 // Redis连接池适配器
 type Hub struct {
 	LOG     func() *logger.Logger
-	pool    *redis.Pool
-	cluster *redisc.Cluster
 	once    sync.Once
+	adapter IRedisAdapter
 }
 
 // 获取Redis连接池适配器的新实例
@@ -45,76 +48,27 @@ func NewHub() *Hub {
 	}
 }
 
-// 创建连接池
-func (h *Hub) createPool(addr string, opts ...redis.DialOption) (*redis.Pool, error) {
-	maxIdle := viper.GetInt("redis.max_idle")
-	maxActive := viper.GetInt("redis.max_active")
-	idleTimeout := viper.GetDuration("redis.timeout")
-
-	return &redis.Pool{
-		MaxIdle:     maxIdle,
-		MaxActive:   maxActive,
-		IdleTimeout: idleTimeout,
-		Dial: func() (redis.Conn, error) {
-			if conn, err := redis.Dial("tcp", addr, opts...); err != nil {
-				h.LOG().Warnf("Get Redis connection failed: %s", err.Error())
-				return nil, err
-			} else {
-				return conn, nil
-			}
-		},
-		TestOnBorrow: func(c redis.Conn, t time.Time) error {
-			_, err := c.Do("PING")
-			return err
-		},
-	}, nil
-}
-
 // 建立与Redis的连接
 func (h *Hub) conn() error {
-	dialOpts := []redis.DialOption{
-		redis.DialConnectTimeout(5 * time.Second),
-		//redis.DialReadTimeout(5 * time.Second),
-		//redis.DialWriteTimeout(5 * time.Second),
-	}
-
+	var err error
 	nodes := viper.GetStringSlice("redis.nodes")
 	if nil != nodes && len(nodes) > 0 { // 集群
-		cluster := &redisc.Cluster{
-			StartupNodes: nodes,
-			DialOptions:  dialOpts,
-			CreatePool:   h.createPool,
-		}
-		h.cluster = cluster
+		h.adapter, err = NewGoRedisClusterAdapter(nodes)
+		//h.adapter, err = NewRediscAdapter(nodes)
 	} else { // 单点
-		var pool *redis.Pool
 		host := viper.GetString("redis.host")
-		password := viper.GetString("redis.password")
-		if len(strings.TrimSpace(password)) > 0 {
-			dialOpts = append(dialOpts, redis.DialPassword(strings.TrimSpace(password)))
-		}
-		pool, _ = h.createPool(host, dialOpts...)
-		h.pool = pool
+		h.adapter, err = NewRedigoAdapter(host)
+	}
+	if nil != err {
+		h.LOG().Errorf("Get Redis connection pool failed: %s", err.Error())
+		return err
 	}
 	return h.ping()
 }
 
 // 连接测试
 func (h *Hub) ping() error {
-	// 避免循环调用,此处不调用已包装好的Get和Do方法
-	var conn redis.Conn
-	if nil != h.pool {
-		conn = h.pool.Get()
-	} else if nil != h.cluster {
-		conn = h.cluster.Get()
-	} else {
-		return ErrRedisConnNil
-	}
-	defer func(conn redis.Conn) {
-		_ = conn.Close()
-	}(conn)
-
-	if reply, err := redis.String(conn.Do("PING")); err != nil || reply != "PONG" {
+	if reply, err := redis.String(h.adapter.Do("PING")); err != nil || reply != "PONG" {
 		if err != nil {
 			h.LOG().Warnf("Can not connect to redis: %s", err.Error())
 		} else {
@@ -150,42 +104,14 @@ func (h *Hub) closeWait() {
 	}(h)
 }
 
-// 关闭Redis连接
+// 关闭Redis连接池适配器
 func (h *Hub) Close() error {
-	var err error
-	if nil != h.cluster {
-		err = h.cluster.Close()
-	} else if nil != h.pool {
-		err = h.pool.Close()
-	}
-	return err
+	return h.adapter.Close()
 }
 
 // 执行Redis命令
-func (h *Hub) Do(cmd string, args ...interface{}) (interface{}, error) {
-	conn, err := h.Get()
-	if nil != err {
-		return nil, err
-	}
-	defer func(conn redis.Conn) {
-		_ = conn.Close()
-	}(conn)
-
-	if nil == conn {
-		return nil, ErrRedisConnNil
-	} else {
-		reply, err := conn.Do(cmd, args...)
-		if nil != err {
-			return nil, err
-		}
-		return reply, nil
-	}
-}
-
-// 从连接池中获取一个连接
-func (h *Hub) Get() (redis.Conn, error) {
+func (h *Hub) Do(commandName string, args ...interface{}) (interface{}, error) {
 	var err error
-	// 延迟一次性初始化Redis连接池
 	h.once.Do(func() {
 		e := h.conn()
 		if nil != e {
@@ -198,11 +124,5 @@ func (h *Hub) Get() (redis.Conn, error) {
 		return nil, err
 	}
 
-	if nil != h.pool {
-		return h.pool.Get(), nil
-	} else if nil != h.cluster {
-		return h.cluster.Get(), nil
-	} else {
-		return nil, ErrRedisConnNil
-	}
+	return h.adapter.Do(commandName, args...)
 }

+ 42 - 0
utils/redis/redis_go_cluster_adapter.go

@@ -0,0 +1,42 @@
+package redis
+
+import (
+	redisc "github.com/chasex/redis-go-cluster"
+	"github.com/spf13/viper"
+	"time"
+)
+
+// redis-go-cluster适配器
+type GoRedisClusterAdapter struct {
+	cluster *redisc.Cluster
+}
+
+// 返回redis-go-cluster适配器新实例
+func NewGoRedisClusterAdapter(nodes []string) (IRedisAdapter, error) {
+	maxActive := viper.GetInt("redis.max_active")
+	idleTimeout := viper.GetDuration("redis.timeout")
+
+	cluster, err := redisc.NewCluster(&redisc.Options{
+		StartNodes:  nodes,
+		ConnTimeout: 5 * time.Second,
+		KeepAlive:   maxActive,
+		AliveTime:   idleTimeout,
+	})
+	if nil != err {
+		return nil, err
+	}
+	return &GoRedisClusterAdapter{
+		cluster: cluster,
+	}, nil
+}
+
+// 关闭Redis连接
+func (a *GoRedisClusterAdapter) Close() error {
+	a.cluster.Close()
+	return nil
+}
+
+// 执行Redis命令
+func (a *GoRedisClusterAdapter) Do(commandName string, args ...interface{}) (interface{}, error) {
+	return a.cluster.Do(commandName, args...)
+}

+ 48 - 0
utils/redis/redisc_adapter.go

@@ -0,0 +1,48 @@
+package redis
+
+import (
+	"github.com/gomodule/redigo/redis"
+	"github.com/mna/redisc"
+	"time"
+)
+
+// redisc适配器
+type RediscAdapter struct {
+	cluster *redisc.Cluster
+}
+
+// 返回redisc适配器新实例
+func NewRediscAdapter(nodes []string) (IRedisAdapter, error) {
+	opts := []redis.DialOption{
+		redis.DialConnectTimeout(5 * time.Second),
+		//redis.DialReadTimeout(5 * time.Second),
+		//redis.DialWriteTimeout(5 * time.Second),
+	}
+	cluster := &redisc.Cluster{
+		StartupNodes: nodes,
+		DialOptions:  opts,
+		CreatePool:   createPool,
+	}
+	return &RediscAdapter{
+		cluster: cluster,
+	}, nil
+}
+
+// 关闭Redis连接
+func (a *RediscAdapter) Close() error {
+	return a.cluster.Close()
+}
+
+// 执行Redis命令
+func (a *RediscAdapter) Do(commandName string, args ...interface{}) (interface{}, error) {
+	conn := a.cluster.Get()
+	defer func(conn redis.Conn) {
+		_ = conn.Close()
+	}(conn)
+
+	if nil == conn {
+		return nil, ErrRedisConnNil
+	} else {
+		return conn.Do(commandName, args...)
+	}
+}