package redis

import (
	"git.aionnect.com/aionnect/go-common/utils"
	"git.aionnect.com/aionnect/go-common/utils/logger"
	"os"
	"os/signal"
	"strings"
	"syscall"
	"time"
)

// Redis队列实现
// 注意! Redis Cluster不支持同时操作多个健的命令,例如BRPOPLPUSH",故本类与Redis Cluster不兼容
type Queue struct {
	MainListName   string         // 主列表名称
	BackupListName string         // 备份列表名称
	BlockedTimeout int            // 阻塞超时
	RecycleTimeout time.Duration  // 回收超时
	IsIdempotent   bool           // 是否幂等
	recycleTicker  *time.Ticker   // 回收定时器
	isRing         bool           // 是否环形队列
	hub            *Hub           // Redis连接适配器对象
	l              *logger.Logger // 日志记录器实例
}

// 构造Redis队列新实例
func NewRedisQueue(mainListName, backupListName string, blockedTimeout int, recycleTimeout time.Duration) *Queue {
	mainListName = strings.TrimSpace(mainListName)
	backupListName = strings.TrimSpace(backupListName)
	if mainListName == "" {
		mainListName = "defaultList"
	}
	if backupListName == "" {
		backupListName = mainListName
	}
	if blockedTimeout <= 0 {
		blockedTimeout = 500
	}
	if recycleTimeout <= 0 {
		recycleTimeout = 5 * time.Second
	}
	q := &Queue{
		hub:            NewHub(),
		MainListName:   mainListName,
		BackupListName: backupListName,
		BlockedTimeout: blockedTimeout,
		RecycleTimeout: recycleTimeout,
	}
	q.l = q.hub.LOG()
	q.isRing = q.MainListName == q.BackupListName

	// 自动关闭处理
	q.closeWait()
	return q
}

// Clean() 清理Redis列表,通常情况应该用不上
func (q *Queue) Clean() {
	_, err := q.hub.Do("DEL", q.MainListName)
	if nil != err {
		q.l.Errorf("Clean() delete %s failed: %s", q.MainListName, err.Error())
	}
	if !q.isRing {
		_, err = q.hub.Do("DEL", q.BackupListName)
		if nil != err {
			q.l.Errorf("Clean() delete %s failed: %s", q.BackupListName, err.Error())
		}
	}
}

// Pop() 从Redis列表中读取数据
// 为避免消费者崩溃或意外错误丢数据,被读取(主列表队尾)的数据同时插入(备份列表队头)备份列表,当成功处理完成时,再从备份列表移除
// 备份列表和主列表可以是同一个,即构造一个环形队列,但注意在环形队列时,消费异常时的阻塞行为是有差异的,且数据积压少并有多个消费者时会触发重复处理
func (q *Queue) Pop(fn func(interface{}) error) {
	for {
		reply, err := q.hub.Do("BRPOPLPUSH", q.MainListName, q.BackupListName, q.BlockedTimeout)
		if nil != err {
			q.l.Errorf("Pop() pop from %s and push to %s failed: %s", q.MainListName, q.BackupListName, err.Error())
			if err == ErrRedisConnNil { // Redis未连接时无阻塞,等待几秒再重试,以免死循环
				time.Sleep(5 * time.Second)
			}
			continue
		}
		if nil != fn {
			err = fn(reply)
			if nil != err { // 传人的逻辑处理函数返回错误时,不继续执行队列数据清理
				continue
			}
		}
		// 环形队列时,为避免忽略掉新增的的相同内容更新事件,仅移除最新的一条与已处理值相等的内容
		// 当无新增的相同内容时,即刚刚RPOPLPUSH从队尾"备份"到队头的
		// 当有新增的相同内容时,虽然最新增加的一条会被移除,但队列中还是会有更早的"备份"记录
		pipeline := q.hub.Pipeline()
		if q.isRing {
			if q.IsIdempotent { // 当消息体是幂等时,即反复操作得到的结果一致时,当成功操作一次后,也即可清除主队列中相同元素,下同
				pipeline.Send("LREM", q.BackupListName, 0, reply) // isRing时BackupListName = MainListName
			} else {
				pipeline.Send("LREM", q.BackupListName, 1, reply)
			}
		} else { // 非环形队列时,移除备份列表中所有与已处理值相等的内容
			pipeline.Send("LREM", q.BackupListName, 0, reply)
			if q.IsIdempotent {
				pipeline.Send("LREM", q.MainListName, 0, reply)
			}
		}
		_, err = pipeline.Execute()

		if nil != err {
			q.l.Errorf("Pop() remove from %s failed: %s", q.BackupListName, err.Error())
		}
	}
}

// Push() 添加数据到Redis列表
func (q *Queue) Push(obj interface{}) {
	_, err := q.hub.Do("LPUSH", q.MainListName, obj) // 主列表队头插入
	if nil != err {
		q.l.Errorf("Push() push to %s failed: %s", q.MainListName, err.Error())
	}
}

// recycle() 定时将备份队列中最早的一条内容,回收到主队列重试处理
// 没有写到构造函数里,是为了可以灵活分离和分配生产者、消费者、回收者
func (q *Queue) Recycle() {
	if q.isRing { // 环形队列不需要做回收处理
		return
	}

	go func(q *Queue) {
		defer utils.DefaultGoroutineRecover(nil, `Redis队列回收`)

		q.recycleTicker = time.NewTicker(q.RecycleTimeout)
		for {
			select {
			case <-q.recycleTicker.C:
				_, err := q.hub.Do("RPOPLPUSH", q.BackupListName, q.MainListName)
				if nil != err {
					q.l.Errorf("recycle() pop from %s and push to %s failed: %s", q.BackupListName, q.MainListName, err.Error())
				}
			}
		}
	}(q)
}

// 收到系统退出消息时自动关闭Redis队列相关资源
func (q *Queue) closeWait() {
	go func(q *Queue) {
		defer utils.DefaultGoroutineRecover(nil, `Redis队列关闭`)
		var c chan os.Signal
		var s os.Signal
		c = make(chan os.Signal, 1)
		signal.Notify(c, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT, syscall.SIGKILL)
		for {
			s = <-c
			switch s {
			case syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT, syscall.SIGKILL:
				if !q.isRing && nil != q.recycleTicker {
					q.recycleTicker.Stop()
				}
				err := q.hub.Close()
				if nil != err {
					q.l.Errorf("closeWait() close redis queue connection failed %s", err.Error())
				}
				return
			default:
				return
			}
		}
	}(q)
}