redis_conn.go 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128
  1. package redis
  2. import (
  3. "git.aionnect.com/aionnect/go-common/utils"
  4. "git.aionnect.com/aionnect/go-common/utils/logger"
  5. "github.com/gomodule/redigo/redis"
  6. "github.com/spf13/viper"
  7. "os"
  8. "os/signal"
  9. "sync"
  10. "syscall"
  11. "time"
  12. )
  13. // 自定义错误
  14. const (
  15. ErrRedisConnNil = ErrRedis("redis conn nil")
  16. )
  17. type ErrRedis string
  18. func (err ErrRedis) Error() string {
  19. return string(err)
  20. }
  21. // 抽象的Redis操作适配器接口定义
  22. type IRedisAdapter interface {
  23. Close() error
  24. Do(commandName string, args ...interface{}) (reply interface{}, err error)
  25. }
  26. // Redis连接池适配器
  27. type Hub struct {
  28. LOG func() *logger.Logger
  29. once sync.Once
  30. adapter IRedisAdapter
  31. }
  32. // 获取Redis连接池适配器的新实例
  33. func NewHub() *Hub {
  34. viper.SetDefault("redis.max_idle", 300)
  35. viper.SetDefault("redis.max_active", 1000)
  36. viper.SetDefault("redis.timeout", time.Duration(60000))
  37. return &Hub{
  38. LOG: func() *logger.Logger {
  39. return logger.New()
  40. },
  41. }
  42. }
  43. // 建立与Redis的连接
  44. func (h *Hub) conn() error {
  45. var err error
  46. nodes := viper.GetStringSlice("redis.nodes")
  47. if nil != nodes && len(nodes) > 0 { // 集群
  48. h.adapter, err = NewGoRedisClusterAdapter(nodes)
  49. //h.adapter, err = NewRediscAdapter(nodes)
  50. } else { // 单点
  51. host := viper.GetString("redis.host")
  52. h.adapter, err = NewRedigoAdapter(host)
  53. }
  54. if nil != err {
  55. h.LOG().Errorf("Get Redis connection pool failed: %s", err.Error())
  56. return err
  57. }
  58. return h.ping()
  59. }
  60. // 连接测试
  61. func (h *Hub) ping() error {
  62. if reply, err := redis.String(h.adapter.Do("PING")); err != nil || reply != "PONG" {
  63. if err != nil {
  64. h.LOG().Warnf("Can not connect to redis: %s", err.Error())
  65. } else {
  66. h.LOG().Warnf("Can not connect to redis: %s", reply)
  67. }
  68. return err
  69. }
  70. h.LOG().Info("Redis connected")
  71. return nil
  72. }
  73. // 监听系统退出信号量,自动关闭Redis连接
  74. func (h *Hub) closeWait() {
  75. go func(h *Hub) {
  76. defer utils.DefaultGoroutineRecover(nil, `Redis连接池关闭`)
  77. var c chan os.Signal
  78. var s os.Signal
  79. c = make(chan os.Signal, 1)
  80. signal.Notify(c, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT, syscall.SIGKILL)
  81. for {
  82. s = <-c
  83. switch s {
  84. case syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT, syscall.SIGKILL:
  85. if err := h.Close(); nil != err {
  86. h.LOG().Error("Close redis connection failed:", err.Error())
  87. }
  88. h.LOG().Info("Redis connection closed")
  89. return
  90. default:
  91. return
  92. }
  93. }
  94. }(h)
  95. }
  96. // 关闭Redis连接池适配器
  97. func (h *Hub) Close() error {
  98. return h.adapter.Close()
  99. }
  100. // 执行Redis命令
  101. func (h *Hub) Do(commandName string, args ...interface{}) (interface{}, error) {
  102. var err error
  103. h.once.Do(func() {
  104. e := h.conn()
  105. if nil != e {
  106. err = e
  107. } else {
  108. h.closeWait()
  109. }
  110. })
  111. if nil != err {
  112. return nil, err
  113. }
  114. return h.adapter.Do(commandName, args...)
  115. }