package queue import ( "fmt" "sync" "testing" ) type testMsg struct { ID int } // golang chan pool unit test func TestChanPool(t *testing.T) { // parameters maxWorkers := 4 // max worker chan number msgCount := 1000 // message count // init a concurrent counter map wg := sync.WaitGroup{} wg.Add(msgCount) counter := NewCountMap() msgQueue := make(chan interface{}, 0) // input chan // message customer start NewChanDispatcher(msgQueue, maxWorkers).Run( func(workerId int, msg interface{}) { if n, ok := msg.(*testMsg); ok { fmt.Printf("worker %d received msg %d\n", workerId, n.ID) counter.Set(workerId, counter.Get(workerId)+1) wg.Done() } }, ) // send some messages for i := 0; i < msgCount; i++ { msgQueue <- &testMsg{ID: i} } // result print wg.Wait() println("") for k, v := range counter.Data { fmt.Printf("worker %d received msg total count is %d\n", k, v) } } // concurrent counter map type countMap struct { Data map[int]int Lock sync.Mutex } // NewCountMap() get new concurrent counter map instance func NewCountMap() *countMap { return &countMap{ Data: make(map[int]int), } } // Get() get counter value from map func (d *countMap) Get(k int) int { d.Lock.Lock() defer d.Lock.Unlock() return d.Data[k] } // Set() set value to counter map func (d *countMap) Set(k, v int) { d.Lock.Lock() defer d.Lock.Unlock() d.Data[k] = v }