redis_conn.go 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134
  1. package redis
  import (
  	"fmt"
  	"os"
  	"os/signal"
  	"strings"
  	"sync"
  	"syscall"
  	"time"

  	"git.aionnect.com/aionnect/go-common/utils"
  	"git.aionnect.com/aionnect/go-common/utils/logger"
  	"github.com/gomodule/redigo/redis"
  	"github.com/spf13/viper"
  )
  // ErrRedis is a string-backed error type; because its underlying type is a
  // string, package errors can be declared as constants.
  type ErrRedis string

  // Error implements the error interface by returning the underlying string.
  func (e ErrRedis) Error() string {
  	return string(e)
  }

  // ErrRedisConnNil is the package's custom error with the message
  // "redis conn nil".
  const ErrRedisConnNil ErrRedis = "redis conn nil"
  // IRedisAdapter is the abstract Redis operation interface that Hub talks to,
  // so that different client backends can be plugged in behind it.
  type IRedisAdapter interface {
  	// Do executes the Redis command commandName with the given arguments and
  	// returns the reply, or an error.
  	Do(commandName string, args ...interface{}) (interface{}, error)

  	// Close shuts the adapter down and reports any error from doing so.
  	Close() error
  }
  27. // Redis连接池适配器
  28. type Hub struct {
  29. LOG func() *logger.Logger
  30. once sync.Once
  31. adapter IRedisAdapter
  32. }
  33. // 获取Redis连接池适配器的新实例
  34. func NewHub() *Hub {
  35. viper.SetDefault("redis.max_idle", 300)
  36. viper.SetDefault("redis.max_active", 1000)
  37. viper.SetDefault("redis.timeout", time.Duration(60000))
  38. return &Hub{
  39. LOG: func() *logger.Logger {
  40. return logger.New()
  41. },
  42. }
  43. }
  44. // 建立与Redis的连接
  45. func (h *Hub) conn() error {
  46. var err error
  47. nodes := viper.GetStringSlice("redis.nodes")
  48. if nil != nodes && len(nodes) > 0 { // 集群
  49. masterName := viper.GetString("redis.master")
  50. if len(strings.TrimSpace(masterName)) > 0 { // redis sentinel集群
  51. h.adapter, err = NewSentinelAdapter(nodes, masterName) // 采用sentinel适配器
  52. } else { // redis cluster集群
  53. //h.adapter, err = NewGoRedisClusterAdapter(nodes) // 采用redis-go-cluster适配器
  54. h.adapter, err = NewRediscAdapter(nodes) // 采用redisc适配器
  55. }
  56. } else { // 单点
  57. host := viper.GetString("redis.host") // 采用redigo适配器
  58. h.adapter, err = NewRedigoAdapter(host)
  59. }
  60. if nil != err {
  61. h.LOG().Errorf("Get Redis connection pool failed: %s", err.Error())
  62. return err
  63. }
  64. return h.ping()
  65. }
  66. // 连接测试
  67. func (h *Hub) ping() error {
  68. if reply, err := redis.String(h.adapter.Do("PING")); err != nil || reply != "PONG" {
  69. if err != nil {
  70. h.LOG().Warnf("Can not connect to redis: %s", err.Error())
  71. } else {
  72. h.LOG().Warnf("Can not connect to redis: %s", reply)
  73. }
  74. return err
  75. }
  76. h.LOG().Info("Redis connected")
  77. return nil
  78. }
  79. // 监听系统退出信号量,自动关闭Redis连接
  80. func (h *Hub) closeWait() {
  81. go func(h *Hub) {
  82. defer utils.DefaultGoroutineRecover(nil, `Redis连接池关闭`)
  83. var c chan os.Signal
  84. var s os.Signal
  85. c = make(chan os.Signal, 1)
  86. signal.Notify(c, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT, syscall.SIGKILL)
  87. for {
  88. s = <-c
  89. switch s {
  90. case syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT, syscall.SIGKILL:
  91. if err := h.Close(); nil != err {
  92. h.LOG().Error("Close redis connection failed:", err.Error())
  93. }
  94. h.LOG().Info("Redis connection closed")
  95. return
  96. default:
  97. return
  98. }
  99. }
  100. }(h)
  101. }
  102. // 关闭Redis连接池适配器
  103. func (h *Hub) Close() error {
  104. return h.adapter.Close()
  105. }
  106. // 执行Redis命令
  107. func (h *Hub) Do(commandName string, args ...interface{}) (interface{}, error) {
  108. var err error
  109. h.once.Do(func() {
  110. e := h.conn()
  111. if nil != e {
  112. err = e
  113. } else {
  114. h.closeWait()
  115. }
  116. })
  117. if nil != err {
  118. return nil, err
  119. }
  120. return h.adapter.Do(commandName, args...)
  121. }