// Command main is a small demo that publishes a test message ten times to the
// "chatRoom" Kafka topic through a sarama synchronous producer, authenticating
// with SASL over a TLS connection.
package main

import (
	"crypto/tls"
	"crypto/x509"
	"io/ioutil"
	"os"

	"git.wanpinghui.com/WPH/go_common/wph/logger"
	"github.com/Shopify/sarama"
)

var (
	// pubLogger is used for this program's own output and is also installed
	// as sarama's package-level logger in main.
	pubLogger = logger.New()
)

// main configures the producer (acks, partitioner, SASL, TLS), connects to the
// brokers and sends the same message ten times, logging the partition and
// offset of every successful send. It exits with status 1 when the CA
// certificate cannot be read or the producer cannot be created, and panics
// when the CA file contains no parsable PEM certificate.
func main() {
	// For a local broker use instead:
	//brokers := []string{"127.0.0.1:9092"}
	brokers := []string{"47.94.239.212:9093", "47.94.91.146:9093", "47.94.154.247:9093"}

	sarama.Logger = pubLogger

	config := sarama.NewConfig()
	config.Producer.RequiredAcks = sarama.WaitForAll
	config.Producer.Partitioner = sarama.NewRandomPartitioner
	// SyncProducer requires Return.Successes to be enabled.
	config.Producer.Return.Successes = true

	// NOTE(review): credentials are hard-coded in source; they should be moved
	// to configuration or the environment and rotated.
	config.Net.SASL.Enable = true
	config.Net.SASL.User = "wangyangming"
	config.Net.SASL.Password = "WangMy1920"
	config.Net.SASL.Handshake = true
	//config.Version=sarama.V0_10_2_1

	config.Net.TLS.Enable = true
	certPath := "./alicloud_kafka/ca-cert"
	certBytes, err := ioutil.ReadFile(certPath)
	if err != nil {
		// Previously this error was dropped, so a missing file showed up only
		// as a confusing "failed to parse" panic below.
		pubLogger.Errorf("Failed to read root certificate %s: %s", certPath, err.Error())
		os.Exit(1)
	}
	clientCertPool := x509.NewCertPool()
	if ok := clientCertPool.AppendCertsFromPEM(certBytes); !ok {
		panic("kafka producer failed to parse root certificate")
	}
	config.Net.TLS.Config = &tls.Config{
		//Certificates: []tls.Certificate{},
		RootCAs: clientCertPool,
		// NOTE(review): InsecureSkipVerify disables server certificate
		// verification, so RootCAs above is effectively unused and the
		// connection is open to man-in-the-middle attacks. Presumably it is
		// set because the brokers are addressed by IP — confirm before
		// turning verification on.
		InsecureSkipVerify: true,
	}

	plusMsg := &sarama.ProducerMessage{}
	plusMsg.Topic = "chatRoom"
	plusMsg.Partition = int32(-1)
	plusMsg.Key = sarama.StringEncoder("key")
	plusMsg.Value = sarama.ByteEncoder("你好, 世界++!")

	// A second, currently disabled, message for another topic:
	//reduceMsg := &sarama.ProducerMessage{}
	//reduceMsg.Topic = "ka-alloc-reduce-job"
	//reduceMsg.Partition = int32(-1)
	//reduceMsg.Key = sarama.StringEncoder("key")
	//reduceMsg.Value = sarama.ByteEncoder("你好, 世界--!")

	producer, err := sarama.NewSyncProducer(brokers, config)
	if err != nil {
		pubLogger.Errorf("Failed to create producer: %s", err.Error())
		// Exit statuses above 125 are not portable (500 is truncated on Unix).
		os.Exit(1)
	}
	defer func() {
		if cerr := producer.Close(); cerr != nil {
			pubLogger.Error("Failed to close producer: ", cerr)
		}
	}()

	for i := 0; i < 10; i++ {
		partition, offset, err := producer.SendMessage(plusMsg)
		if err != nil {
			pubLogger.Error("Failed to produce message: ", err)
			// partition and offset carry no meaning for a failed send.
			continue
		}
		pubLogger.Infof("partition=%d, offset=%d\n", partition, offset)

		//partition, offset, err = producer.SendMessage(reduceMsg)
		//if err != nil {
		//	pubLogger.Error("Failed to produce message: ", err)
		//}
		//pubLogger.Infof("partition=%d, offset=%d\n", partition, offset)
	}
}