producer_test.go 2.0 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970
  1. package kafka
  2. import (
  3. "git.aionnect.com/aionnect/go-common/utils/date"
  4. "git.aionnect.com/aionnect/go-common/utils/mq"
  5. "git.aionnect.com/aionnect/go-common/utils/mq/topic"
  6. "testing"
  7. )
  8. func TestProducer_SendJSON(t *testing.T) {
  9. // 以下示例代码是模拟中断延时处理定时器需求实现,逻辑较复杂,如果只关心kafka使用本身,仅关注加了序号注释的部位即可
  10. // 1. 新建生产者
  11. producer := NewProducer()
  12. defer func(producer *Producer) {
  13. _ = producer.Close()
  14. }(producer)
  15. plusMsg := &mq.TestMsg{Message: "你好, 世界++!", Time: date.Now()}
  16. plusTopic := topic.TOP("ka-alloc-plus-job")
  17. reduceMsg := &mq.TestMsg{Message: "你好, 世界--!", Time: date.Now()}
  18. reduceTopic := topic.TOP("ka-alloc-reduce-job")
  19. var plusMsgList []*mq.ProducerMessage
  20. var reduceMsgList []*mq.ProducerMessage
  21. for i := 0; i < 5; i++ {
  22. plusMsgList = append(plusMsgList, &mq.ProducerMessage{
  23. Topic: plusTopic,
  24. Value: plusMsg,
  25. })
  26. result, err := producer.SendJSON(plusTopic, plusMsg) // 2. 发单条
  27. if err != nil {
  28. t.Error("Failed to produce message: ", err)
  29. }
  30. if nil != result {
  31. t.Log(result.(string))
  32. }
  33. reduceMsgList = append(reduceMsgList, &mq.ProducerMessage{
  34. Topic: reduceTopic,
  35. Value: reduceMsg,
  36. })
  37. result, err = producer.SendJSON(reduceTopic, reduceMsg) // 2. 发单条
  38. if err != nil {
  39. t.Error("Failed to produce message: ", err)
  40. }
  41. if nil != result {
  42. t.Log(result.(string))
  43. }
  44. }
  45. // 异步生消息生产者的发送结果处理
  46. //for i := 0; i < 5; i++ {
  47. // select {
  48. // case suc := <-producer.Successes():
  49. // fmt.Println("offset: ", suc.Offset, "timestamp: ", suc.Timestamp.String(), "partitions: ", suc.Partition)
  50. // case fail := <-producer.Errors():
  51. // fmt.Println("err: ", fail.Err)
  52. // }
  53. //}
  54. err := producer.SendJSONs(plusMsgList)
  55. if err != nil {
  56. t.Error("Failed to produce message: ", err) // 3. 发多条
  57. }
  58. err = producer.SendJSONs(reduceMsgList)
  59. if err != nil {
  60. t.Error("Failed to produce message: ", err) // 3. 发多条
  61. }
  62. }