redigo_adapter.go 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126
  1. package redis
  2. import (
  3. "fmt"
  4. "github.com/gomodule/redigo/redis"
  5. "github.com/spf13/viper"
  6. "strings"
  7. "time"
  8. )
  9. // redigo适配器
  10. type RedigoAdapter struct {
  11. pool *redis.Pool
  12. }
  13. // 返回redigo适配器新实例
  14. func NewRedigoAdapter(addr string) (IRedisAdapter, error) {
  15. opts := []redis.DialOption{redis.DialConnectTimeout(5 * time.Second)}
  16. password := viper.GetString("redis.password")
  17. if len(strings.TrimSpace(password)) > 0 {
  18. opts = append(opts, redis.DialPassword(strings.TrimSpace(password)))
  19. }
  20. pool, err := createPool(addr, opts...)
  21. if nil != err {
  22. return nil, err
  23. }
  24. return &RedigoAdapter{
  25. pool: pool,
  26. }, nil
  27. }
  28. // 创建redis连接池
  29. func createPool(addr string, opts ...redis.DialOption) (*redis.Pool, error) {
  30. maxIdle := viper.GetInt("redis.max_idle")
  31. maxActive := viper.GetInt("redis.max_active")
  32. idleTimeout := viper.GetDuration("redis.timeout")
  33. return &redis.Pool{
  34. MaxIdle: maxIdle,
  35. MaxActive: maxActive,
  36. IdleTimeout: idleTimeout,
  37. Dial: func() (redis.Conn, error) {
  38. if conn, err := redis.Dial("tcp", addr, opts...); nil != err {
  39. return nil, err
  40. } else {
  41. return conn, nil
  42. }
  43. },
  44. TestOnBorrow: func(c redis.Conn, t time.Time) error {
  45. _, err := c.Do("PING")
  46. return err
  47. },
  48. }, nil
  49. }
  50. // 关闭Redis连接
  51. func (a *RedigoAdapter) Close() error {
  52. return a.pool.Close()
  53. }
  54. // 执行Redis命令
  55. func (a *RedigoAdapter) Do(commandName string, args ...interface{}) (interface{}, error) {
  56. conn := a.pool.Get()
  57. defer func(conn redis.Conn) {
  58. _ = conn.Close()
  59. }(conn)
  60. if nil == conn {
  61. return nil, ErrRedisConnNil
  62. } else {
  63. return conn.Do(commandName, args...)
  64. }
  65. }
  66. // 返回命令管道操作对象
  67. func (a *RedigoAdapter) Pipeline() IRedisPipeline {
  68. return &RedigoPipeline{
  69. conn: a.pool.Get(),
  70. }
  71. }
  72. // Redigo命令管道操作对象
  73. type RedigoPipeline struct {
  74. conn redis.Conn
  75. commands []*redisCmd
  76. }
  77. // 向管道中添加命令
  78. func (p *RedigoPipeline) Send(commandName string, args ...interface{}) IRedisPipeline {
  79. p.commands = append(p.commands, &redisCmd{commandName: commandName, args: args})
  80. return p
  81. }
  82. // 执行管道中的命令
  83. func (p *RedigoPipeline) Execute() ([]interface{}, error) {
  84. defer func(conn redis.Conn) {
  85. _ = conn.Close()
  86. }(p.conn)
  87. if nil == p.commands || len(p.commands) == 0 {
  88. return nil, fmt.Errorf("no commands in pipeline")
  89. }
  90. var err error
  91. for i := 0; i < len(p.commands); i++ {
  92. cmd := p.commands[i]
  93. err = p.conn.Send(cmd.commandName, cmd.args...)
  94. if nil != err {
  95. return nil, err
  96. }
  97. }
  98. err = p.conn.Flush()
  99. if nil != err {
  100. return nil, err
  101. }
  102. var replies []interface{}
  103. for i := 0; i < len(p.commands); i++ {
  104. reply, err := p.conn.Receive()
  105. if nil != err {
  106. return nil, err
  107. }
  108. replies = append(replies, reply)
  109. }
  110. return replies, nil
  111. }