consumer.go 2.3 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192
  1. package kafka
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "git.haoqitour.com/haoqi/go-common/utils/logger"
  6. "github.com/Shopify/sarama"
  7. "github.com/bsm/sarama-cluster"
  8. "github.com/spf13/viper"
  9. "reflect"
  10. )
  11. // 消费者对象
  12. type Consumer struct {
  13. *cluster.Consumer
  14. log *logger.Logger
  15. }
  16. // 新建消费者
  17. func NewConsumer(groupID string, topics []string) *Consumer {
  18. config := cluster.NewConfig()
  19. config.Consumer.Return.Errors = true
  20. config.Group.Return.Notifications = false
  21. viper.SetDefault("kafka.brokers", []string{"127.0.0.1:9092"})
  22. brokers := viper.GetStringSlice("kafka.brokers")
  23. consumer, err := cluster.NewConsumer(brokers, groupID, topics, config)
  24. if err != nil {
  25. panic(fmt.Sprintf("Failed to start consumer: %s", err))
  26. }
  27. return &Consumer{Consumer: consumer}
  28. }
  29. // 将底层类库的日志输出到指定日志记录器
  30. func (c *Consumer) SetLogger(log *logger.Logger) {
  31. if nil == log {
  32. return
  33. }
  34. c.log = log
  35. sarama.Logger = log
  36. }
  37. // 获取当前日志记录器
  38. func (c *Consumer) GetLogger() *logger.Logger {
  39. if nil == c.log {
  40. c.log = logger.New()
  41. }
  42. return c.log
  43. }
  44. // 消息读取管道,管道消息类型是byte切片
  45. func (c *Consumer) BytesMessages() <-chan []byte {
  46. ch := make(chan []byte, 0)
  47. go func(c *Consumer, ch chan []byte, oc <-chan *sarama.ConsumerMessage) {
  48. for msg := range oc {
  49. ch <- msg.Value
  50. c.MarkOffset(msg, "") // mark message as processed
  51. }
  52. }(c, ch, c.Consumer.Messages())
  53. return ch
  54. }
  55. // 将消息输出绑定到指定管道上,此方法内会进行反序列化,输出的消息类型可以是指定对象类型
  56. func (c *Consumer) BindJSONChan(channel interface{}) {
  57. go func(c *Consumer, channel interface{}) {
  58. chVal := reflect.ValueOf(channel)
  59. if chVal.Kind() != reflect.Chan {
  60. return
  61. }
  62. argType := chVal.Type().Elem()
  63. for {
  64. select {
  65. case msg := <-c.Messages():
  66. var oPtr reflect.Value
  67. if nil != msg && nil != msg.Value && len(msg.Value) > 0 && string(msg.Value) != "" {
  68. if argType.Kind() != reflect.Ptr {
  69. oPtr = reflect.New(argType)
  70. } else {
  71. oPtr = reflect.New(argType.Elem())
  72. }
  73. _ = json.Unmarshal(msg.Value, oPtr.Interface())
  74. if argType.Kind() != reflect.Ptr {
  75. oPtr = reflect.Indirect(oPtr)
  76. }
  77. }
  78. chVal.Send(oPtr)
  79. c.MarkOffset(msg, "") // mark message as processed
  80. }
  81. }
  82. }(c, channel)
  83. }