package main import ( "crypto/tls" "crypto/x509" "git.wanpinghui.com/WPH/go_common/wph/logger" "github.com/Shopify/sarama" "github.com/bsm/sarama-cluster" "io/ioutil" "sync" ) var ( wg sync.WaitGroup subLogger = logger.New() ) func main() { //brokers := []string{"127.0.0.1:9092"} brokers := []string{"47.94.239.212:9093", "47.94.91.146:9093", "47.94.154.247:9093"} sarama.Logger = subLogger config := cluster.NewConfig() config.Consumer.Offsets.Initial = sarama.OffsetOldest config.Consumer.Return.Errors = true config.Group.Return.Notifications = false config.Net.SASL.Enable = true config.Net.SASL.User = "wangyangming" config.Net.SASL.Password = "WangMy1920" config.Net.SASL.Handshake = true config.Version = sarama.V0_10_2_1 config.Net.TLS.Enable = true certPath := "./alicloud_kafka/ca-cert" certBytes, err := ioutil.ReadFile(certPath) clientCertPool := x509.NewCertPool() ok := clientCertPool.AppendCertsFromPEM(certBytes) if !ok { panic("kafka consumer failed to parse root certificate") } config.Net.TLS.Config = &tls.Config{ //Certificates: []tls.Certificate{}, RootCAs: clientCertPool, InsecureSkipVerify: true, } // 初始化增加定时任务消息消费者,相同group名单播消费消息 plusTopics := []string{"chatRoom"} plusConsumer, err := cluster.NewConsumer(brokers, "chat-room-group", plusTopics, config) if err != nil { panic(err) } defer func() { _ = plusConsumer.Close() }() go func() { for err := range plusConsumer.Errors() { subLogger.Error("Error: %s\n", err.Error()) } }() //// 初始化减少定时任务消息消费者,不同group名广播消费消息 //reduceTopics := []string{"ka-alloc-reduce-job"} //// wph.GetPrivateIPv4Id() 这个函数会根据当前机器内网ip的末尾两段运算出一个id,即测试和生产环境不同Pod的ip不同这个id也会不同 //// 正式编码请用 wph.GetPrivateIPv4Id() 代替 wph.NextId() //machineId := wph.NextId() //reduceConsumer, err := cluster.NewConsumer(brokers, fmt.Sprintf("ka-alloc-reduce-job-group-%d", machineId), reduceTopics, config) //if err != nil { // panic(err) //} //defer func() { // _ = reduceConsumer.Close() //}() //go func() { // for err := range reduceConsumer.Errors() { // subLogger.Error("Error: %s\n", err.Error()) // } //}() // 消费消息 wg.Add(1) go func() { defer wg.Done() for msg := range plusConsumer.Messages() { subLogger.Infof("%s/%d/%d\t%s\t%s\n", msg.Topic, msg.Partition, msg.Offset, msg.Key, msg.Value) plusConsumer.MarkOffset(msg, "") // mark message as processed } }() //wg.Add(1) //go func() { // defer wg.Done() // for msg := range reduceConsumer.Messages() { // subLogger.Infof( "%s/%d/%d\t%s\t%s\n", msg.Topic, msg.Partition, msg.Offset, msg.Key, msg.Value) // reduceConsumer.MarkOffset(msg, "") // mark message as processed // } //}() //for { // select { // case msg, ok := <-consumer.Messages(): // if ok { // fmt.Fprintf(os.Stdout, "%s/%d/%d\t%s\t%s\n", msg.Topic, msg.Partition, msg.Offset, msg.Key, msg.Value) // consumer.MarkOffset(msg, "") // mark message as processed // } // case <-signals: // return // } //} //consumer, err := sarama.NewConsumer([]string {"127.0.0.1:9092"}, nil) //if err != nil { // subLogger.Println("Failed to start consumer: %s", err) //} //defer consumer.Close() // //partitionList, err := consumer.Partitions("shuaige") //if err != nil { // subLogger.Println("Failed to get the list of partitions: ", err) //} // //for partition := range partitionList { // pc, err := consumer.ConsumePartition("shuaige", int32(partition), sarama.OffsetNewest) // if err != nil { // subLogger.Printf("Failed to start consumer for partition %d: %s\n", partition, err) // } // defer pc.AsyncClose() // // wg.Add(1) // // go func(sarama.PartitionConsumer) { // defer wg.Done() // for msg := range pc.Messages() { // fmt.Printf("Partition:%d, Offset:%d, Key:%s, Value:%s", msg.Partition, msg.Offset, string(msg.Key), string(msg.Value)) // fmt.Println() // } // }(pc) //} wg.Wait() subLogger.Println("Done consuming topic") }