1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283 |
- package nats
- import (
- "git.aionnect.com/aionnect/go-common/utils/logger"
- "github.com/nats-io/nats.go"
- "github.com/spf13/viper"
- "time"
- )
- // 消费者对象
- type Consumer struct {
- conn *nats.EncodedConn
- queue string
- topic string
- log *logger.Logger
- }
- // 新建消费者
- func NewConsumer(queue string, topic string) *Consumer {
- viper.SetDefault("nats.servers", "nats://127.0.0.1:4222")
- servers := viper.GetString("nats.servers")
- var consumer *Consumer
- nc, err := nats.Connect(servers, nats.MaxReconnects(5), nats.ReconnectWait(2*time.Second),
- nats.DisconnectHandler(func(nc *nats.Conn) {
- consumer.GetLogger().Info("Got disconnected!\n")
- }),
- nats.ReconnectHandler(func(nc *nats.Conn) {
- consumer.GetLogger().Infof("Got reconnected to %v!\n", nc.ConnectedUrl())
- }),
- nats.ClosedHandler(func(nc *nats.Conn) {
- consumer.GetLogger().Infof("Connection closed. Reason: %q\n", nc.LastError())
- }),
- )
- if nil != err {
- panic(err)
- }
- c, err := nats.NewEncodedConn(nc, nats.JSON_ENCODER)
- consumer = &Consumer{conn: c, queue: queue, topic: topic}
- return consumer
- }
- // 指定日志记录器
- func (c *Consumer) SetLogger(log *logger.Logger) {
- if nil == log {
- return
- }
- c.log = log
- }
- // 获取当前日志记录器
- func (c *Consumer) GetLogger() *logger.Logger {
- if nil == c.log {
- c.log = logger.New()
- }
- return c.log
- }
- // 消息读取管道,管道消息类型是byte切片
- func (c *Consumer) BytesMessages() <-chan []byte {
- var ch = make(chan []byte, 0)
- if c.queue == "" {
- _, _ = c.conn.BindRecvChan(c.topic, ch) // 广播
- } else {
- _, _ = c.conn.BindRecvQueueChan(c.topic, c.queue, ch) // 单播
- }
- return ch
- }
- // 将消息输出绑定到指定管道上,此方法内会进行反序列化,输出的消息类型可以是指定对象类型
- func (c *Consumer) BindJSONChan(channel interface{}) {
- if c.queue == "" {
- _, _ = c.conn.BindRecvChan(c.topic, channel) // 广播
- } else {
- _, _ = c.conn.BindRecvQueueChan(c.topic, c.queue, channel) // 单播
- }
- }
- // 关闭
- func (c *Consumer) Close() error {
- c.conn.Close()
- return nil
- }
|