Browse Source

redis sentinel

marion 4 years ago
parent
commit
eb1f0195d3
3 changed files with 94 additions and 3 deletions
  1. 1 0
      go.mod
  2. 9 3
      utils/redis/redis_conn.go
  3. 84 0
      utils/redis/sentinel_adapter.go

+ 1 - 0
go.mod

@@ -3,6 +3,7 @@ module git.aionnect.com/aionnect/go-common
 go 1.14
 
 require (
+	github.com/FZambia/sentinel v1.1.0
 	github.com/Shopify/sarama v1.19.0
 	github.com/bsm/sarama-cluster v2.1.15+incompatible
 	github.com/chasex/redis-go-cluster v1.0.1-0.20161207023922-222d81891f1d

+ 9 - 3
utils/redis/redis_conn.go

@@ -7,6 +7,7 @@ import (
 	"github.com/spf13/viper"
 	"os"
 	"os/signal"
+	"strings"
 	"sync"
 	"syscall"
 	"time"
@@ -53,10 +54,15 @@ func (h *Hub) conn() error {
 	var err error
 	nodes := viper.GetStringSlice("redis.nodes")
 	if nil != nodes && len(nodes) > 0 { // 集群
-		h.adapter, err = NewGoRedisClusterAdapter(nodes)
-		//h.adapter, err = NewRediscAdapter(nodes)
+		masterName := viper.GetString("redis.master")
+		if len(strings.TrimSpace(masterName)) > 0 { // redis sentinel集群
+			h.adapter, err = NewSentinelAdapter(nodes, masterName) // 采用sentinel适配器
+		} else { // redis cluster集群
+			//h.adapter, err = NewGoRedisClusterAdapter(nodes) // 采用redis-go-cluster适配器
+			h.adapter, err = NewRediscAdapter(nodes) // 采用redisc适配器
+		}
 	} else { // 单点
-		host := viper.GetString("redis.host")
+		host := viper.GetString("redis.host") // 采用redigo适配器
 		h.adapter, err = NewRedigoAdapter(host)
 	}
 	if nil != err {

+ 84 - 0
utils/redis/sentinel_adapter.go

@@ -0,0 +1,84 @@
+package redis
+
+import (
+	"fmt"
+	"github.com/FZambia/sentinel"
+	"github.com/gomodule/redigo/redis"
+	"github.com/spf13/viper"
+	"strings"
+	"time"
+)
+
+// sentinel适配器
+type SentinelAdapter struct {
+	pool *redis.Pool
+}
+
+// 返回sentinel适配器新实例
+func NewSentinelAdapter(nodes []string, masterName string) (IRedisAdapter, error) {
+	opts := []redis.DialOption{redis.DialConnectTimeout(5 * time.Second)}
+	password := viper.GetString("redis.password")
+	if len(strings.TrimSpace(password)) > 0 {
+		opts = append(opts, redis.DialPassword(strings.TrimSpace(password)))
+	}
+	s := &sentinel.Sentinel{
+		Addrs:      nodes,
+		MasterName: masterName,
+		Dial: func(addr string) (redis.Conn, error) {
+			c, err := redis.Dial("tcp", addr, opts...)
+			if err != nil {
+				return nil, err
+			}
+			return c, nil
+		},
+	}
+	maxIdle := viper.GetInt("redis.max_idle")
+	maxActive := viper.GetInt("redis.max_active")
+	idleTimeout := viper.GetDuration("redis.timeout")
+
+	pool := &redis.Pool{
+		MaxIdle:     maxIdle,
+		MaxActive:   maxActive,
+		IdleTimeout: idleTimeout,
+		Dial: func() (redis.Conn, error) {
+			addr, err := s.MasterAddr()
+			if err != nil {
+				return nil, err
+			}
+			if conn, err := redis.Dial("tcp", addr, opts...); nil != err {
+				return nil, err
+			} else {
+				return conn, nil
+			}
+		},
+		TestOnBorrow: func(c redis.Conn, t time.Time) error {
+			if !sentinel.TestRole(c, "master") {
+				return fmt.Errorf("redis sentinel role check failed")
+			} else {
+				return nil
+			}
+		},
+	}
+	return &SentinelAdapter{
+		pool: pool,
+	}, nil
+}
+
+// 关闭Redis连接
+func (a *SentinelAdapter) Close() error {
+	return a.pool.Close()
+}
+
+// 执行Redis命令
+func (a *SentinelAdapter) Do(commandName string, args ...interface{}) (interface{}, error) {
+	conn := a.pool.Get()
+	defer func(conn redis.Conn) {
+		_ = conn.Close()
+	}(conn)
+
+	if nil == conn {
+		return nil, ErrRedisConnNil
+	} else {
+		return conn.Do(commandName, args...)
+	}
+}