package main import ( "crypto/tls" "crypto/x509" "fmt" "github.com/Shopify/sarama" "github.com/bsm/sarama-cluster" "io/ioutil" "os" "os/signal" ) var consumer *cluster.Consumer var sig chan os.Signal func init() { topics := []string{"chatRoom"} brokers := []string{"47.94.239.212:9093", "47.94.91.146:9093", "47.94.154.247:9093"} certPath := "./alicloud_kafka/ca-cert" fmt.Println("init kafka consumer, it may take a few seconds...") var err error clusterCfg := cluster.NewConfig() clusterCfg.Net.SASL.Enable = true clusterCfg.Net.SASL.User = "wangyangming" clusterCfg.Net.SASL.Password = "WangMy1920" clusterCfg.Net.SASL.Handshake = true certBytes, err := ioutil.ReadFile(certPath) clientCertPool := x509.NewCertPool() ok := clientCertPool.AppendCertsFromPEM(certBytes) if !ok { panic("kafka consumer failed to parse root certificate") } clusterCfg.Net.TLS.Config = &tls.Config{ //Certificates: []tls.Certificate{}, RootCAs: clientCertPool, InsecureSkipVerify: true, } clusterCfg.Net.TLS.Enable = true clusterCfg.Consumer.Return.Errors = true clusterCfg.Consumer.Offsets.Initial = sarama.OffsetOldest clusterCfg.Group.Return.Notifications = true clusterCfg.Version = sarama.V0_10_2_1 if err = clusterCfg.Validate(); err != nil { msg := fmt.Sprintf("Kafka consumer config invalidate. config: %v. err: %v", *clusterCfg, err) fmt.Println(msg) panic(msg) } // consumer group需要事先建立好,参考:https://help.aliyun.com/document_detail/99952.html?spm=a2c4g.11186623.6.564.2b74754bS6iQbg#title-sf4-w77-d85 consumer, err = cluster.NewConsumer(brokers, "consumer_group_test", topics, clusterCfg) if err != nil { msg := fmt.Sprintf("Create kafka consumer error: %v. config: %v", err, clusterCfg) fmt.Println(msg) panic(msg) } sig = make(chan os.Signal, 1) } func Start() { go consume() } func consume() { for { select { case msg, more := <-consumer.Messages(): if more { fmt.Printf("Partition:%d, Offset:%d, Key:%s, Value:%s, Timestamp:%s\n", msg.Partition, msg.Offset, string(msg.Key), string(msg.Value), msg.Timestamp) consumer.MarkOffset(msg, "") // mark message as processed } case err, more := <-consumer.Errors(): if more { fmt.Printf("kafka consumer error: %v\n", err.Error()) } case ntf, more := <-consumer.Notifications(): if more { fmt.Printf("Kafka consumer rebalance: %v\n", ntf) } case <-sig: fmt.Println("Stop consumer server...") _ = consumer.Close() return } } } func Stop(s os.Signal) { fmt.Println("Recived kafka consumer stop signal...") sig <- s fmt.Println("kafka consumer stopped!!!") } func main() { signals := make(chan os.Signal, 1) signal.Notify(signals, os.Interrupt) Start() select { case s := <-signals: Stop(s) } }