consumer_test.go 3.7 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798
  1. package kafka
  2. import (
  3. "fmt"
  4. "git.aionnect.com/aionnect/go-common/utils"
  5. "git.aionnect.com/aionnect/go-common/utils/mq"
  6. "git.aionnect.com/aionnect/go-common/utils/mq/topic"
  7. "sync"
  8. "testing"
  9. )
  10. var wg sync.WaitGroup
  11. func TestNewConsumer(t *testing.T) {
  12. // 以下示例代码是模拟中断延时处理定时器需求实现,逻辑较复杂,如果只关心kafka使用本身,仅关注加了序号注释的部位即可
  13. // 1. 新建单播消费者
  14. // 初始化增加定时任务消息消费者,相同group名单播消费消息
  15. // kafka的单播分配到哪一个消费者与topic的partition配置密切相关,当partition数小于消费者数量时,会有部分消费者始终无法获得消息
  16. // 可以用 ./kafka-topics.sh --zookeeper 127.0.0.1:2181 --describe --topic topic-name 命令查看topic信息
  17. // 可以用 ./kafka-topics.sh --alter --topic topic-name --zookeeper 127.0.0.1:2181 --partitions 4 命令调整partition数量
  18. // 调整完partition数量,需要重启或重置消费者,以重新分配消费者与partition的对应关系
  19. // 另外注意本示例中用的topic.Order方法是有给topic加上一个前缀的
  20. plusTopics := []string{topic.TOP("ka-alloc-plus-job")}
  21. plusConsumer := NewConsumer("ka-alloc-plus-job-group", plusTopics)
  22. defer func(plusConsumer *Consumer) {
  23. _ = plusConsumer.Close()
  24. }(plusConsumer)
  25. // 2. 如果需要可以处理异常
  26. go func() {
  27. for err := range plusConsumer.Errors() {
  28. t.Errorf("Error: %s\n", err.Error())
  29. }
  30. }()
  31. // 1. 新建广播消费者
  32. // 初始化减少定时任务消息消费者,不同group名广播消费消息
  33. reduceTopics := []string{topic.TOP("ka-alloc-reduce-job")}
  34. // 正式编码推荐用 utils.GetPrivateIPv4Id() 代替 utils.NextId() 来获取机器号
  35. // utils.GetPrivateIPv4Id() 这个函数会根据当前机器内网ip的末尾两段运算出一个id,即测试和生产环境不同Pod的ip不同这个id也会不同
  36. machineId := utils.NextId()
  37. reduceConsumer := NewConsumer(fmt.Sprintf("ka-alloc-reduce-job-group-%d", machineId), reduceTopics)
  38. defer func(reduceConsumer *Consumer) {
  39. _ = reduceConsumer.Close()
  40. }(reduceConsumer)
  41. // 2. 如果需要可以处理异常
  42. go func() {
  43. for err := range reduceConsumer.Errors() {
  44. t.Errorf("Error: %s\n", err.Error())
  45. }
  46. }()
  47. // 消费消息
  48. //wg.Add(1)
  49. //go func() {
  50. // defer wg.Done()
  51. // for msg := range plusConsumer.Messages() { // 3. 在协程中循环阻塞取消息管道中的消息
  52. // fmt.Printf( "%s/%d/%d\t%s\t%s\n", msg.Topic, msg.Partition, msg.Offset, msg.Key, msg.Value) // msg.Value如果是Json可以反序列化
  53. // plusConsumer.MarkOffset(msg, "") // mark message as processed
  54. // }
  55. //}()
  56. //wg.Add(1)
  57. //go func() {
  58. // defer wg.Done()
  59. // for msg := range reduceConsumer.Messages() { // 3. 在协程中循环阻塞取消息管道中的消息
  60. // fmt.Printf( "%s/%d/%d\t%s\t%s\n", msg.Topic, msg.Partition, msg.Offset, msg.Key, msg.Value) // msg.Value如果是Json可以反序列化
  61. // reduceConsumer.MarkOffset(msg, "") // mark message as processed
  62. // }
  63. //}()
  64. wg.Add(1)
  65. go func() {
  66. defer wg.Done()
  67. var plusCh = make(chan *mq.TestMsg, 0)
  68. plusConsumer.BindJSONChan(plusCh)
  69. for { // 3. 在协程中循环阻塞取消息管道中的消息
  70. select {
  71. case msg := <-plusCh:
  72. fmt.Printf("%v\n", msg) // msg已经是反序列化得到的对象
  73. }
  74. }
  75. }()
  76. wg.Add(1)
  77. go func() {
  78. defer wg.Done()
  79. var reduceCh = make(chan *mq.TestMsg, 0)
  80. reduceConsumer.BindJSONChan(reduceCh)
  81. for { // 3. 在协程中循环阻塞取消息管道中的消息
  82. select {
  83. case msg := <-reduceCh:
  84. fmt.Printf("%v\n", msg) // msg已经是反序列化得到的对象
  85. }
  86. }
  87. }()
  88. wg.Wait()
  89. t.Log("Done consuming topic")
  90. }