123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384 |
- package main
- import (
- "crypto/tls"
- "crypto/x509"
- "fmt"
- "github.com/Shopify/sarama"
- "io/ioutil"
- "strconv"
- "time"
- )
- var producer sarama.SyncProducer
- func init() {
- brokers := []string{"47.94.239.212:9093", "47.94.91.146:9093", "47.94.154.247:9093"}
- certPath := "./alicloud_kafka/ca-cert"
- fmt.Print("init kafka producer, it may take a few seconds to init the connection\n")
- var err error
- mqConfig := sarama.NewConfig()
- mqConfig.Net.SASL.Enable = true
- mqConfig.Net.SASL.User = "wangyangming"
- mqConfig.Net.SASL.Password = "WangMy1920"
- mqConfig.Net.SASL.Handshake = true
- mqConfig.Version = sarama.V0_10_2_1
- certBytes, err := ioutil.ReadFile(certPath)
- clientCertPool := x509.NewCertPool()
- ok := clientCertPool.AppendCertsFromPEM(certBytes)
- if !ok {
- panic("kafka producer failed to parse root certificate")
- }
- mqConfig.Net.TLS.Config = &tls.Config{
- //Certificates: []tls.Certificate{},
- RootCAs: clientCertPool,
- InsecureSkipVerify: true,
- }
- mqConfig.Net.TLS.Enable = true
- mqConfig.Producer.Return.Successes = true
- if err = mqConfig.Validate(); err != nil {
- msg := fmt.Sprintf("Kafka producer config invalidate. config: %v. err: %v", *mqConfig, err)
- fmt.Println(msg)
- panic(msg)
- }
- producer, err = sarama.NewSyncProducer(brokers, mqConfig)
- if err != nil {
- msg := fmt.Sprintf("Kafak producer create fail. err: %v", err)
- fmt.Println(msg)
- panic(msg)
- }
- }
- func produce(topic string, key string, content string) error {
- msg := &sarama.ProducerMessage{
- Topic: topic,
- Key: sarama.StringEncoder(key),
- Value: sarama.StringEncoder(content),
- Timestamp: time.Now(),
- }
- _, _, err := producer.SendMessage(msg)
- if err != nil {
- msg := fmt.Sprintf("Send Error topic: %v. key: %v. content: %v", topic, key, content)
- fmt.Println(msg)
- return err
- }
- fmt.Printf("Send OK topic:%s key:%s value:%s\n", topic, key, content)
- return nil
- }
- func main() {
- //the key of the kafka messages
- //do not set the same the key for all messages, it may cause partition im-balance
- key := strconv.FormatInt(time.Now().UTC().UnixNano(), 10)
- value := "this is a kafka message!"
- _ = produce("chatRoom", key, value)
- }
|