123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172 |
- package redis
- import (
- "errors"
- "git.aionnect.com/aionnect/go-common/utils/logger"
- redisc "github.com/chasex/redis-go-cluster"
- "github.com/gomodule/redigo/redis"
- "github.com/spf13/viper"
- "os"
- "os/signal"
- "strings"
- "sync"
- "time"
- )
- type Hub struct {
- LOG func() *logger.Logger
- pool *redis.Pool
- cluster *redisc.Cluster
- once sync.Once
- }
- func NewHub() *Hub {
- viper.SetDefault("redis.max_idle", 300)
- viper.SetDefault("redis.max_active", 1000)
- viper.SetDefault("redis.timeout", time.Duration(60000))
- return &Hub{
- LOG: func() *logger.Logger {
- return logger.New()
- },
- }
- }
- func (h *Hub) conn() error {
- //var err error
- maxIdle := viper.GetInt("redis.max_idle")
- maxActive := viper.GetInt("redis.max_active")
- idleTimeout := viper.GetDuration("redis.timeout")
- nodes := viper.GetStringSlice("redis.nodes")
- if nil != nodes && len(nodes) > 0 { // 集群
- //var cluster *redisc.Cluster
- cluster, err := redisc.NewCluster(&redisc.Options{
- StartNodes: nodes,
- ConnTimeout: 5 * time.Second,
- ReadTimeout: 50 * time.Millisecond,
- WriteTimeout: 50 * time.Millisecond,
- KeepAlive: maxActive,
- AliveTime: idleTimeout,
- })
- if nil != err {
- return err
- }
- h.LOG().Info("Redis cluster connected")
- h.cluster = cluster
- } else { // 单点
- var pool *redis.Pool
- host := viper.GetString("redis.host")
- opts := []redis.DialOption{redis.DialConnectTimeout(5 * time.Second)}
- password := viper.GetString("redis.password")
- if len(strings.TrimSpace(password)) > 0 {
- opts = append(opts, redis.DialPassword(strings.TrimSpace(password)))
- }
- pool = &redis.Pool{
- MaxIdle: maxIdle,
- MaxActive: maxActive,
- IdleTimeout: idleTimeout,
- Dial: func() (redis.Conn, error) {
- if conn, err := redis.Dial("tcp", host, opts...); err != nil {
- h.LOG().Warnf("Get Redis connection failed: %s", err.Error())
- return nil, err
- } else {
- return conn, nil
- }
- },
- TestOnBorrow: func(c redis.Conn, t time.Time) error {
- _, err := c.Do("PING")
- return err
- },
- }
- // 连接测试
- conn := pool.Get()
- defer func(conn redis.Conn) {
- _ = conn.Close()
- }(conn)
- if reply, err := redis.String(conn.Do("PING")); err != nil || reply != "PONG" {
- if err != nil {
- h.LOG().Warnf("Can not connect to redis: %s", err.Error())
- } else {
- h.LOG().Warnf("Can not connect to redis: %s", reply)
- }
- return err
- }
- h.LOG().Info("Redis connected")
- h.pool = pool
- }
- go func(h2 *Hub) {
- defer func(h3 *Hub) {
- if err := h3.Close(); nil != err {
- h3.LOG().Error("Close redis connection failed:", err.Error())
- }
- h3.LOG().Info("Redis connection closed")
- }(h2)
- quit := make(chan os.Signal)
- signal.Notify(quit, os.Interrupt)
- <-quit
- }(h)
- return nil
- }
- func (h *Hub) Close() error {
- if nil == h {
- return nil
- }
- var err error
- if nil != h.cluster {
- h.cluster.Close()
- } else if nil != h.pool {
- err = h.pool.Close()
- }
- return err
- }
- func (h *Hub) Do(cmd string, args ...interface{}) (interface{}, error) {
- if nil == h {
- return nil, nil
- }
- var err error
- h.once.Do(func() {
- e := h.conn()
- if nil != e {
- err = e
- }
- })
- if nil != err {
- return nil, err
- }
- if nil != h.cluster {
- return h.cluster.Do(cmd, args...)
- } else if h.pool != nil {
- conn := h.pool.Get()
- defer func(conn redis.Conn) {
- _ = conn.Close()
- }(conn)
- do, err := conn.Do(cmd, args...)
- if err != nil {
- return nil, err
- }
- return do, nil
- } else {
- return nil, errors.New("redis conn nil")
- }
- }
- func (h *Hub) Get() (redis.Conn, error) {
- if h.pool == nil {
- err := h.conn()
- if err != nil {
- return nil, err
- }
- }
- conn := h.pool.Get()
- return conn, nil
- }
|