consumer.go 2.0 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283
  1. package nats
  2. import (
  3. "git.aionnect.com/aionnect/go-common/utils/logger"
  4. "github.com/nats-io/nats.go"
  5. "github.com/spf13/viper"
  6. "time"
  7. )
  8. // 消费者对象
  9. type Consumer struct {
  10. conn *nats.EncodedConn
  11. queue string
  12. topic string
  13. log *logger.Logger
  14. }
  15. // 新建消费者
  16. func NewConsumer(queue string, topic string) *Consumer {
  17. viper.SetDefault("nats.servers", "nats://127.0.0.1:4222")
  18. servers := viper.GetString("nats.servers")
  19. var consumer *Consumer
  20. nc, err := nats.Connect(servers, nats.MaxReconnects(5), nats.ReconnectWait(2*time.Second),
  21. nats.DisconnectHandler(func(nc *nats.Conn) {
  22. consumer.GetLogger().Info("Got disconnected!\n")
  23. }),
  24. nats.ReconnectHandler(func(nc *nats.Conn) {
  25. consumer.GetLogger().Infof("Got reconnected to %v!\n", nc.ConnectedUrl())
  26. }),
  27. nats.ClosedHandler(func(nc *nats.Conn) {
  28. consumer.GetLogger().Infof("Connection closed. Reason: %q\n", nc.LastError())
  29. }),
  30. )
  31. if nil != err {
  32. panic(err)
  33. }
  34. c, err := nats.NewEncodedConn(nc, nats.JSON_ENCODER)
  35. consumer = &Consumer{conn: c, queue: queue, topic: topic}
  36. return consumer
  37. }
  38. // 指定日志记录器
  39. func (c *Consumer) SetLogger(log *logger.Logger) {
  40. if nil == log {
  41. return
  42. }
  43. c.log = log
  44. }
  45. // 获取当前日志记录器
  46. func (c *Consumer) GetLogger() *logger.Logger {
  47. if nil == c.log {
  48. c.log = logger.New()
  49. }
  50. return c.log
  51. }
  52. // 消息读取管道,管道消息类型是byte切片
  53. func (c *Consumer) BytesMessages() <-chan []byte {
  54. var ch = make(chan []byte, 0)
  55. if c.queue == "" {
  56. _, _ = c.conn.BindRecvChan(c.topic, ch) // 广播
  57. } else {
  58. _, _ = c.conn.BindRecvQueueChan(c.topic, c.queue, ch) // 单播
  59. }
  60. return ch
  61. }
  62. // 将消息输出绑定到指定管道上,此方法内会进行反序列化,输出的消息类型可以是指定对象类型
  63. func (c *Consumer) BindJSONChan(channel interface{}) {
  64. if c.queue == "" {
  65. _, _ = c.conn.BindRecvChan(c.topic, channel) // 广播
  66. } else {
  67. _, _ = c.conn.BindRecvQueueChan(c.topic, c.queue, channel) // 单播
  68. }
  69. }
  70. // 关闭
  71. func (c *Consumer) Close() error {
  72. c.conn.Close()
  73. return nil
  74. }