123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657 |
- package nats
- import (
- "fmt"
- "git.aionnect.com/aionnect/go-common/utils/mq"
- "git.aionnect.com/aionnect/go-common/utils/mq/topic"
- "sync"
- "testing"
- )
- var wg sync.WaitGroup
- func TestNewConsumer(t *testing.T) {
- // queue为空,广播
- consumer1 := NewConsumer("", topic.TOP("test-subject"))
- defer func(consumer1 *Consumer) {
- _ = consumer1.Close()
- }(consumer1)
- // queue非空,单播
- consumer2 := NewConsumer("testQueue", topic.TOP("test-subject"))
- defer func(consumer2 *Consumer) {
- _ = consumer2.Close()
- }(consumer2)
- wg.Add(1)
- go func() {
- defer wg.Done()
- i1 := 0
- var ch1 = make(chan *mq.TestMsg, 0)
- consumer1.BindJSONChan(ch1)
- for {
- select {
- case msg1 := <-ch1:
- i1++
- fmt.Printf("1-%d: Received a broadcast message: %s\n", i1, msg1)
- }
- }
- }()
- wg.Add(1)
- go func() {
- defer wg.Done()
- i2 := 0
- var ch2 = make(chan *mq.TestMsg, 0)
- consumer2.BindJSONChan(ch2)
- for {
- select {
- case msg2 := <-ch2:
- i2++
- fmt.Printf("2-%d: Received a message: %s\n", i2, msg2)
- }
- }
- }()
- wg.Wait()
- t.Log("Done consuming topic")
- }
|