redis_conn.go 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195
  1. package redis
  2. import (
  3. "git.aionnect.com/aionnect/go-common/utils"
  4. "git.aionnect.com/aionnect/go-common/utils/logger"
  5. "github.com/gomodule/redigo/redis"
  6. "github.com/mna/redisc"
  7. "github.com/spf13/viper"
  8. "os"
  9. "os/signal"
  10. "strings"
  11. "sync"
  12. "syscall"
  13. "time"
  14. )
  15. type Hub struct {
  16. LOG func() *logger.Logger
  17. pool *redis.Pool
  18. cluster *redisc.Cluster
  19. once sync.Once
  20. }
  21. func NewHub() *Hub {
  22. viper.SetDefault("redis.max_idle", 300)
  23. viper.SetDefault("redis.max_active", 1000)
  24. viper.SetDefault("redis.timeout", time.Duration(60000))
  25. return &Hub{
  26. LOG: func() *logger.Logger {
  27. return logger.New()
  28. },
  29. }
  30. }
  31. func (h *Hub) createPool(addr string, opts ...redis.DialOption) (*redis.Pool, error) {
  32. maxIdle := viper.GetInt("redis.max_idle")
  33. maxActive := viper.GetInt("redis.max_active")
  34. idleTimeout := viper.GetDuration("redis.timeout")
  35. return &redis.Pool{
  36. MaxIdle: maxIdle,
  37. MaxActive: maxActive,
  38. IdleTimeout: idleTimeout,
  39. Dial: func() (redis.Conn, error) {
  40. if conn, err := redis.Dial("tcp", addr, opts...); err != nil {
  41. h.LOG().Warnf("Get Redis connection failed: %s", err.Error())
  42. return nil, err
  43. } else {
  44. return conn, nil
  45. }
  46. },
  47. TestOnBorrow: func(c redis.Conn, t time.Time) error {
  48. _, err := c.Do("PING")
  49. return err
  50. },
  51. }, nil
  52. }
  53. func (h *Hub) conn() error {
  54. dialOpts := []redis.DialOption{
  55. redis.DialConnectTimeout(5 * time.Second),
  56. redis.DialReadTimeout(50 * time.Millisecond),
  57. redis.DialWriteTimeout(50 * time.Millisecond),
  58. }
  59. nodes := viper.GetStringSlice("redis.nodes")
  60. if nil != nodes && len(nodes) > 0 { // 集群
  61. cluster := &redisc.Cluster{
  62. StartupNodes: nodes,
  63. DialOptions: dialOpts,
  64. CreatePool: h.createPool,
  65. }
  66. h.cluster = cluster
  67. } else { // 单点
  68. var pool *redis.Pool
  69. host := viper.GetString("redis.host")
  70. opts := []redis.DialOption{redis.DialConnectTimeout(5 * time.Second)}
  71. password := viper.GetString("redis.password")
  72. if len(strings.TrimSpace(password)) > 0 {
  73. opts = append(opts, redis.DialPassword(strings.TrimSpace(password)))
  74. }
  75. pool, _ = h.createPool(host, dialOpts...)
  76. h.pool = pool
  77. }
  78. return nil
  79. }
  80. func (h *Hub) test() error {
  81. // 连接测试
  82. conn, err := h.Get()
  83. if nil != err {
  84. return err
  85. }
  86. defer func(conn redis.Conn) {
  87. _ = conn.Close()
  88. }(conn)
  89. if reply, err := redis.String(conn.Do("PING")); err != nil || reply != "PONG" {
  90. if err != nil {
  91. h.LOG().Warnf("Can not connect to redis: %s", err.Error())
  92. } else {
  93. h.LOG().Warnf("Can not connect to redis: %s", reply)
  94. }
  95. return err
  96. }
  97. h.LOG().Info("Redis connected")
  98. return nil
  99. }
  100. func (h *Hub) closeWait() {
  101. go func(h *Hub) {
  102. defer utils.DefaultGoroutineRecover(nil, `Redis连接池关闭`)
  103. var c chan os.Signal
  104. var s os.Signal
  105. c = make(chan os.Signal, 1)
  106. signal.Notify(c, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT, syscall.SIGKILL)
  107. for {
  108. s = <-c
  109. switch s {
  110. case syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT, syscall.SIGKILL:
  111. if err := h.Close(); nil != err {
  112. h.LOG().Error("Close redis connection failed:", err.Error())
  113. }
  114. h.LOG().Info("Redis connection closed")
  115. return
  116. default:
  117. return
  118. }
  119. }
  120. }(h)
  121. }
  122. func (h *Hub) Close() error {
  123. var err error
  124. if nil != h.cluster {
  125. err = h.cluster.Close()
  126. } else if nil != h.pool {
  127. err = h.pool.Close()
  128. }
  129. return err
  130. }
  131. func (h *Hub) Do(cmd string, args ...interface{}) (interface{}, error) {
  132. conn, err := h.Get()
  133. if nil != err {
  134. return nil, err
  135. }
  136. defer func(conn redis.Conn) {
  137. _ = conn.Close()
  138. }(conn)
  139. if nil != conn {
  140. return nil, ErrRedisConnNil
  141. } else {
  142. reply, err := conn.Do(cmd, args...)
  143. if nil != err {
  144. return nil, err
  145. }
  146. return reply, nil
  147. }
  148. }
  149. const (
  150. ErrRedisConnNil = ErrRedis("redis conn nil")
  151. )
  152. type ErrRedis string
  153. func (err ErrRedis) Error() string {
  154. return string(err)
  155. }
  156. func (h *Hub) Get() (redis.Conn, error) {
  157. var err error
  158. h.once.Do(func() {
  159. e := h.conn()
  160. if nil != e {
  161. err = e
  162. } else {
  163. h.closeWait()
  164. }
  165. })
  166. if nil != err {
  167. return nil, err
  168. }
  169. if nil != h.pool {
  170. return h.pool.Get(), nil
  171. } else if nil != h.cluster {
  172. return h.cluster.Get(), nil
  173. } else {
  174. return nil, ErrRedisConnNil
  175. }
  176. }