consumer_test.go 1.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657
  1. package nats
  import (
  	"fmt"
  	"sync"
  	"testing"
  	"time"

  	"git.aionnect.com/aionnect/go-common/utils/mq"
  	"git.aionnect.com/aionnect/go-common/utils/mq/topic"
  )
  9. var wg sync.WaitGroup
  10. func TestNewConsumer(t *testing.T) {
  11. // queue为空,广播
  12. consumer1 := NewConsumer("", topic.TOP("test-subject"))
  13. defer func(consumer1 *Consumer) {
  14. _ = consumer1.Close()
  15. }(consumer1)
  16. // queue非空,单播
  17. consumer2 := NewConsumer("testQueue", topic.TOP("test-subject"))
  18. defer func(consumer2 *Consumer) {
  19. _ = consumer2.Close()
  20. }(consumer2)
  21. wg.Add(1)
  22. go func() {
  23. defer wg.Done()
  24. i1 := 0
  25. var ch1 = make(chan *mq.TestMsg, 0)
  26. consumer1.BindJSONChan(ch1)
  27. for {
  28. select {
  29. case msg1 := <-ch1:
  30. i1++
  31. fmt.Printf("1-%d: Received a broadcast message: %s\n", i1, msg1)
  32. }
  33. }
  34. }()
  35. wg.Add(1)
  36. go func() {
  37. defer wg.Done()
  38. i2 := 0
  39. var ch2 = make(chan *mq.TestMsg, 0)
  40. consumer2.BindJSONChan(ch2)
  41. for {
  42. select {
  43. case msg2 := <-ch2:
  44. i2++
  45. fmt.Printf("2-%d: Received a message: %s\n", i2, msg2)
  46. }
  47. }
  48. }()
  49. wg.Wait()
  50. t.Log("Done consuming topic")
  51. }