package nats

import (
	"git.aionnect.com/aionnect/go-common/utils/logger"
	"github.com/nats-io/nats.go"
	"github.com/spf13/viper"
	"time"
)

// 消费者对象
type Consumer struct {
	conn  *nats.EncodedConn
	queue string
	topic string
	log   *logger.Logger
}

// 新建消费者
func NewConsumer(queue string, topic string) *Consumer {
	viper.SetDefault("nats.servers", "nats://127.0.0.1:4222")
	servers := viper.GetString("nats.servers")

	var consumer *Consumer
	nc, err := nats.Connect(servers, nats.MaxReconnects(5), nats.ReconnectWait(2*time.Second),
		nats.DisconnectHandler(func(nc *nats.Conn) {
			consumer.GetLogger().Info("Got disconnected!\n")
		}),
		nats.ReconnectHandler(func(nc *nats.Conn) {
			consumer.GetLogger().Infof("Got reconnected to %v!\n", nc.ConnectedUrl())
		}),
		nats.ClosedHandler(func(nc *nats.Conn) {
			consumer.GetLogger().Infof("Connection closed. Reason: %q\n", nc.LastError())
		}),
	)
	if nil != err {
		panic(err)
	}
	c, err := nats.NewEncodedConn(nc, nats.JSON_ENCODER)
	consumer = &Consumer{conn: c, queue: queue, topic: topic}
	return consumer
}

// 指定日志记录器
func (c *Consumer) SetLogger(log *logger.Logger) {
	if nil == log {
		return
	}
	c.log = log
}

// 获取当前日志记录器
func (c *Consumer) GetLogger() *logger.Logger {
	if nil == c.log {
		c.log = logger.New()
	}
	return c.log
}

// 消息读取管道,管道消息类型是byte切片
func (c *Consumer) BytesMessages() <-chan []byte {
	var ch = make(chan []byte, 0)
	if c.queue == "" {
		_, _ = c.conn.BindRecvChan(c.topic, ch) // 广播
	} else {
		_, _ = c.conn.BindRecvQueueChan(c.topic, c.queue, ch) // 单播
	}
	return ch
}

// 将消息输出绑定到指定管道上,此方法内会进行反序列化,输出的消息类型可以是指定对象类型
func (c *Consumer) BindJSONChan(channel interface{}) {
	if c.queue == "" {
		_, _ = c.conn.BindRecvChan(c.topic, channel) // 广播
	} else {
		_, _ = c.conn.BindRecvQueueChan(c.topic, c.queue, channel) // 单播
	}
}

// 关闭
func (c *Consumer) Close() error {
	c.conn.Close()
	return nil
}