chan_pool_test.go 1.4 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374
  1. package queue
  2. import (
  3. "fmt"
  4. "sync"
  5. "testing"
  6. )
  7. type testMsg struct {
  8. ID int
  9. }
  10. // golang chan pool unit test
  11. func TestChanPool(t *testing.T) {
  12. // parameters
  13. maxWorkers := 4 // max worker chan number
  14. msgCount := 1000 // message count
  15. // init a concurrent counter map
  16. wg := sync.WaitGroup{}
  17. wg.Add(msgCount)
  18. counter := NewCountMap()
  19. msgQueue := make(chan interface{}, 0) // input chan
  20. // message customer start
  21. NewChanDispatcher(msgQueue, maxWorkers).Run(
  22. func(workerId int, msg interface{}) {
  23. if n, ok := msg.(*testMsg); ok {
  24. fmt.Printf("worker %d received msg %d\n", workerId, n.ID)
  25. counter.Set(workerId, counter.Get(workerId)+1)
  26. wg.Done()
  27. }
  28. },
  29. )
  30. // send some messages
  31. for i := 0; i < msgCount; i++ {
  32. msgQueue <- &testMsg{ID: i}
  33. }
  34. // result print
  35. wg.Wait()
  36. println("")
  37. for k, v := range counter.Data {
  38. fmt.Printf("worker %d received msg total count is %d\n", k, v)
  39. }
  40. }
  41. // concurrent counter map
  42. type countMap struct {
  43. Data map[int]int
  44. Lock sync.Mutex
  45. }
  46. // NewCountMap() get new concurrent counter map instance
  47. func NewCountMap() *countMap {
  48. return &countMap{
  49. Data: make(map[int]int),
  50. }
  51. }
  52. // Get() get counter value from map
  53. func (d *countMap) Get(k int) int {
  54. d.Lock.Lock()
  55. defer d.Lock.Unlock()
  56. return d.Data[k]
  57. }
  58. // Set() set value to counter map
  59. func (d *countMap) Set(k, v int) {
  60. d.Lock.Lock()
  61. defer d.Lock.Unlock()
  62. d.Data[k] = v
  63. }