redis_conn.go 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170
  1. package redis
  2. import (
  3. "errors"
  4. "git.aionnect.com/aionnect/go-common/utils/logger"
  5. redisc "github.com/chasex/redis-go-cluster"
  6. "github.com/gomodule/redigo/redis"
  7. "github.com/spf13/viper"
  8. "os"
  9. "os/signal"
  10. "strings"
  11. "sync"
  12. "time"
  13. )
  14. type Hub struct {
  15. LOG func() *logger.Logger
  16. pool *redis.Pool
  17. cluster redisc.Cluster
  18. once sync.Once
  19. }
  20. func NewHub() *Hub {
  21. viper.SetDefault("redis.max_idle", 300)
  22. viper.SetDefault("redis.max_active", 1000)
  23. viper.SetDefault("redis.timeout", time.Duration(60000))
  24. return &Hub{
  25. LOG: func() *logger.Logger {
  26. return logger.New()
  27. },
  28. }
  29. }
  30. func (h *Hub) conn() error {
  31. //var err error
  32. maxIdle := viper.GetInt("redis.max_idle")
  33. maxActive := viper.GetInt("redis.max_active")
  34. idleTimeout := viper.GetDuration("redis.timeout")
  35. nodes := viper.GetStringSlice("redis.nodes")
  36. if nil != nodes && len(nodes) > 0 { // 集群
  37. //var cluster *redisc.Cluster
  38. cluster, err := redisc.NewCluster(&redisc.Options{
  39. StartNodes: nodes,
  40. ConnTimeout: 5 * time.Second,
  41. ReadTimeout: 50 * time.Millisecond,
  42. WriteTimeout: 50 * time.Millisecond,
  43. KeepAlive: maxActive,
  44. AliveTime: idleTimeout,
  45. })
  46. if nil != err {
  47. return err
  48. }
  49. h.LOG().Info("Redis cluster connected")
  50. h.cluster = cluster
  51. } else { // 单点
  52. var pool *redis.Pool
  53. host := viper.GetString("redis.host")
  54. opts := []redis.DialOption{redis.DialConnectTimeout(5 * time.Second)}
  55. password := viper.GetString("redis.password")
  56. if len(strings.TrimSpace(password)) > 0 {
  57. opts = append(opts, redis.DialPassword(strings.TrimSpace(password)))
  58. }
  59. pool = &redis.Pool{
  60. MaxIdle: maxIdle,
  61. MaxActive: maxActive,
  62. IdleTimeout: idleTimeout,
  63. Dial: func() (redis.Conn, error) {
  64. if conn, err := redis.Dial("tcp", host, opts...); err != nil {
  65. h.LOG().Warnf("Get Redis connection failed: %s", err.Error())
  66. return nil, err
  67. } else {
  68. return conn, nil
  69. }
  70. },
  71. TestOnBorrow: func(c redis.Conn, t time.Time) error {
  72. _, err := c.Do("PING")
  73. return err
  74. },
  75. }
  76. // 连接测试
  77. conn := pool.Get()
  78. defer func(conn redis.Conn) {
  79. _ = conn.Close()
  80. }(conn)
  81. if reply, err := redis.String(conn.Do("PING")); err != nil || reply != "PONG" {
  82. if err != nil {
  83. h.LOG().Warnf("Can not connect to redis: %s", err.Error())
  84. } else {
  85. h.LOG().Warnf("Can not connect to redis: %s", reply)
  86. }
  87. return err
  88. }
  89. h.LOG().Info("Redis connected")
  90. h.pool = pool
  91. }
  92. go func(h2 *Hub) {
  93. defer func(h3 *Hub) {
  94. if err := h3.Close(); nil != err {
  95. h3.LOG().Error("Close redis connection failed:", err.Error())
  96. }
  97. h3.LOG().Info("Redis connection closed")
  98. }(h2)
  99. quit := make(chan os.Signal)
  100. signal.Notify(quit, os.Interrupt)
  101. <-quit
  102. }(h)
  103. return nil
  104. }
  105. func (h *Hub) Close() error {
  106. if nil == h {
  107. return nil
  108. }
  109. var err error
  110. if nil != h.pool {
  111. err = h.pool.Close()
  112. }
  113. return err
  114. }
  115. func (h *Hub) Do(cmd string, args ...interface{}) (interface{}, error) {
  116. if nil == h {
  117. return nil, nil
  118. }
  119. var err error
  120. h.once.Do(func() {
  121. e := h.conn()
  122. if nil != e {
  123. err = e
  124. }
  125. })
  126. if nil != err {
  127. return nil, err
  128. }
  129. if nil != h.cluster {
  130. return h.cluster.Do(cmd, args...)
  131. } else if h.pool != nil {
  132. conn := h.pool.Get()
  133. defer func(conn redis.Conn) {
  134. _ = conn.Close()
  135. }(conn)
  136. do, err := conn.Do(cmd, args...)
  137. if err != nil {
  138. return nil, err
  139. }
  140. return do, nil
  141. } else {
  142. return nil, errors.New("redis conn nil")
  143. }
  144. }
  145. func (h *Hub) Get() (redis.Conn, error) {
  146. if h.pool == nil {
  147. err := h.conn()
  148. if err != nil {
  149. return nil, err
  150. }
  151. }
  152. conn := h.pool.Get()
  153. return conn, nil
  154. }