redis_conn.go 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150
  1. package redis
  2. import (
  3. "git.aionnect.com/aionnect/go-common/utils"
  4. "git.aionnect.com/aionnect/go-common/utils/logger"
  5. "github.com/gomodule/redigo/redis"
  6. "github.com/spf13/viper"
  7. "os"
  8. "os/signal"
  9. "strings"
  10. "sync"
  11. "syscall"
  12. "time"
  13. )
  14. // 自定义错误
  15. const (
  16. ErrRedisConnNil = ErrRedis("redis conn nil")
  17. )
  18. type ErrRedis string
  19. func (err ErrRedis) Error() string {
  20. return string(err)
  21. }
  22. // 抽象的Redis操作适配器接口定义
  23. type IRedisAdapter interface {
  24. Close() error
  25. Do(commandName string, args ...interface{}) (interface{}, error)
  26. Pipeline() IRedisPipeline
  27. }
  28. type IRedisPipeline interface {
  29. Send(commandName string, args ...interface{}) IRedisPipeline
  30. Execute() ([]interface{}, error)
  31. }
// redisCmd pairs a command name with its argument list. It is not referenced
// in the visible part of this file; presumably the pipeline implementations
// use it to hold queued commands — confirm against them.
type redisCmd struct {
	// commandName is the command name.
	commandName string
	// args are the arguments that go with commandName.
	args []interface{}
}
// Hub is the Redis connection pool adapter. It holds an IRedisAdapter that is
// assigned by conn, which Do runs through once on its first call.
type Hub struct {
	// LOG returns the logger used for this hub's messages.
	LOG func() *logger.Logger
	// once guards the one-time conn call made from Do.
	once sync.Once
	// adapter is the backend assigned by conn; it is nil until conn has run.
	adapter IRedisAdapter
}
  42. // 获取Redis连接池适配器的新实例
  43. func NewHub() *Hub {
  44. viper.SetDefault("redis.max_idle", 300)
  45. viper.SetDefault("redis.max_active", 1000)
  46. viper.SetDefault("redis.timeout", 5*time.Second)
  47. return &Hub{
  48. LOG: func() *logger.Logger {
  49. return logger.New()
  50. },
  51. }
  52. }
  53. // 建立与Redis的连接
  54. func (h *Hub) conn() error {
  55. var err error
  56. nodes := viper.GetStringSlice("redis.nodes")
  57. if nil != nodes && len(nodes) > 0 { // 集群
  58. masterName := viper.GetString("redis.master")
  59. if len(strings.TrimSpace(masterName)) > 0 { // redis sentinel集群
  60. h.adapter, err = NewSentinelAdapter(nodes, masterName) // 采用sentinel适配器
  61. } else { // redis cluster集群
  62. //h.adapter, err = NewGoRedisClusterAdapter(nodes) // 采用redis-go-cluster适配器
  63. h.adapter, err = NewRediscAdapter(nodes) // 采用redisc适配器
  64. }
  65. } else { // 单点
  66. host := viper.GetString("redis.host") // 采用redigo适配器
  67. h.adapter, err = NewRedigoAdapter(host)
  68. }
  69. if nil != err {
  70. h.LOG().Errorf("Get Redis connection pool failed: %s", err.Error())
  71. return err
  72. }
  73. return h.ping()
  74. }
  75. // 连接测试
  76. func (h *Hub) ping() error {
  77. if reply, err := redis.String(h.adapter.Do("PING")); err != nil || reply != "PONG" {
  78. if err != nil {
  79. h.LOG().Warnf("Can not connect to redis: %s", err.Error())
  80. } else {
  81. h.LOG().Warnf("Can not connect to redis: %s", reply)
  82. }
  83. return err
  84. }
  85. h.LOG().Info("Redis connected")
  86. return nil
  87. }
  88. // 监听系统退出信号量,自动关闭Redis连接
  89. func (h *Hub) closeWait() {
  90. go func(h *Hub) {
  91. defer utils.DefaultGoroutineRecover(nil, `Redis连接池关闭`)
  92. var c chan os.Signal
  93. var s os.Signal
  94. c = make(chan os.Signal, 1)
  95. signal.Notify(c, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT, syscall.SIGKILL)
  96. for {
  97. s = <-c
  98. switch s {
  99. case syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT, syscall.SIGKILL:
  100. if err := h.Close(); nil != err {
  101. h.LOG().Error("Close redis connection failed:", err.Error())
  102. }
  103. h.LOG().Info("Redis connection closed")
  104. return
  105. default:
  106. return
  107. }
  108. }
  109. }(h)
  110. }
  111. // 关闭Redis连接池适配器
  112. func (h *Hub) Close() error {
  113. return h.adapter.Close()
  114. }
  115. // 执行Redis命令
  116. func (h *Hub) Do(commandName string, args ...interface{}) (interface{}, error) {
  117. var err error
  118. h.once.Do(func() {
  119. e := h.conn()
  120. if nil != e {
  121. err = e
  122. } else {
  123. h.closeWait()
  124. }
  125. })
  126. if nil != err {
  127. return nil, err
  128. }
  129. return h.adapter.Do(commandName, args...)
  130. }
  131. // 返回命令管道操作对象
  132. func (h *Hub) Pipeline() IRedisPipeline {
  133. return h.adapter.Pipeline()
  134. }