package queue import ( "fmt" "sync" "testing" ) type testMsg struct { ID int } // "线程池"(golang管道池)单元测试 func TestChanPool(t *testing.T) { // 参数 maxWorkers := 4 // 最大工作管道数 msgCount := 1000 // 消息数量 // 非主逻辑,计数器初始化 wg := sync.WaitGroup{} // 同步计数器 wg.Add(msgCount) counter := NewCountMap() // 并发安全统计计数器 // 启动工作管道池调度器 msgQueue := make(chan interface{}, 0) // 输入队列 NewChanDispatcher(msgQueue, maxWorkers).Run( func(workerId int, msg interface{}) { if n, ok := msg.(*testMsg); ok { fmt.Printf("worker %d received msg %d\n", workerId, n.ID) counter.Set(workerId, counter.Get(workerId)+1) wg.Done() } }, ) // 发消息给输入队列 for i := 0; i < msgCount; i++ { msgQueue <- &testMsg{ID: i} } // 非主逻辑,计数器打印 wg.Wait() println("") for k, v := range counter.Data { fmt.Printf("worker %d received msg total count is %d\n", k, v) } } type countMap struct { Data map[int]int Lock sync.Mutex } func NewCountMap() *countMap { return &countMap{ Data: make(map[int]int), } } func (d *countMap) Get(k int) int { d.Lock.Lock() defer d.Lock.Unlock() return d.Data[k] } func (d *countMap) Set(k, v int) { d.Lock.Lock() defer d.Lock.Unlock() d.Data[k] = v }