123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081 |
- package main
- import (
- "crypto/tls"
- "crypto/x509"
- "git.wanpinghui.com/WPH/go_common/wph/logger"
- "github.com/Shopify/sarama"
- "io/ioutil"
- "os"
- )
- var (
- pubLogger = logger.New()
- )
- func main() {
- //brokers := []string{"127.0.0.1:9092"}
- brokers := []string{"47.94.239.212:9093", "47.94.91.146:9093", "47.94.154.247:9093"}
- sarama.Logger = pubLogger
- config := sarama.NewConfig()
- config.Producer.RequiredAcks = sarama.WaitForAll
- config.Producer.Partitioner = sarama.NewRandomPartitioner
- config.Producer.Return.Successes = true
- config.Net.SASL.Enable = true
- config.Net.SASL.User = "wangyangming"
- config.Net.SASL.Password = "WangMy1920"
- config.Net.SASL.Handshake = true
- //config.Version=sarama.V0_10_2_1
- config.Net.TLS.Enable = true
- certPath := "./alicloud_kafka/ca-cert"
- certBytes, err := ioutil.ReadFile(certPath)
- clientCertPool := x509.NewCertPool()
- ok := clientCertPool.AppendCertsFromPEM(certBytes)
- if !ok {
- panic("kafka producer failed to parse root certificate")
- }
- config.Net.TLS.Config = &tls.Config{
- //Certificates: []tls.Certificate{},
- RootCAs: clientCertPool,
- InsecureSkipVerify: true,
- }
- plusMsg := &sarama.ProducerMessage{}
- plusMsg.Topic = "chatRoom"
- plusMsg.Partition = int32(-1)
- plusMsg.Key = sarama.StringEncoder("key")
- plusMsg.Value = sarama.ByteEncoder("你好, 世界++!")
- //reduceMsg := &sarama.ProducerMessage{}
- //reduceMsg.Topic = "ka-alloc-reduce-job"
- //reduceMsg.Partition = int32(-1)
- //reduceMsg.Key = sarama.StringEncoder("key")
- //reduceMsg.Value = sarama.ByteEncoder("你好, 世界--!")
- producer, err := sarama.NewSyncProducer(brokers, config)
- if err != nil {
- pubLogger.Errorf("Failed to produce message: %s", err.Error())
- os.Exit(500)
- }
- defer func() {
- _ = producer.Close()
- }()
- for i := 0; i < 10; i++ {
- partition, offset, err := producer.SendMessage(plusMsg)
- if err != nil {
- pubLogger.Error("Failed to produce message: ", err)
- }
- pubLogger.Infof("partition=%d, offset=%d\n", partition, offset)
- //partition, offset, err = producer.SendMessage(reduceMsg)
- //if err != nil {
- // pubLogger.Error("Failed to produce message: ", err)
- //}
- //pubLogger.Infof("partition=%d, offset=%d\n", partition, offset)
- }
- }
|