chan_pool_test.go 1.4 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071
  1. package queue
  2. import (
  3. "fmt"
  4. "sync"
  5. "testing"
  6. )
  7. type testMsg struct {
  8. ID int
  9. }
  10. // "线程池"(golang管道池)单元测试
  11. func TestChanPool(t *testing.T) {
  12. // 参数
  13. maxWorkers := 4 // 最大工作管道数
  14. msgCount := 1000 // 消息数量
  15. // 非主逻辑,计数器初始化
  16. wg := sync.WaitGroup{} // 同步计数器
  17. wg.Add(msgCount)
  18. counter := NewCountMap() // 并发安全统计计数器
  19. // 启动工作管道池调度器
  20. msgQueue := make(chan interface{}, 0) // 输入队列
  21. NewChanDispatcher(msgQueue, maxWorkers).Run(
  22. //NewChanDispatcherWithCapacity(msgQueue, maxWorkers, 5).Run(
  23. func(workerId int, msg interface{}) {
  24. if n, ok := msg.(*testMsg); ok {
  25. fmt.Printf("worker %d received msg %d\n", workerId, n.ID)
  26. counter.Set(workerId, counter.Get(workerId)+1)
  27. wg.Done()
  28. }
  29. },
  30. )
  31. // 发消息给输入队列
  32. for i := 0; i < msgCount; i++ {
  33. msgQueue <- &testMsg{ID: i}
  34. }
  35. // 非主逻辑,计数器打印
  36. wg.Wait()
  37. println("")
  38. for k, v := range counter.Data {
  39. fmt.Printf("worker %d received msg total count is %d\n", k, v)
  40. }
  41. }
  42. type countMap struct {
  43. Data map[int]int
  44. Lock sync.Mutex
  45. }
  46. func NewCountMap() *countMap {
  47. return &countMap{
  48. Data: make(map[int]int),
  49. }
  50. }
  51. func (d *countMap) Get(k int) int {
  52. d.Lock.Lock()
  53. defer d.Lock.Unlock()
  54. return d.Data[k]
  55. }
  56. func (d *countMap) Set(k, v int) {
  57. d.Lock.Lock()
  58. defer d.Lock.Unlock()
  59. d.Data[k] = v
  60. }