chan_pool_test.go 1.3 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970
  1. package queue
  2. import (
  3. "fmt"
  4. "sync"
  5. "testing"
  6. )
  7. type testMsg struct {
  8. ID int
  9. }
  10. // "线程池"(golang管道池)单元测试
  11. func TestChanPool(t *testing.T) {
  12. // 参数
  13. maxWorkers := 4 // 最大工作管道数
  14. msgCount := 1000 // 消息数量
  15. // 非主逻辑,计数器初始化
  16. wg := sync.WaitGroup{} // 同步计数器
  17. wg.Add(msgCount)
  18. counter := NewCountMap() // 并发安全统计计数器
  19. // 启动工作管道池调度器
  20. msgQueue := make(chan interface{}, 0) // 输入队列
  21. NewChanDispatcher(msgQueue, maxWorkers).Run(
  22. func(workerId int, msg interface{}) {
  23. if n, ok := msg.(*testMsg); ok {
  24. fmt.Printf("worker %d received msg %d\n", workerId, n.ID)
  25. counter.Set(workerId, counter.Get(workerId)+1)
  26. wg.Done()
  27. }
  28. },
  29. )
  30. // 发消息给输入队列
  31. for i := 0; i < msgCount; i++ {
  32. msgQueue <- &testMsg{ID: i}
  33. }
  34. // 非主逻辑,计数器打印
  35. wg.Wait()
  36. println("")
  37. for k, v := range counter.Data {
  38. fmt.Printf("worker %d received msg total count is %d\n", k, v)
  39. }
  40. }
  41. type countMap struct {
  42. Data map[int]int
  43. Lock sync.Mutex
  44. }
  45. func NewCountMap() *countMap {
  46. return &countMap{
  47. Data: make(map[int]int),
  48. }
  49. }
  50. func (d *countMap) Get(k int) int {
  51. d.Lock.Lock()
  52. defer d.Lock.Unlock()
  53. return d.Data[k]
  54. }
  55. func (d *countMap) Set(k, v int) {
  56. d.Lock.Lock()
  57. defer d.Lock.Unlock()
  58. d.Data[k] = v
  59. }