// Package nats wraps a NATS encoded (JSON) connection in a small consumer helper.
package nats

import (
	"time"

	"git.aionnect.com/aionnect/go-common/utils/logger"
	"github.com/nats-io/nats.go"
	"github.com/spf13/viper"
)

// Consumer receives messages from a single NATS topic, optionally as a member
// of a queue group.
type Consumer struct {
	conn  *nats.EncodedConn // JSON-encoded connection used for all subscriptions
	queue string            // queue group name; empty means a plain (broadcast) subscription
	topic string            // subject to subscribe to
	log   *logger.Logger    // lazily created by GetLogger when not set explicitly
}

// NewConsumer connects to the servers configured under the viper key
// "nats.servers" (default "nats://127.0.0.1:4222") and returns a Consumer
// bound to the given queue and topic.
//
// The connection is configured with at most 5 reconnect attempts, 2 seconds
// apart. NewConsumer panics if the connection or the JSON-encoded wrapper
// cannot be created.
func NewConsumer(queue string, topic string) *Consumer {
	viper.SetDefault("nats.servers", "nats://127.0.0.1:4222")
	servers := viper.GetString("nats.servers")

	// Allocate the consumer before connecting: the handlers below close over
	// it, so it must never be nil when one of them fires.
	consumer := &Consumer{queue: queue, topic: topic}

	nc, err := nats.Connect(servers,
		nats.MaxReconnects(5),
		nats.ReconnectWait(2*time.Second),
		nats.DisconnectHandler(func(nc *nats.Conn) {
			consumer.GetLogger().Info("Got disconnected!\n")
		}),
		nats.ReconnectHandler(func(nc *nats.Conn) {
			consumer.GetLogger().Infof("Got reconnected to %v!\n", nc.ConnectedUrl())
		}),
		nats.ClosedHandler(func(nc *nats.Conn) {
			consumer.GetLogger().Infof("Connection closed. Reason: %q\n", nc.LastError())
		}),
	)
	if err != nil {
		panic(err)
	}

	ec, err := nats.NewEncodedConn(nc, nats.JSON_ENCODER)
	if err != nil {
		// Fail the same way as a connection error instead of returning a
		// Consumer whose conn is nil; release the raw connection first.
		nc.Close()
		panic(err)
	}
	consumer.conn = ec
	return consumer
}

// SetLogger replaces the logger used by the consumer. A nil logger is ignored.
func (c *Consumer) SetLogger(log *logger.Logger) {
	if log != nil {
		c.log = log
	}
}

// GetLogger returns the consumer's logger, creating one with logger.New on
// first use if none has been set.
func (c *Consumer) GetLogger() *logger.Logger {
	if c.log == nil {
		c.log = logger.New()
	}
	return c.log
}

// BytesMessages subscribes to the consumer's topic and returns an unbuffered
// channel on which received messages are delivered as byte slices.
func (c *Consumer) BytesMessages() <-chan []byte {
	ch := make(chan []byte)
	c.bind(ch)
	return ch
}

// BindJSONChan subscribes to the consumer's topic and delivers received
// messages on the given channel. Decoding is done by the encoded connection,
// so the channel's element type may be any type the messages decode into.
func (c *Consumer) BindJSONChan(channel interface{}) {
	c.bind(channel)
}

// bind attaches channel to the consumer's topic: a plain subscription
// (broadcast) when no queue is set, a queue subscription (one receiver per
// message within the queue group) otherwise. A bind failure is logged rather
// than returned because the public methods have no error result.
func (c *Consumer) bind(channel interface{}) {
	var err error
	if c.queue == "" {
		_, err = c.conn.BindRecvChan(c.topic, channel)
	} else {
		_, err = c.conn.BindRecvQueueChan(c.topic, c.queue, channel)
	}
	if err != nil {
		// Infof is the only formatted logging method this file relies on.
		c.GetLogger().Infof("Failed to bind channel to topic %q: %v\n", c.topic, err)
	}
}

// Close closes the underlying connection. It always returns nil.
func (c *Consumer) Close() error {
	if c.conn != nil {
		c.conn.Close()
	}
	return nil
}