123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143 |
- package main
- import (
- "crypto/tls"
- "crypto/x509"
- "git.wanpinghui.com/WPH/go_common/wph/logger"
- "github.com/Shopify/sarama"
- "github.com/bsm/sarama-cluster"
- "io/ioutil"
- "sync"
- )
- var (
- wg sync.WaitGroup
- subLogger = logger.New()
- )
- func main() {
- //brokers := []string{"127.0.0.1:9092"}
- brokers := []string{"47.94.239.212:9093", "47.94.91.146:9093", "47.94.154.247:9093"}
- sarama.Logger = subLogger
- config := cluster.NewConfig()
- config.Consumer.Offsets.Initial = sarama.OffsetOldest
- config.Consumer.Return.Errors = true
- config.Group.Return.Notifications = false
- config.Net.SASL.Enable = true
- config.Net.SASL.User = "wangyangming"
- config.Net.SASL.Password = "WangMy1920"
- config.Net.SASL.Handshake = true
- config.Version = sarama.V0_10_2_1
- config.Net.TLS.Enable = true
- certPath := "./alicloud_kafka/ca-cert"
- certBytes, err := ioutil.ReadFile(certPath)
- clientCertPool := x509.NewCertPool()
- ok := clientCertPool.AppendCertsFromPEM(certBytes)
- if !ok {
- panic("kafka consumer failed to parse root certificate")
- }
- config.Net.TLS.Config = &tls.Config{
- //Certificates: []tls.Certificate{},
- RootCAs: clientCertPool,
- InsecureSkipVerify: true,
- }
- // 初始化增加定时任务消息消费者,相同group名单播消费消息
- plusTopics := []string{"chatRoom"}
- plusConsumer, err := cluster.NewConsumer(brokers, "chat-room-group", plusTopics, config)
- if err != nil {
- panic(err)
- }
- defer func() {
- _ = plusConsumer.Close()
- }()
- go func() {
- for err := range plusConsumer.Errors() {
- subLogger.Error("Error: %s\n", err.Error())
- }
- }()
- //// 初始化减少定时任务消息消费者,不同group名广播消费消息
- //reduceTopics := []string{"ka-alloc-reduce-job"}
- //// wph.GetPrivateIPv4Id() 这个函数会根据当前机器内网ip的末尾两段运算出一个id,即测试和生产环境不同Pod的ip不同这个id也会不同
- //// 正式编码请用 wph.GetPrivateIPv4Id() 代替 wph.NextId()
- //machineId := wph.NextId()
- //reduceConsumer, err := cluster.NewConsumer(brokers, fmt.Sprintf("ka-alloc-reduce-job-group-%d", machineId), reduceTopics, config)
- //if err != nil {
- // panic(err)
- //}
- //defer func() {
- // _ = reduceConsumer.Close()
- //}()
- //go func() {
- // for err := range reduceConsumer.Errors() {
- // subLogger.Error("Error: %s\n", err.Error())
- // }
- //}()
- // 消费消息
- wg.Add(1)
- go func() {
- defer wg.Done()
- for msg := range plusConsumer.Messages() {
- subLogger.Infof("%s/%d/%d\t%s\t%s\n", msg.Topic, msg.Partition, msg.Offset, msg.Key, msg.Value)
- plusConsumer.MarkOffset(msg, "") // mark message as processed
- }
- }()
- //wg.Add(1)
- //go func() {
- // defer wg.Done()
- // for msg := range reduceConsumer.Messages() {
- // subLogger.Infof( "%s/%d/%d\t%s\t%s\n", msg.Topic, msg.Partition, msg.Offset, msg.Key, msg.Value)
- // reduceConsumer.MarkOffset(msg, "") // mark message as processed
- // }
- //}()
- //for {
- // select {
- // case msg, ok := <-consumer.Messages():
- // if ok {
- // fmt.Fprintf(os.Stdout, "%s/%d/%d\t%s\t%s\n", msg.Topic, msg.Partition, msg.Offset, msg.Key, msg.Value)
- // consumer.MarkOffset(msg, "") // mark message as processed
- // }
- // case <-signals:
- // return
- // }
- //}
- //consumer, err := sarama.NewConsumer([]string {"127.0.0.1:9092"}, nil)
- //if err != nil {
- // subLogger.Println("Failed to start consumer: %s", err)
- //}
- //defer consumer.Close()
- //
- //partitionList, err := consumer.Partitions("shuaige")
- //if err != nil {
- // subLogger.Println("Failed to get the list of partitions: ", err)
- //}
- //
- //for partition := range partitionList {
- // pc, err := consumer.ConsumePartition("shuaige", int32(partition), sarama.OffsetNewest)
- // if err != nil {
- // subLogger.Printf("Failed to start consumer for partition %d: %s\n", partition, err)
- // }
- // defer pc.AsyncClose()
- //
- // wg.Add(1)
- //
- // go func(sarama.PartitionConsumer) {
- // defer wg.Done()
- // for msg := range pc.Messages() {
- // fmt.Printf("Partition:%d, Offset:%d, Key:%s, Value:%s", msg.Partition, msg.Offset, string(msg.Key), string(msg.Value))
- // fmt.Println()
- // }
- // }(pc)
- //}
- wg.Wait()
- subLogger.Println("Done consuming topic")
- }
|