package nats import ( "fmt" "git.aionnect.com/aionnect/go-common/utils/mq" "git.aionnect.com/aionnect/go-common/utils/mq/topic" "sync" "testing" ) var wg sync.WaitGroup func TestNewConsumer(t *testing.T) { // queue为空,广播 consumer1 := NewConsumer("", topic.TOP("test-subject")) defer func(consumer1 *Consumer) { _ = consumer1.Close() }(consumer1) // queue非空,单播 consumer2 := NewConsumer("testQueue", topic.TOP("test-subject")) defer func(consumer2 *Consumer) { _ = consumer2.Close() }(consumer2) wg.Add(1) go func() { defer wg.Done() i1 := 0 var ch1 = make(chan *mq.TestMsg, 0) consumer1.BindJSONChan(ch1) for { select { case msg1 := <-ch1: i1++ fmt.Printf("1-%d: Received a broadcast message: %s\n", i1, msg1) } } }() wg.Add(1) go func() { defer wg.Done() i2 := 0 var ch2 = make(chan *mq.TestMsg, 0) consumer2.BindJSONChan(ch2) for { select { case msg2 := <-ch2: i2++ fmt.Printf("2-%d: Received a message: %s\n", i2, msg2) } } }() wg.Wait() t.Log("Done consuming topic") }