redis_conn.go 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204
  1. package redis
  2. import (
  3. "git.aionnect.com/aionnect/go-common/utils"
  4. "git.aionnect.com/aionnect/go-common/utils/logger"
  5. "github.com/gomodule/redigo/redis"
  6. "github.com/mna/redisc"
  7. "github.com/spf13/viper"
  8. "os"
  9. "os/signal"
  10. "strings"
  11. "sync"
  12. "syscall"
  13. "time"
  14. )
  // ErrRedis is a string-backed error type used for the custom errors of
  // this package.
  type ErrRedis string

  // Error implements the error interface by returning the underlying string.
  func (e ErrRedis) Error() string {
  	return string(e)
  }

  // Custom errors.
  const (
  	// ErrRedisConnNil reports that no Redis connection could be obtained.
  	ErrRedisConnNil = ErrRedis("redis conn nil")
  )
  23. // Redis连接池适配器
  24. type Hub struct {
  25. LOG func() *logger.Logger
  26. pool *redis.Pool
  27. cluster *redisc.Cluster
  28. once sync.Once
  29. }
  30. // 获取Redis连接池适配器的新实例
  31. func NewHub() *Hub {
  32. viper.SetDefault("redis.max_idle", 300)
  33. viper.SetDefault("redis.max_active", 1000)
  34. viper.SetDefault("redis.timeout", time.Duration(60000))
  35. return &Hub{
  36. LOG: func() *logger.Logger {
  37. return logger.New()
  38. },
  39. }
  40. }
  41. // 创建连接池
  42. func (h *Hub) createPool(addr string, opts ...redis.DialOption) (*redis.Pool, error) {
  43. maxIdle := viper.GetInt("redis.max_idle")
  44. maxActive := viper.GetInt("redis.max_active")
  45. idleTimeout := viper.GetDuration("redis.timeout")
  46. return &redis.Pool{
  47. MaxIdle: maxIdle,
  48. MaxActive: maxActive,
  49. IdleTimeout: idleTimeout,
  50. Dial: func() (redis.Conn, error) {
  51. if conn, err := redis.Dial("tcp", addr, opts...); err != nil {
  52. h.LOG().Warnf("Get Redis connection failed: %s", err.Error())
  53. return nil, err
  54. } else {
  55. return conn, nil
  56. }
  57. },
  58. TestOnBorrow: func(c redis.Conn, t time.Time) error {
  59. _, err := c.Do("PING")
  60. return err
  61. },
  62. }, nil
  63. }
  64. // 建立与Redis的连接
  65. func (h *Hub) conn() error {
  66. dialOpts := []redis.DialOption{
  67. redis.DialConnectTimeout(5 * time.Second),
  68. //redis.DialReadTimeout(5 * time.Second),
  69. //redis.DialWriteTimeout(5 * time.Second),
  70. }
  71. nodes := viper.GetStringSlice("redis.nodes")
  72. if nil != nodes && len(nodes) > 0 { // 集群
  73. cluster := &redisc.Cluster{
  74. StartupNodes: nodes,
  75. DialOptions: dialOpts,
  76. CreatePool: h.createPool,
  77. }
  78. h.cluster = cluster
  79. } else { // 单点
  80. var pool *redis.Pool
  81. host := viper.GetString("redis.host")
  82. password := viper.GetString("redis.password")
  83. if len(strings.TrimSpace(password)) > 0 {
  84. dialOpts = append(dialOpts, redis.DialPassword(strings.TrimSpace(password)))
  85. }
  86. pool, _ = h.createPool(host, dialOpts...)
  87. h.pool = pool
  88. }
  89. return h.ping()
  90. }
  91. // 连接测试
  92. func (h *Hub) ping() error {
  93. // 连接测试
  94. conn, err := h.Get()
  95. if nil != err {
  96. return err
  97. }
  98. defer func(conn redis.Conn) {
  99. _ = conn.Close()
  100. }(conn)
  101. if reply, err := redis.String(conn.Do("PING")); err != nil || reply != "PONG" {
  102. if err != nil {
  103. h.LOG().Warnf("Can not connect to redis: %s", err.Error())
  104. } else {
  105. h.LOG().Warnf("Can not connect to redis: %s", reply)
  106. }
  107. return err
  108. }
  109. h.LOG().Info("Redis connected")
  110. return nil
  111. }
  112. // 监听系统退出信号量,自动关闭Redis连接
  113. func (h *Hub) closeWait() {
  114. go func(h *Hub) {
  115. defer utils.DefaultGoroutineRecover(nil, `Redis连接池关闭`)
  116. var c chan os.Signal
  117. var s os.Signal
  118. c = make(chan os.Signal, 1)
  119. signal.Notify(c, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT, syscall.SIGKILL)
  120. for {
  121. s = <-c
  122. switch s {
  123. case syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT, syscall.SIGKILL:
  124. if err := h.Close(); nil != err {
  125. h.LOG().Error("Close redis connection failed:", err.Error())
  126. }
  127. h.LOG().Info("Redis connection closed")
  128. return
  129. default:
  130. return
  131. }
  132. }
  133. }(h)
  134. }
  135. // 关闭Redis连接
  136. func (h *Hub) Close() error {
  137. var err error
  138. if nil != h.cluster {
  139. err = h.cluster.Close()
  140. } else if nil != h.pool {
  141. err = h.pool.Close()
  142. }
  143. return err
  144. }
  145. // 执行Redis命令
  146. func (h *Hub) Do(cmd string, args ...interface{}) (interface{}, error) {
  147. conn, err := h.Get()
  148. if nil != err {
  149. return nil, err
  150. }
  151. defer func(conn redis.Conn) {
  152. _ = conn.Close()
  153. }(conn)
  154. if nil == conn {
  155. return nil, ErrRedisConnNil
  156. } else {
  157. reply, err := conn.Do(cmd, args...)
  158. if nil != err {
  159. return nil, err
  160. }
  161. return reply, nil
  162. }
  163. }
  164. // 从连接池中获取一个连接
  165. func (h *Hub) Get() (redis.Conn, error) {
  166. var err error
  167. // 延迟一次性初始化Redis连接池
  168. h.once.Do(func() {
  169. e := h.conn()
  170. if nil != e {
  171. err = e
  172. } else {
  173. h.closeWait()
  174. }
  175. })
  176. if nil != err {
  177. return nil, err
  178. }
  179. if nil != h.pool {
  180. return h.pool.Get(), nil
  181. } else if nil != h.cluster {
  182. return h.cluster.Get(), nil
  183. } else {
  184. return nil, ErrRedisConnNil
  185. }
  186. }