producer.go

package kafka

import (
	"fmt"

	"git.aionnect.com/aionnect/go-common/utils"
	"git.aionnect.com/aionnect/go-common/utils/jsonutil"
	"git.aionnect.com/aionnect/go-common/utils/logger"
	"git.aionnect.com/aionnect/go-common/utils/mq"
	"github.com/Shopify/sarama"
	"github.com/spf13/viper"
)
// NewMsg is the Kafka message constructor: it serializes value to JSON and wraps it in a ProducerMessage.
func NewMsg(topic string, value interface{}) (msg *sarama.ProducerMessage, err error) {
	var bytes []byte
	if nil == value {
		bytes = []byte{}
	} else {
		bytes, err = jsonutil.Marshal(value)
		if nil != err {
			return
		}
	}
	msg = &sarama.ProducerMessage{
		Topic: topic,
		// Explicit partition assignment; only honored with NewManualPartitioner. Partition counts differ
		// per topic, so manual assignment easily goes out of range and is rarely practical.
		Partition: int32(-1),
		// With NewHashPartitioner, the partition is chosen by hashing this Key.
		Key:   sarama.StringEncoder(utils.NextId().String()),
		Value: sarama.ByteEncoder(bytes),
	}
	return
}

// --------------------------------------------------------------------------------------------------------------------

// Producer is the producer object; it wraps the underlying message-queue producer and a logger.
type Producer struct {
	producer mq.IProducer
	log      *logger.Logger
}

// NewProducer creates a new producer.
func NewProducer() *Producer {
	config := sarama.NewConfig()
	config.Producer.RequiredAcks = sarama.WaitForAll
	// Pick partitions at random; NewRoundRobinPartitioner rotates through them instead, or, as noted in
	// NewMsg above, partitions can be chosen by key hash or assigned manually.
	config.Producer.Partitioner = sarama.NewRandomPartitioner
	config.Producer.Return.Successes = true
	viper.SetDefault("kafka.brokers", []string{"127.0.0.1:9092"})
	brokers := viper.GetStringSlice("kafka.brokers")
	return &Producer{
		producer: NewSyncProducer(brokers, config), // Initialize a synchronous (or asynchronous) producer here
	}
}
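
// A minimal sketch of the configuration NewProducer reads: viper resolves the broker list from the
// "kafka.brokers" key, so with a YAML source it could look like the following (the host addresses are
// illustrative, not taken from this repository); with no configuration at all, the 127.0.0.1:9092
// default above applies:
//
// kafka:
//   brokers:
//     - "10.0.0.1:9092"
//     - "10.0.0.2:9092"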

// SetLogger routes the underlying library's log output to the given logger.
func (p *Producer) SetLogger(log *logger.Logger) {
	if nil == log {
		return
	}
	p.log = log
	sarama.Logger = log
}

// GetLogger returns the current logger, creating one if none has been set.
func (p *Producer) GetLogger() *logger.Logger {
	if nil == p.log {
		p.log = logger.New()
	}
	return p.log
}

// SendJSON sends a single message.
func (p *Producer) SendJSON(topic string, value interface{}) (result interface{}, err error) {
	return p.producer.SendJSON(topic, value)
}

// SendJSONs sends messages in batch.
func (p *Producer) SendJSONs(messages []*mq.ProducerMessage) (err error) {
	return p.producer.SendJSONs(messages)
}

// Close shuts down the producer.
func (p *Producer) Close() error {
	return p.producer.Close()
}
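
// A minimal caller-side sketch of the wrapper above (the topic name and payload are illustrative;
// viper configuration is assumed to have been loaded elsewhere):
//
// p := NewProducer()
// defer func() { _ = p.Close() }()
// p.SetLogger(logger.New())
// if result, err := p.SendJSON("example-topic", map[string]string{"hello": "world"}); err != nil {
//	p.GetLogger().Println("send failed:", err)
// } else {
//	p.GetLogger().Println(result)
// }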

// --------------------------------------------------------------------------------------------------------------------

// SyncProducer is the synchronous producer.
type SyncProducer struct {
	sarama.SyncProducer
}

// NewSyncProducer creates a synchronous producer, panicking if the client cannot be created.
func NewSyncProducer(brokers []string, config *sarama.Config) *SyncProducer {
	producer, err := sarama.NewSyncProducer(brokers, config)
	if err != nil {
		panic(fmt.Sprintf("Failed to create sync producer: %s", err))
	}
	return &SyncProducer{SyncProducer: producer}
}

// SendJSON sends a single message synchronously and reports the partition and offset it was written to.
func (p *SyncProducer) SendJSON(topic string, value interface{}) (result interface{}, err error) {
	if topic == "" {
		return "", nil
	}
	var msg *sarama.ProducerMessage
	msg, err = NewMsg(topic, value)
	if nil != err {
		return "", err
	}
	var partition int32
	var offset int64
	partition, offset, err = p.SendMessage(msg)
	if nil != err {
		return "", err
	}
	return fmt.Sprintf("partition=%d, offset=%d\n", partition, offset), nil
}

// SendJSONs sends a batch of messages synchronously; messages with an empty topic or that fail to
// serialize are skipped.
func (p *SyncProducer) SendJSONs(messages []*mq.ProducerMessage) (err error) {
	if nil == messages || len(messages) == 0 {
		return
	}
	var msgList []*sarama.ProducerMessage
	for i := 0; i < len(messages); i++ {
		if messages[i].Topic == "" {
			continue
		}
		var msg *sarama.ProducerMessage
		msg, err = NewMsg(messages[i].Topic, messages[i].Value)
		if nil != err {
			continue // Skip messages that fail to serialize
		}
		msgList = append(msgList, msg)
	}
	return p.SendMessages(msgList)
}
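
// Example batch call (the topic name and payloads are illustrative; only the Topic and Value fields of
// mq.ProducerMessage are relied on here, mirroring how SendJSONs reads them above):
//
// err := producer.SendJSONs([]*mq.ProducerMessage{
//	{Topic: "example-topic", Value: map[string]int{"n": 1}},
//	{Topic: "example-topic", Value: map[string]int{"n": 2}},
// })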

// --------------------------------------------------------------------------------------------------------------------

// AsyncProducer is the asynchronous producer.
type AsyncProducer struct {
	sarama.AsyncProducer
}

// NewAsyncProducer creates an asynchronous producer, panicking if the client cannot be created.
func NewAsyncProducer(brokers []string, config *sarama.Config) *AsyncProducer {
	producer, err := sarama.NewAsyncProducer(brokers, config)
	if err != nil {
		panic(fmt.Sprintf("Failed to create async producer: %s", err))
	}
	return &AsyncProducer{AsyncProducer: producer}
}

// SendJSON queues a single message for asynchronous delivery.
func (p *AsyncProducer) SendJSON(topic string, value interface{}) (result interface{}, err error) {
	if topic == "" {
		return "", nil
	}
	var msg *sarama.ProducerMessage
	msg, err = NewMsg(topic, value)
	if nil != err {
		return "", err
	}
	p.Input() <- msg
	return "success", nil
}

// SendJSONs queues a batch of messages for asynchronous delivery; messages with an empty topic or that
// fail to serialize are skipped.
func (p *AsyncProducer) SendJSONs(messages []*mq.ProducerMessage) (err error) {
	if nil == messages || len(messages) == 0 {
		return
	}
	for i := 0; i < len(messages); i++ {
		if messages[i].Topic == "" {
			continue
		}
		var msg *sarama.ProducerMessage
		msg, err = NewMsg(messages[i].Topic, messages[i].Value)
		if nil != err {
			continue // Skip messages that fail to serialize
		}
		p.Input() <- msg
	}
	return
}

// Note: the asynchronous producer also has an asynchronous close method (AsyncClose), which should be used
// together with result-handling code such as the following.
// Handling the asynchronous producer's send results:
//for {
//	select {
//	case suc := <-producer.Successes():
//		fmt.Println("offset: ", suc.Offset, "timestamp: ", suc.Timestamp.String(), "partitions: ", suc.Partition)
//	case fail := <-producer.Errors():
//		fmt.Println("err: ", fail.Err)
//	}
//}