producer.go 1.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990
  1. package nats
  2. import (
  3. "git.aionnect.com/aionnect/go-common/utils/logger"
  4. "git.aionnect.com/aionnect/go-common/utils/mq"
  5. "github.com/nats-io/nats.go"
  6. "github.com/spf13/viper"
  7. "time"
  8. )
  9. // 生产者对象
  10. type Producer struct {
  11. conn *nats.EncodedConn
  12. log *logger.Logger
  13. }
  14. // 新建生产者
  15. func NewProducer() *Producer {
  16. viper.SetDefault("nats.servers", "nats://127.0.0.1:4222")
  17. servers := viper.GetString("nats.servers")
  18. var producer *Producer
  19. nc, err := nats.Connect(servers, nats.MaxReconnects(5), nats.ReconnectWait(2*time.Second),
  20. nats.DisconnectHandler(func(nc *nats.Conn) {
  21. producer.GetLogger().Info("Got disconnected!\n")
  22. }),
  23. nats.ReconnectHandler(func(nc *nats.Conn) {
  24. producer.GetLogger().Infof("Got reconnected to %v!\n", nc.ConnectedUrl())
  25. }),
  26. nats.ClosedHandler(func(nc *nats.Conn) {
  27. producer.GetLogger().Infof("Connection closed. Reason: %q\n", nc.LastError())
  28. }),
  29. )
  30. if nil != err {
  31. panic(err)
  32. }
  33. c, err := nats.NewEncodedConn(nc, nats.JSON_ENCODER)
  34. producer = &Producer{conn: c}
  35. return producer
  36. }
  37. // 指定日志记录器
  38. func (p *Producer) SetLogger(log *logger.Logger) {
  39. if nil == log {
  40. return
  41. }
  42. p.log = log
  43. }
  44. // 获取当前日志记录器
  45. func (p *Producer) GetLogger() *logger.Logger {
  46. if nil == p.log {
  47. p.log = logger.New()
  48. }
  49. return p.log
  50. }
  51. // 发送单条消息
  52. func (p *Producer) SendJSON(topic string, value interface{}) (result interface{}, err error) {
  53. if topic == "" {
  54. return "", nil
  55. }
  56. err = p.conn.Publish(topic, value)
  57. if nil != err {
  58. return "", err
  59. }
  60. return "success", nil
  61. }
  62. // 批量发送消息
  63. func (p *Producer) SendJSONs(messages []*mq.ProducerMessage) (err error) {
  64. if nil == messages || len(messages) == 0 {
  65. return
  66. }
  67. for i := 0; i < len(messages); i++ {
  68. if messages[i].Topic == "" {
  69. continue
  70. }
  71. _ = p.conn.Publish(messages[i].Topic, messages[i].Value)
  72. }
  73. return
  74. }
  75. // 关闭
  76. func (p *Producer) Close() error {
  77. p.conn.Close()
  78. return nil
  79. }