consumer.go

package kafka

import (
	"fmt"
	"reflect"

	"git.aionnect.com/aionnect/go-common/utils"
	"git.aionnect.com/aionnect/go-common/utils/jsonutil"
	"git.aionnect.com/aionnect/go-common/utils/logger"

	"github.com/Shopify/sarama"
	cluster "github.com/bsm/sarama-cluster"
	"github.com/spf13/viper"
)
// Consumer wraps a sarama-cluster consumer group consumer.
type Consumer struct {
	*cluster.Consumer
	log *logger.Logger
}

// NewConsumer creates a consumer for the given consumer group and topics.
// Broker addresses are read from the "kafka.brokers" configuration key and
// default to a single local broker.
func NewConsumer(groupID string, topics []string) *Consumer {
	config := cluster.NewConfig()
	config.Consumer.Return.Errors = true
	config.Group.Return.Notifications = false

	viper.SetDefault("kafka.brokers", []string{"127.0.0.1:9092"})
	brokers := viper.GetStringSlice("kafka.brokers")

	consumer, err := cluster.NewConsumer(brokers, groupID, topics, config)
	if err != nil {
		panic(fmt.Sprintf("Failed to start consumer: %s", err))
	}
	return &Consumer{Consumer: consumer}
}
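// A minimal usage sketch for NewConsumer; the group ID, topic name, and broker
// addresses below are hypothetical, and only the "kafka.brokers" viper key is
// actually read by this package:
//
//	viper.Set("kafka.brokers", []string{"10.0.0.1:9092", "10.0.0.2:9092"})
//	consumer := NewConsumer("user-service", []string{"user.events"})
//	defer consumer.Close()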
// SetLogger routes this consumer's log output, and that of the underlying
// sarama library, to the given logger.
func (c *Consumer) SetLogger(log *logger.Logger) {
	if log == nil {
		return
	}
	c.log = log
	sarama.Logger = log
}

// GetLogger returns the current logger, lazily creating one if none was set.
func (c *Consumer) GetLogger() *logger.Logger {
	if c.log == nil {
		c.log = logger.New()
	}
	return c.log
}
// BytesMessages returns a channel of raw message payloads as byte slices.
// Each message is marked as processed after its payload has been forwarded,
// and the returned channel is closed once the underlying consumer closes.
func (c *Consumer) BytesMessages() <-chan []byte {
	ch := make(chan []byte)
	go func(c *Consumer, ch chan []byte, oc <-chan *sarama.ConsumerMessage) {
		defer utils.DefaultGoroutineRecover(c.log, `KAFKA message read channel`)
		defer close(ch)
		for msg := range oc {
			ch <- msg.Value
			c.MarkOffset(msg, "") // mark message as processed
		}
	}(c, ch, c.Consumer.Messages())
	return ch
}
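// A minimal sketch of consuming raw payloads via BytesMessages; "consumer" is a
// *Consumer obtained from NewConsumer as above:
//
//	for raw := range consumer.BytesMessages() {
//		fmt.Printf("received %d bytes\n", len(raw))
//	}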
// BindJSONChan binds message output to the given channel. Payloads are JSON
// deserialized into the channel's element type, so callers receive typed
// objects (values or pointers) rather than raw bytes.
func (c *Consumer) BindJSONChan(channel interface{}) {
	go func(c *Consumer, channel interface{}) {
		defer utils.DefaultGoroutineRecover(c.log, `KAFKA message output binding`)

		chVal := reflect.ValueOf(channel)
		if chVal.Kind() != reflect.Chan {
			return
		}
		argType := chVal.Type().Elem()

		for msg := range c.Messages() {
			if msg == nil {
				continue
			}
			if len(msg.Value) > 0 {
				// Allocate a value of the channel's element type and unmarshal into it.
				var oPtr reflect.Value
				if argType.Kind() != reflect.Ptr {
					oPtr = reflect.New(argType)
				} else {
					oPtr = reflect.New(argType.Elem())
				}
				_ = jsonutil.Unmarshal(msg.Value, oPtr.Interface())
				if argType.Kind() != reflect.Ptr {
					oPtr = reflect.Indirect(oPtr)
				}
				chVal.Send(oPtr)
			}
			c.MarkOffset(msg, "") // mark message as processed
		}
	}(c, channel)
}
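// A minimal sketch of typed consumption via BindJSONChan, assuming a hypothetical
// UserEvent struct with JSON tags and a hypothetical process handler; the channel's
// element type determines what each payload is unmarshalled into:
//
//	events := make(chan *UserEvent, 16)
//	consumer.BindJSONChan(events)
//	for evt := range events {
//		process(evt)
//	}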