123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150 |
- package redis
- import (
- "git.aionnect.com/aionnect/go-common/utils"
- "git.aionnect.com/aionnect/go-common/utils/logger"
- "github.com/gomodule/redigo/redis"
- "github.com/spf13/viper"
- "os"
- "os/signal"
- "strings"
- "sync"
- "syscall"
- "time"
- )
- // 自定义错误
- const (
- ErrRedisConnNil = ErrRedis("redis conn nil")
- )
- type ErrRedis string
- func (err ErrRedis) Error() string {
- return string(err)
- }
- // 抽象的Redis操作适配器接口定义
- type IRedisAdapter interface {
- Close() error
- Do(commandName string, args ...interface{}) (interface{}, error)
- Pipeline() IRedisPipeline
- }
- type IRedisPipeline interface {
- Send(commandName string, args ...interface{}) IRedisPipeline
- Execute() ([]interface{}, error)
- }
- type redisCmd struct {
- commandName string
- args []interface{}
- }
- // Redis连接池适配器
- type Hub struct {
- LOG func() *logger.Logger
- once sync.Once
- adapter IRedisAdapter
- }
- // 获取Redis连接池适配器的新实例
- func NewHub() *Hub {
- viper.SetDefault("redis.max_idle", 300)
- viper.SetDefault("redis.max_active", 1000)
- viper.SetDefault("redis.timeout", 5*time.Second)
- return &Hub{
- LOG: func() *logger.Logger {
- return logger.New()
- },
- }
- }
- // 建立与Redis的连接
- func (h *Hub) conn() error {
- var err error
- nodes := viper.GetStringSlice("redis.nodes")
- if nil != nodes && len(nodes) > 0 { // 集群
- masterName := viper.GetString("redis.master")
- if len(strings.TrimSpace(masterName)) > 0 { // redis sentinel集群
- h.adapter, err = NewSentinelAdapter(nodes, masterName) // 采用sentinel适配器
- } else { // redis cluster集群
- //h.adapter, err = NewGoRedisClusterAdapter(nodes) // 采用redis-go-cluster适配器
- h.adapter, err = NewRediscAdapter(nodes) // 采用redisc适配器
- }
- } else { // 单点
- host := viper.GetString("redis.host") // 采用redigo适配器
- h.adapter, err = NewRedigoAdapter(host)
- }
- if nil != err {
- h.LOG().Errorf("Get Redis connection pool failed: %s", err.Error())
- return err
- }
- return h.ping()
- }
- // 连接测试
- func (h *Hub) ping() error {
- if reply, err := redis.String(h.adapter.Do("PING")); err != nil || reply != "PONG" {
- if err != nil {
- h.LOG().Warnf("Can not connect to redis: %s", err.Error())
- } else {
- h.LOG().Warnf("Can not connect to redis: %s", reply)
- }
- return err
- }
- h.LOG().Info("Redis connected")
- return nil
- }
- // 监听系统退出信号量,自动关闭Redis连接
- func (h *Hub) closeWait() {
- go func(h *Hub) {
- defer utils.DefaultGoroutineRecover(nil, `Redis连接池关闭`)
- var c chan os.Signal
- var s os.Signal
- c = make(chan os.Signal, 1)
- signal.Notify(c, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT, syscall.SIGKILL)
- for {
- s = <-c
- switch s {
- case syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT, syscall.SIGKILL:
- if err := h.Close(); nil != err {
- h.LOG().Error("Close redis connection failed:", err.Error())
- }
- h.LOG().Info("Redis connection closed")
- return
- default:
- return
- }
- }
- }(h)
- }
- // 关闭Redis连接池适配器
- func (h *Hub) Close() error {
- return h.adapter.Close()
- }
- // 执行Redis命令
- func (h *Hub) Do(commandName string, args ...interface{}) (interface{}, error) {
- var err error
- h.once.Do(func() {
- e := h.conn()
- if nil != e {
- err = e
- } else {
- h.closeWait()
- }
- })
- if nil != err {
- return nil, err
- }
- return h.adapter.Do(commandName, args...)
- }
- // 返回命令管道操作对象
- func (h *Hub) Pipeline() IRedisPipeline {
- return h.adapter.Pipeline()
- }
|