// Package nats provides a message producer that publishes JSON-encoded
// values to NATS subjects.
package nats

import (
	"time"

	"git.aionnect.com/aionnect/go-common/utils/logger"
	"git.aionnect.com/aionnect/go-common/utils/mq"
	"github.com/nats-io/nats.go"
	"github.com/spf13/viper"
)

// Producer publishes messages through a JSON-encoded NATS connection.
type Producer struct {
	conn *nats.EncodedConn // encoded connection used for every publish
	log  *logger.Logger    // lazily created by GetLogger when not set explicitly
}

// NewProducer connects to the servers configured under the viper key
// "nats.servers" (default "nats://127.0.0.1:4222") and returns a Producer
// that encodes published values with the NATS JSON encoder.
//
// It panics if the connection cannot be established or if the encoded
// connection cannot be created.
func NewProducer() *Producer {
	viper.SetDefault("nats.servers", "nats://127.0.0.1:4222")
	servers := viper.GetString("nats.servers")

	// Allocate the producer before connecting: the handlers below close over
	// it, and calling GetLogger on a nil *Producer would dereference nil.
	producer := &Producer{}

	nc, err := nats.Connect(servers,
		nats.MaxReconnects(5),
		nats.ReconnectWait(2*time.Second),
		nats.DisconnectHandler(func(nc *nats.Conn) {
			producer.GetLogger().Info("Got disconnected!\n")
		}),
		nats.ReconnectHandler(func(nc *nats.Conn) {
			producer.GetLogger().Infof("Got reconnected to %v!\n", nc.ConnectedUrl())
		}),
		nats.ClosedHandler(func(nc *nats.Conn) {
			producer.GetLogger().Infof("Connection closed. Reason: %q\n", nc.LastError())
		}),
	)
	if err != nil {
		panic(err)
	}

	c, err := nats.NewEncodedConn(nc, nats.JSON_ENCODER)
	if err != nil {
		// Do not leak the raw connection when the encoder cannot be set up;
		// fail the same way a connect error does.
		nc.Close()
		panic(err)
	}
	producer.conn = c
	return producer
}

// SetLogger replaces the logger used by the producer. A nil logger is
// ignored and the current logger is kept.
func (p *Producer) SetLogger(log *logger.Logger) {
	if log == nil {
		return
	}
	p.log = log
}

// GetLogger returns the producer's logger, creating one with logger.New on
// first use if none has been set.
func (p *Producer) GetLogger() *logger.Logger {
	if p.log == nil {
		p.log = logger.New()
	}
	return p.log
}

// SendJSON publishes a single value to topic.
//
// An empty topic is a no-op that returns ("", nil). On success the result is
// the string "success"; on failure the result is "" and err is the publish
// error.
func (p *Producer) SendJSON(topic string, value interface{}) (result interface{}, err error) {
	if topic == "" {
		return "", nil
	}
	if err = p.conn.Publish(topic, value); err != nil {
		return "", err
	}
	return "success", nil
}

// SendJSONs publishes each message in messages to its own topic.
//
// Nil entries and entries with an empty topic are skipped. Publishing is
// best-effort: a failure does not stop the remaining messages from being
// attempted, and the first publish error encountered (if any) is returned.
func (p *Producer) SendJSONs(messages []*mq.ProducerMessage) (err error) {
	for _, msg := range messages {
		if msg == nil || msg.Topic == "" {
			continue
		}
		// Keep only the first failure so the caller learns that the batch was
		// not fully delivered, while still attempting the remaining messages.
		if pubErr := p.conn.Publish(msg.Topic, msg.Value); pubErr != nil && err == nil {
			err = pubErr
		}
	}
	return err
}

// Close closes the underlying connection. It is a no-op on a nil producer or
// one without a connection, and always returns nil.
func (p *Producer) Close() error {
	if p == nil || p.conn == nil {
		return nil
	}
	p.conn.Close()
	return nil
}