producer.go 2.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384
  1. package main
  2. import (
  3. "crypto/tls"
  4. "crypto/x509"
  5. "fmt"
  6. "github.com/Shopify/sarama"
  7. "io/ioutil"
  8. "strconv"
  9. "time"
  10. )
  11. var producer sarama.SyncProducer
  12. func init() {
  13. brokers := []string{"47.94.239.212:9093", "47.94.91.146:9093", "47.94.154.247:9093"}
  14. certPath := "./alicloud_kafka/ca-cert"
  15. fmt.Print("init kafka producer, it may take a few seconds to init the connection\n")
  16. var err error
  17. mqConfig := sarama.NewConfig()
  18. mqConfig.Net.SASL.Enable = true
  19. mqConfig.Net.SASL.User = "wangyangming"
  20. mqConfig.Net.SASL.Password = "WangMy1920"
  21. mqConfig.Net.SASL.Handshake = true
  22. mqConfig.Version = sarama.V0_10_2_1
  23. certBytes, err := ioutil.ReadFile(certPath)
  24. clientCertPool := x509.NewCertPool()
  25. ok := clientCertPool.AppendCertsFromPEM(certBytes)
  26. if !ok {
  27. panic("kafka producer failed to parse root certificate")
  28. }
  29. mqConfig.Net.TLS.Config = &tls.Config{
  30. //Certificates: []tls.Certificate{},
  31. RootCAs: clientCertPool,
  32. InsecureSkipVerify: true,
  33. }
  34. mqConfig.Net.TLS.Enable = true
  35. mqConfig.Producer.Return.Successes = true
  36. if err = mqConfig.Validate(); err != nil {
  37. msg := fmt.Sprintf("Kafka producer config invalidate. config: %v. err: %v", *mqConfig, err)
  38. fmt.Println(msg)
  39. panic(msg)
  40. }
  41. producer, err = sarama.NewSyncProducer(brokers, mqConfig)
  42. if err != nil {
  43. msg := fmt.Sprintf("Kafak producer create fail. err: %v", err)
  44. fmt.Println(msg)
  45. panic(msg)
  46. }
  47. }
  48. func produce(topic string, key string, content string) error {
  49. msg := &sarama.ProducerMessage{
  50. Topic: topic,
  51. Key: sarama.StringEncoder(key),
  52. Value: sarama.StringEncoder(content),
  53. Timestamp: time.Now(),
  54. }
  55. _, _, err := producer.SendMessage(msg)
  56. if err != nil {
  57. msg := fmt.Sprintf("Send Error topic: %v. key: %v. content: %v", topic, key, content)
  58. fmt.Println(msg)
  59. return err
  60. }
  61. fmt.Printf("Send OK topic:%s key:%s value:%s\n", topic, key, content)
  62. return nil
  63. }
  64. func main() {
  65. //the key of the kafka messages
  66. //do not set the same the key for all messages, it may cause partition im-balance
  67. key := strconv.FormatInt(time.Now().UTC().UnixNano(), 10)
  68. value := "this is a kafka message!"
  69. _ = produce("chatRoom", key, value)
  70. }