consumer.go 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113
  1. package main
  2. import (
  3. "crypto/tls"
  4. "crypto/x509"
  5. "fmt"
  6. "github.com/Shopify/sarama"
  7. "github.com/bsm/sarama-cluster"
  8. "io/ioutil"
  9. "os"
  10. "os/signal"
  11. )
  12. var consumer *cluster.Consumer
  13. var sig chan os.Signal
  14. func init() {
  15. topics := []string{"chatRoom"}
  16. brokers := []string{"47.94.239.212:9093", "47.94.91.146:9093", "47.94.154.247:9093"}
  17. certPath := "./alicloud_kafka/ca-cert"
  18. fmt.Println("init kafka consumer, it may take a few seconds...")
  19. var err error
  20. clusterCfg := cluster.NewConfig()
  21. clusterCfg.Net.SASL.Enable = true
  22. clusterCfg.Net.SASL.User = "wangyangming"
  23. clusterCfg.Net.SASL.Password = "WangMy1920"
  24. clusterCfg.Net.SASL.Handshake = true
  25. certBytes, err := ioutil.ReadFile(certPath)
  26. clientCertPool := x509.NewCertPool()
  27. ok := clientCertPool.AppendCertsFromPEM(certBytes)
  28. if !ok {
  29. panic("kafka consumer failed to parse root certificate")
  30. }
  31. clusterCfg.Net.TLS.Config = &tls.Config{
  32. //Certificates: []tls.Certificate{},
  33. RootCAs: clientCertPool,
  34. InsecureSkipVerify: true,
  35. }
  36. clusterCfg.Net.TLS.Enable = true
  37. clusterCfg.Consumer.Return.Errors = true
  38. clusterCfg.Consumer.Offsets.Initial = sarama.OffsetOldest
  39. clusterCfg.Group.Return.Notifications = true
  40. clusterCfg.Version = sarama.V0_10_2_1
  41. if err = clusterCfg.Validate(); err != nil {
  42. msg := fmt.Sprintf("Kafka consumer config invalidate. config: %v. err: %v", *clusterCfg, err)
  43. fmt.Println(msg)
  44. panic(msg)
  45. }
  46. // consumer group需要事先建立好,参考:https://help.aliyun.com/document_detail/99952.html?spm=a2c4g.11186623.6.564.2b74754bS6iQbg#title-sf4-w77-d85
  47. consumer, err = cluster.NewConsumer(brokers, "consumer_group_test", topics, clusterCfg)
  48. if err != nil {
  49. msg := fmt.Sprintf("Create kafka consumer error: %v. config: %v", err, clusterCfg)
  50. fmt.Println(msg)
  51. panic(msg)
  52. }
  53. sig = make(chan os.Signal, 1)
  54. }
  55. func Start() {
  56. go consume()
  57. }
  58. func consume() {
  59. for {
  60. select {
  61. case msg, more := <-consumer.Messages():
  62. if more {
  63. fmt.Printf("Partition:%d, Offset:%d, Key:%s, Value:%s, Timestamp:%s\n", msg.Partition, msg.Offset, string(msg.Key), string(msg.Value), msg.Timestamp)
  64. consumer.MarkOffset(msg, "") // mark message as processed
  65. }
  66. case err, more := <-consumer.Errors():
  67. if more {
  68. fmt.Printf("kafka consumer error: %v\n", err.Error())
  69. }
  70. case ntf, more := <-consumer.Notifications():
  71. if more {
  72. fmt.Printf("Kafka consumer rebalance: %v\n", ntf)
  73. }
  74. case <-sig:
  75. fmt.Println("Stop consumer server...")
  76. _ = consumer.Close()
  77. return
  78. }
  79. }
  80. }
  81. func Stop(s os.Signal) {
  82. fmt.Println("Recived kafka consumer stop signal...")
  83. sig <- s
  84. fmt.Println("kafka consumer stopped!!!")
  85. }
  86. func main() {
  87. signals := make(chan os.Signal, 1)
  88. signal.Notify(signals, os.Interrupt)
  89. Start()
  90. select {
  91. case s := <-signals:
  92. Stop(s)
  93. }
  94. }