123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113 |
- package main
- import (
- "crypto/tls"
- "crypto/x509"
- "fmt"
- "github.com/Shopify/sarama"
- "github.com/bsm/sarama-cluster"
- "io/ioutil"
- "os"
- "os/signal"
- )
- var consumer *cluster.Consumer
- var sig chan os.Signal
- func init() {
- topics := []string{"chatRoom"}
- brokers := []string{"47.94.239.212:9093", "47.94.91.146:9093", "47.94.154.247:9093"}
- certPath := "./alicloud_kafka/ca-cert"
- fmt.Println("init kafka consumer, it may take a few seconds...")
- var err error
- clusterCfg := cluster.NewConfig()
- clusterCfg.Net.SASL.Enable = true
- clusterCfg.Net.SASL.User = "wangyangming"
- clusterCfg.Net.SASL.Password = "WangMy1920"
- clusterCfg.Net.SASL.Handshake = true
- certBytes, err := ioutil.ReadFile(certPath)
- clientCertPool := x509.NewCertPool()
- ok := clientCertPool.AppendCertsFromPEM(certBytes)
- if !ok {
- panic("kafka consumer failed to parse root certificate")
- }
- clusterCfg.Net.TLS.Config = &tls.Config{
- //Certificates: []tls.Certificate{},
- RootCAs: clientCertPool,
- InsecureSkipVerify: true,
- }
- clusterCfg.Net.TLS.Enable = true
- clusterCfg.Consumer.Return.Errors = true
- clusterCfg.Consumer.Offsets.Initial = sarama.OffsetOldest
- clusterCfg.Group.Return.Notifications = true
- clusterCfg.Version = sarama.V0_10_2_1
- if err = clusterCfg.Validate(); err != nil {
- msg := fmt.Sprintf("Kafka consumer config invalidate. config: %v. err: %v", *clusterCfg, err)
- fmt.Println(msg)
- panic(msg)
- }
- // consumer group需要事先建立好,参考:https://help.aliyun.com/document_detail/99952.html?spm=a2c4g.11186623.6.564.2b74754bS6iQbg#title-sf4-w77-d85
- consumer, err = cluster.NewConsumer(brokers, "consumer_group_test", topics, clusterCfg)
- if err != nil {
- msg := fmt.Sprintf("Create kafka consumer error: %v. config: %v", err, clusterCfg)
- fmt.Println(msg)
- panic(msg)
- }
- sig = make(chan os.Signal, 1)
- }
- func Start() {
- go consume()
- }
- func consume() {
- for {
- select {
- case msg, more := <-consumer.Messages():
- if more {
- fmt.Printf("Partition:%d, Offset:%d, Key:%s, Value:%s, Timestamp:%s\n", msg.Partition, msg.Offset, string(msg.Key), string(msg.Value), msg.Timestamp)
- consumer.MarkOffset(msg, "") // mark message as processed
- }
- case err, more := <-consumer.Errors():
- if more {
- fmt.Printf("kafka consumer error: %v\n", err.Error())
- }
- case ntf, more := <-consumer.Notifications():
- if more {
- fmt.Printf("Kafka consumer rebalance: %v\n", ntf)
- }
- case <-sig:
- fmt.Println("Stop consumer server...")
- _ = consumer.Close()
- return
- }
- }
- }
- func Stop(s os.Signal) {
- fmt.Println("Recived kafka consumer stop signal...")
- sig <- s
- fmt.Println("kafka consumer stopped!!!")
- }
- func main() {
- signals := make(chan os.Signal, 1)
- signal.Notify(signals, os.Interrupt)
- Start()
- select {
- case s := <-signals:
- Stop(s)
- }
- }
|