kpub.go 2.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081
  1. package main
  2. import (
  3. "crypto/tls"
  4. "crypto/x509"
  5. "git.wanpinghui.com/WPH/go_common/wph/logger"
  6. "github.com/Shopify/sarama"
  7. "io/ioutil"
  8. "os"
  9. )
  10. var (
  11. pubLogger = logger.New()
  12. )
  13. func main() {
  14. //brokers := []string{"127.0.0.1:9092"}
  15. brokers := []string{"47.94.239.212:9093", "47.94.91.146:9093", "47.94.154.247:9093"}
  16. sarama.Logger = pubLogger
  17. config := sarama.NewConfig()
  18. config.Producer.RequiredAcks = sarama.WaitForAll
  19. config.Producer.Partitioner = sarama.NewRandomPartitioner
  20. config.Producer.Return.Successes = true
  21. config.Net.SASL.Enable = true
  22. config.Net.SASL.User = "wangyangming"
  23. config.Net.SASL.Password = "WangMy1920"
  24. config.Net.SASL.Handshake = true
  25. //config.Version=sarama.V0_10_2_1
  26. config.Net.TLS.Enable = true
  27. certPath := "./alicloud_kafka/ca-cert"
  28. certBytes, err := ioutil.ReadFile(certPath)
  29. clientCertPool := x509.NewCertPool()
  30. ok := clientCertPool.AppendCertsFromPEM(certBytes)
  31. if !ok {
  32. panic("kafka producer failed to parse root certificate")
  33. }
  34. config.Net.TLS.Config = &tls.Config{
  35. //Certificates: []tls.Certificate{},
  36. RootCAs: clientCertPool,
  37. InsecureSkipVerify: true,
  38. }
  39. plusMsg := &sarama.ProducerMessage{}
  40. plusMsg.Topic = "chatRoom"
  41. plusMsg.Partition = int32(-1)
  42. plusMsg.Key = sarama.StringEncoder("key")
  43. plusMsg.Value = sarama.ByteEncoder("你好, 世界++!")
  44. //reduceMsg := &sarama.ProducerMessage{}
  45. //reduceMsg.Topic = "ka-alloc-reduce-job"
  46. //reduceMsg.Partition = int32(-1)
  47. //reduceMsg.Key = sarama.StringEncoder("key")
  48. //reduceMsg.Value = sarama.ByteEncoder("你好, 世界--!")
  49. producer, err := sarama.NewSyncProducer(brokers, config)
  50. if err != nil {
  51. pubLogger.Errorf("Failed to produce message: %s", err.Error())
  52. os.Exit(500)
  53. }
  54. defer func() {
  55. _ = producer.Close()
  56. }()
  57. for i := 0; i < 10; i++ {
  58. partition, offset, err := producer.SendMessage(plusMsg)
  59. if err != nil {
  60. pubLogger.Error("Failed to produce message: ", err)
  61. }
  62. pubLogger.Infof("partition=%d, offset=%d\n", partition, offset)
  63. //partition, offset, err = producer.SendMessage(reduceMsg)
  64. //if err != nil {
  65. // pubLogger.Error("Failed to produce message: ", err)
  66. //}
  67. //pubLogger.Infof("partition=%d, offset=%d\n", partition, offset)
  68. }
  69. }