// Command main is a minimal demo that publishes a single message to Kafka
// over SASL + TLS using a sarama synchronous producer.
package main

import (
	"crypto/tls"
	"crypto/x509"
	"fmt"
	"io/ioutil"
	"strconv"
	"time"

	"github.com/Shopify/sarama"
)

// producer is the process-wide synchronous producer. It is created in init
// and closed by main.
var producer sarama.SyncProducer

// init builds the sarama configuration (SASL credentials, protocol version,
// TLS settings), validates it, and connects the package-level producer.
// Any failure is printed and then raised as a panic, so the program never
// reaches main with an unusable producer.
func init() {
	brokers := []string{"47.94.239.212:9093", "47.94.91.146:9093", "47.94.154.247:9093"}
	certPath := "./alicloud_kafka/ca-cert"

	fmt.Print("init kafka producer, it may take a few seconds to init the connection\n")

	mqConfig := sarama.NewConfig()

	// NOTE(review): the SASL credentials are hard-coded in source; consider
	// loading them from the environment or a secret store.
	mqConfig.Net.SASL.Enable = true
	mqConfig.Net.SASL.User = "wangyangming"
	mqConfig.Net.SASL.Password = "WangMy1920"
	mqConfig.Net.SASL.Handshake = true

	mqConfig.Version = sarama.V0_10_2_1

	// Check the read error explicitly: otherwise a missing or unreadable file
	// only surfaces later as a misleading "failed to parse" panic.
	certBytes, err := ioutil.ReadFile(certPath)
	if err != nil {
		msg := fmt.Sprintf("kafka producer failed to read root certificate %q. err: %v", certPath, err)
		fmt.Println(msg)
		panic(msg)
	}

	clientCertPool := x509.NewCertPool()
	if ok := clientCertPool.AppendCertsFromPEM(certBytes); !ok {
		panic("kafka producer failed to parse root certificate")
	}

	// NOTE(review): InsecureSkipVerify disables verification of the server's
	// certificate chain and host name, so RootCAs is not actually used to
	// authenticate the brokers. It is left unchanged here because the brokers
	// are addressed by IP and it is not visible from this file whether their
	// certificates would pass verification - confirm before tightening.
	mqConfig.Net.TLS.Config = &tls.Config{
		//Certificates: []tls.Certificate{},
		RootCAs:            clientCertPool,
		InsecureSkipVerify: true,
	}
	mqConfig.Net.TLS.Enable = true

	// SyncProducer requires successes to be returned.
	mqConfig.Producer.Return.Successes = true

	if err = mqConfig.Validate(); err != nil {
		// The configuration itself is deliberately not printed: it contains
		// the SASL password.
		msg := fmt.Sprintf("Kafka producer config invalidate. err: %v", err)
		fmt.Println(msg)
		panic(msg)
	}

	// Plain assignment (not :=) so the package-level producer is set rather
	// than a shadowing local.
	producer, err = sarama.NewSyncProducer(brokers, mqConfig)
	if err != nil {
		msg := fmt.Sprintf("Kafak producer create fail. err: %v", err)
		fmt.Println(msg)
		panic(msg)
	}
}

// produce sends one message with the given key and content to topic through
// the package-level producer, stamping it with the current time.
//
// The outcome is reported on stdout. On failure the error returned by
// SendMessage is included in the report and returned to the caller unchanged;
// on success produce returns nil.
func produce(topic string, key string, content string) error {
	msg := &sarama.ProducerMessage{
		Topic:     topic,
		Key:       sarama.StringEncoder(key),
		Value:     sarama.StringEncoder(content),
		Timestamp: time.Now(),
	}

	if _, _, err := producer.SendMessage(msg); err != nil {
		fmt.Printf("Send Error topic: %v. key: %v. content: %v. err: %v\n", topic, key, content, err)
		return err
	}

	fmt.Printf("Send OK topic:%s key:%s value:%s\n", topic, key, content)
	return nil
}

// main publishes a single demo message to the "chatRoom" topic and closes the
// producer before exiting.
func main() {
	// Close the producer on exit so its resources are released.
	defer func() {
		if err := producer.Close(); err != nil {
			fmt.Printf("Kafka producer close fail. err: %v\n", err)
		}
	}()

	//the key of the kafka messages
	//do not set the same the key for all messages, it may cause partition im-balance
	key := strconv.FormatInt(time.Now().UTC().UnixNano(), 10)
	value := "this is a kafka message!"

	// The error is intentionally dropped here: produce has already reported
	// the failure, and this demo has nothing else to do with it.
	_ = produce("chatRoom", key, value)
}