123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990 |
- package nats
- import (
- "git.aionnect.com/aionnect/go-common/utils/logger"
- "git.aionnect.com/aionnect/go-common/utils/mq"
- "github.com/nats-io/nats.go"
- "github.com/spf13/viper"
- "time"
- )
- // 生产者对象
- type Producer struct {
- conn *nats.EncodedConn
- log *logger.Logger
- }
- // 新建生产者
- func NewProducer() *Producer {
- viper.SetDefault("nats.servers", "nats://127.0.0.1:4222")
- servers := viper.GetString("nats.servers")
- var producer *Producer
- nc, err := nats.Connect(servers, nats.MaxReconnects(5), nats.ReconnectWait(2*time.Second),
- nats.DisconnectHandler(func(nc *nats.Conn) {
- producer.GetLogger().Info("Got disconnected!\n")
- }),
- nats.ReconnectHandler(func(nc *nats.Conn) {
- producer.GetLogger().Infof("Got reconnected to %v!\n", nc.ConnectedUrl())
- }),
- nats.ClosedHandler(func(nc *nats.Conn) {
- producer.GetLogger().Infof("Connection closed. Reason: %q\n", nc.LastError())
- }),
- )
- if nil != err {
- panic(err)
- }
- c, err := nats.NewEncodedConn(nc, nats.JSON_ENCODER)
- producer = &Producer{conn: c}
- return producer
- }
- // 指定日志记录器
- func (p *Producer) SetLogger(log *logger.Logger) {
- if nil == log {
- return
- }
- p.log = log
- }
- // 获取当前日志记录器
- func (p *Producer) GetLogger() *logger.Logger {
- if nil == p.log {
- p.log = logger.New()
- }
- return p.log
- }
- // 发送单条消息
- func (p *Producer) SendJSON(topic string, value interface{}) (result interface{}, err error) {
- if topic == "" {
- return "", nil
- }
- err = p.conn.Publish(topic, value)
- if nil != err {
- return "", err
- }
- return "success", nil
- }
- // 批量发送消息
- func (p *Producer) SendJSONs(messages []*mq.ProducerMessage) (err error) {
- if nil == messages || len(messages) == 0 {
- return
- }
- for i := 0; i < len(messages); i++ {
- if messages[i].Topic == "" {
- continue
- }
- _ = p.conn.Publish(messages[i].Topic, messages[i].Value)
- }
- return
- }
- // 关闭
- func (p *Producer) Close() error {
- p.conn.Close()
- return nil
- }
|