redis_conn.go 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181
  1. package redis
  2. import (
  3. "git.aionnect.com/aionnect/go-common/utils/logger"
  4. redisc "github.com/chasex/redis-go-cluster"
  5. "github.com/gomodule/redigo/redis"
  6. "github.com/spf13/viper"
  7. "os"
  8. "os/signal"
  9. "strings"
  10. "sync"
  11. "time"
  12. )
  13. type Hub struct {
  14. LOG func() *logger.Logger
  15. pool *redis.Pool
  16. cluster *redisc.Cluster
  17. once sync.Once
  18. }
  19. func NewHub() *Hub {
  20. viper.SetDefault("redis.max_idle", 300)
  21. viper.SetDefault("redis.max_active", 1000)
  22. viper.SetDefault("redis.timeout", time.Duration(60000))
  23. return &Hub{
  24. LOG: func() *logger.Logger {
  25. return logger.New()
  26. },
  27. }
  28. }
  29. func (h *Hub) conn() error {
  30. //var err error
  31. maxIdle := viper.GetInt("redis.max_idle")
  32. maxActive := viper.GetInt("redis.max_active")
  33. idleTimeout := viper.GetDuration("redis.timeout")
  34. nodes := viper.GetStringSlice("redis.nodes")
  35. if nil != nodes && len(nodes) > 0 { // 集群
  36. //var cluster *redisc.Cluster
  37. cluster, err := redisc.NewCluster(&redisc.Options{
  38. StartNodes: nodes,
  39. ConnTimeout: 5 * time.Second,
  40. ReadTimeout: 50 * time.Millisecond,
  41. WriteTimeout: 50 * time.Millisecond,
  42. KeepAlive: maxActive,
  43. AliveTime: idleTimeout,
  44. })
  45. if nil != err {
  46. return err
  47. }
  48. h.LOG().Info("Redis cluster connected")
  49. h.cluster = cluster
  50. } else { // 单点
  51. var pool *redis.Pool
  52. host := viper.GetString("redis.host")
  53. opts := []redis.DialOption{redis.DialConnectTimeout(5 * time.Second)}
  54. password := viper.GetString("redis.password")
  55. if len(strings.TrimSpace(password)) > 0 {
  56. opts = append(opts, redis.DialPassword(strings.TrimSpace(password)))
  57. }
  58. pool = &redis.Pool{
  59. MaxIdle: maxIdle,
  60. MaxActive: maxActive,
  61. IdleTimeout: idleTimeout,
  62. Dial: func() (redis.Conn, error) {
  63. if conn, err := redis.Dial("tcp", host, opts...); err != nil {
  64. h.LOG().Warnf("Get Redis connection failed: %s", err.Error())
  65. return nil, err
  66. } else {
  67. return conn, nil
  68. }
  69. },
  70. TestOnBorrow: func(c redis.Conn, t time.Time) error {
  71. _, err := c.Do("PING")
  72. return err
  73. },
  74. }
  75. // 连接测试
  76. conn := pool.Get()
  77. defer func(conn redis.Conn) {
  78. _ = conn.Close()
  79. }(conn)
  80. if reply, err := redis.String(conn.Do("PING")); err != nil || reply != "PONG" {
  81. if err != nil {
  82. h.LOG().Warnf("Can not connect to redis: %s", err.Error())
  83. } else {
  84. h.LOG().Warnf("Can not connect to redis: %s", reply)
  85. }
  86. return err
  87. }
  88. h.LOG().Info("Redis connected")
  89. h.pool = pool
  90. }
  91. go func(h2 *Hub) {
  92. defer func(h3 *Hub) {
  93. if err := h3.Close(); nil != err {
  94. h3.LOG().Error("Close redis connection failed:", err.Error())
  95. }
  96. h3.LOG().Info("Redis connection closed")
  97. }(h2)
  98. quit := make(chan os.Signal)
  99. signal.Notify(quit, os.Interrupt)
  100. <-quit
  101. }(h)
  102. return nil
  103. }
  104. func (h *Hub) Close() error {
  105. if nil == h {
  106. return nil
  107. }
  108. var err error
  109. if nil != h.cluster {
  110. h.cluster.Close()
  111. } else if nil != h.pool {
  112. err = h.pool.Close()
  113. }
  114. return err
  115. }
  116. func (h *Hub) Do(cmd string, args ...interface{}) (interface{}, error) {
  117. if nil == h {
  118. return nil, nil
  119. }
  120. var err error
  121. h.once.Do(func() {
  122. e := h.conn()
  123. if nil != e {
  124. err = e
  125. }
  126. })
  127. if nil != err {
  128. return nil, err
  129. }
  130. if nil != h.cluster {
  131. return h.cluster.Do(cmd, args...)
  132. } else if nil != h.pool {
  133. conn := h.pool.Get()
  134. defer func(conn redis.Conn) {
  135. _ = conn.Close()
  136. }(conn)
  137. do, err := conn.Do(cmd, args...)
  138. if nil != err {
  139. return nil, err
  140. }
  141. return do, nil
  142. } else {
  143. return nil, ErrRedisConnNil
  144. }
  145. }
  146. const (
  147. ErrRedisConnNil = ErrRedis("redis conn nil")
  148. )
  149. type ErrRedis string
  150. func (err ErrRedis) Error() string {
  151. return string(err)
  152. }
  153. func (h *Hub) Get() (redis.Conn, error) {
  154. if nil == h.pool {
  155. err := h.conn()
  156. if nil != err {
  157. return nil, err
  158. }
  159. }
  160. conn := h.pool.Get()
  161. return conn, nil
  162. }