ksub.go 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143
  1. package main
  2. import (
  3. "crypto/tls"
  4. "crypto/x509"
  5. "git.wanpinghui.com/WPH/go_common/wph/logger"
  6. "github.com/Shopify/sarama"
  7. "github.com/bsm/sarama-cluster"
  8. "io/ioutil"
  9. "sync"
  10. )
  11. var (
  12. wg sync.WaitGroup
  13. subLogger = logger.New()
  14. )
  15. func main() {
  16. //brokers := []string{"127.0.0.1:9092"}
  17. brokers := []string{"47.94.239.212:9093", "47.94.91.146:9093", "47.94.154.247:9093"}
  18. sarama.Logger = subLogger
  19. config := cluster.NewConfig()
  20. config.Consumer.Offsets.Initial = sarama.OffsetOldest
  21. config.Consumer.Return.Errors = true
  22. config.Group.Return.Notifications = false
  23. config.Net.SASL.Enable = true
  24. config.Net.SASL.User = "wangyangming"
  25. config.Net.SASL.Password = "WangMy1920"
  26. config.Net.SASL.Handshake = true
  27. config.Version = sarama.V0_10_2_1
  28. config.Net.TLS.Enable = true
  29. certPath := "./alicloud_kafka/ca-cert"
  30. certBytes, err := ioutil.ReadFile(certPath)
  31. clientCertPool := x509.NewCertPool()
  32. ok := clientCertPool.AppendCertsFromPEM(certBytes)
  33. if !ok {
  34. panic("kafka consumer failed to parse root certificate")
  35. }
  36. config.Net.TLS.Config = &tls.Config{
  37. //Certificates: []tls.Certificate{},
  38. RootCAs: clientCertPool,
  39. InsecureSkipVerify: true,
  40. }
  41. // 初始化增加定时任务消息消费者,相同group名单播消费消息
  42. plusTopics := []string{"chatRoom"}
  43. plusConsumer, err := cluster.NewConsumer(brokers, "chat-room-group", plusTopics, config)
  44. if err != nil {
  45. panic(err)
  46. }
  47. defer func() {
  48. _ = plusConsumer.Close()
  49. }()
  50. go func() {
  51. for err := range plusConsumer.Errors() {
  52. subLogger.Error("Error: %s\n", err.Error())
  53. }
  54. }()
  55. //// 初始化减少定时任务消息消费者,不同group名广播消费消息
  56. //reduceTopics := []string{"ka-alloc-reduce-job"}
  57. //// wph.GetPrivateIPv4Id() 这个函数会根据当前机器内网ip的末尾两段运算出一个id,即测试和生产环境不同Pod的ip不同这个id也会不同
  58. //// 正式编码请用 wph.GetPrivateIPv4Id() 代替 wph.NextId()
  59. //machineId := wph.NextId()
  60. //reduceConsumer, err := cluster.NewConsumer(brokers, fmt.Sprintf("ka-alloc-reduce-job-group-%d", machineId), reduceTopics, config)
  61. //if err != nil {
  62. // panic(err)
  63. //}
  64. //defer func() {
  65. // _ = reduceConsumer.Close()
  66. //}()
  67. //go func() {
  68. // for err := range reduceConsumer.Errors() {
  69. // subLogger.Error("Error: %s\n", err.Error())
  70. // }
  71. //}()
  72. // 消费消息
  73. wg.Add(1)
  74. go func() {
  75. defer wg.Done()
  76. for msg := range plusConsumer.Messages() {
  77. subLogger.Infof("%s/%d/%d\t%s\t%s\n", msg.Topic, msg.Partition, msg.Offset, msg.Key, msg.Value)
  78. plusConsumer.MarkOffset(msg, "") // mark message as processed
  79. }
  80. }()
  81. //wg.Add(1)
  82. //go func() {
  83. // defer wg.Done()
  84. // for msg := range reduceConsumer.Messages() {
  85. // subLogger.Infof( "%s/%d/%d\t%s\t%s\n", msg.Topic, msg.Partition, msg.Offset, msg.Key, msg.Value)
  86. // reduceConsumer.MarkOffset(msg, "") // mark message as processed
  87. // }
  88. //}()
  89. //for {
  90. // select {
  91. // case msg, ok := <-consumer.Messages():
  92. // if ok {
  93. // fmt.Fprintf(os.Stdout, "%s/%d/%d\t%s\t%s\n", msg.Topic, msg.Partition, msg.Offset, msg.Key, msg.Value)
  94. // consumer.MarkOffset(msg, "") // mark message as processed
  95. // }
  96. // case <-signals:
  97. // return
  98. // }
  99. //}
  100. //consumer, err := sarama.NewConsumer([]string {"127.0.0.1:9092"}, nil)
  101. //if err != nil {
  102. // subLogger.Println("Failed to start consumer: %s", err)
  103. //}
  104. //defer consumer.Close()
  105. //
  106. //partitionList, err := consumer.Partitions("shuaige")
  107. //if err != nil {
  108. // subLogger.Println("Failed to get the list of partitions: ", err)
  109. //}
  110. //
  111. //for partition := range partitionList {
  112. // pc, err := consumer.ConsumePartition("shuaige", int32(partition), sarama.OffsetNewest)
  113. // if err != nil {
  114. // subLogger.Printf("Failed to start consumer for partition %d: %s\n", partition, err)
  115. // }
  116. // defer pc.AsyncClose()
  117. //
  118. // wg.Add(1)
  119. //
  120. // go func(sarama.PartitionConsumer) {
  121. // defer wg.Done()
  122. // for msg := range pc.Messages() {
  123. // fmt.Printf("Partition:%d, Offset:%d, Key:%s, Value:%s", msg.Partition, msg.Offset, string(msg.Key), string(msg.Value))
  124. // fmt.Println()
  125. // }
  126. // }(pc)
  127. //}
  128. wg.Wait()
  129. subLogger.Println("Done consuming topic")
  130. }