123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204 |
- package redis
- import (
- "git.aionnect.com/aionnect/go-common/utils"
- "git.aionnect.com/aionnect/go-common/utils/logger"
- "github.com/gomodule/redigo/redis"
- "github.com/mna/redisc"
- "github.com/spf13/viper"
- "os"
- "os/signal"
- "strings"
- "sync"
- "syscall"
- "time"
- )
// ErrRedis is a string-backed error type for errors raised by this package.
type ErrRedis string

// Error implements the error interface by returning the underlying string.
func (e ErrRedis) Error() string {
	return string(e)
}

// ErrRedisConnNil is returned when no Redis connection can be provided.
const ErrRedisConnNil ErrRedis = "redis conn nil"
- // Redis连接池适配器
- type Hub struct {
- LOG func() *logger.Logger
- pool *redis.Pool
- cluster *redisc.Cluster
- once sync.Once
- }
- // 获取Redis连接池适配器的新实例
- func NewHub() *Hub {
- viper.SetDefault("redis.max_idle", 300)
- viper.SetDefault("redis.max_active", 1000)
- viper.SetDefault("redis.timeout", time.Duration(60000))
- return &Hub{
- LOG: func() *logger.Logger {
- return logger.New()
- },
- }
- }
- // 创建连接池
- func (h *Hub) createPool(addr string, opts ...redis.DialOption) (*redis.Pool, error) {
- maxIdle := viper.GetInt("redis.max_idle")
- maxActive := viper.GetInt("redis.max_active")
- idleTimeout := viper.GetDuration("redis.timeout")
- return &redis.Pool{
- MaxIdle: maxIdle,
- MaxActive: maxActive,
- IdleTimeout: idleTimeout,
- Dial: func() (redis.Conn, error) {
- if conn, err := redis.Dial("tcp", addr, opts...); err != nil {
- h.LOG().Warnf("Get Redis connection failed: %s", err.Error())
- return nil, err
- } else {
- return conn, nil
- }
- },
- TestOnBorrow: func(c redis.Conn, t time.Time) error {
- _, err := c.Do("PING")
- return err
- },
- }, nil
- }
- // 建立与Redis的连接
- func (h *Hub) conn() error {
- dialOpts := []redis.DialOption{
- redis.DialConnectTimeout(5 * time.Second),
- //redis.DialReadTimeout(5 * time.Second),
- //redis.DialWriteTimeout(5 * time.Second),
- }
- nodes := viper.GetStringSlice("redis.nodes")
- if nil != nodes && len(nodes) > 0 { // 集群
- cluster := &redisc.Cluster{
- StartupNodes: nodes,
- DialOptions: dialOpts,
- CreatePool: h.createPool,
- }
- h.cluster = cluster
- } else { // 单点
- var pool *redis.Pool
- host := viper.GetString("redis.host")
- password := viper.GetString("redis.password")
- if len(strings.TrimSpace(password)) > 0 {
- dialOpts = append(dialOpts, redis.DialPassword(strings.TrimSpace(password)))
- }
- pool, _ = h.createPool(host, dialOpts...)
- h.pool = pool
- }
- return h.ping()
- }
- // 连接测试
- func (h *Hub) ping() error {
- // 连接测试
- conn, err := h.Get()
- if nil != err {
- return err
- }
- defer func(conn redis.Conn) {
- _ = conn.Close()
- }(conn)
- if reply, err := redis.String(conn.Do("PING")); err != nil || reply != "PONG" {
- if err != nil {
- h.LOG().Warnf("Can not connect to redis: %s", err.Error())
- } else {
- h.LOG().Warnf("Can not connect to redis: %s", reply)
- }
- return err
- }
- h.LOG().Info("Redis connected")
- return nil
- }
- // 监听系统退出信号量,自动关闭Redis连接
- func (h *Hub) closeWait() {
- go func(h *Hub) {
- defer utils.DefaultGoroutineRecover(nil, `Redis连接池关闭`)
- var c chan os.Signal
- var s os.Signal
- c = make(chan os.Signal, 1)
- signal.Notify(c, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT, syscall.SIGKILL)
- for {
- s = <-c
- switch s {
- case syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT, syscall.SIGKILL:
- if err := h.Close(); nil != err {
- h.LOG().Error("Close redis connection failed:", err.Error())
- }
- h.LOG().Info("Redis connection closed")
- return
- default:
- return
- }
- }
- }(h)
- }
- // 关闭Redis连接
- func (h *Hub) Close() error {
- var err error
- if nil != h.cluster {
- err = h.cluster.Close()
- } else if nil != h.pool {
- err = h.pool.Close()
- }
- return err
- }
- // 执行Redis命令
- func (h *Hub) Do(cmd string, args ...interface{}) (interface{}, error) {
- conn, err := h.Get()
- if nil != err {
- return nil, err
- }
- defer func(conn redis.Conn) {
- _ = conn.Close()
- }(conn)
- if nil == conn {
- return nil, ErrRedisConnNil
- } else {
- reply, err := conn.Do(cmd, args...)
- if nil != err {
- return nil, err
- }
- return reply, nil
- }
- }
- // 从连接池中获取一个连接
- func (h *Hub) Get() (redis.Conn, error) {
- var err error
- // 延迟一次性初始化Redis连接池
- h.once.Do(func() {
- e := h.conn()
- if nil != e {
- err = e
- } else {
- h.closeWait()
- }
- })
- if nil != err {
- return nil, err
- }
- if nil != h.pool {
- return h.pool.Get(), nil
- } else if nil != h.cluster {
- return h.cluster.Get(), nil
- } else {
- return nil, ErrRedisConnNil
- }
- }
|