1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374 |
- package queue
- import (
- "fmt"
- "sync"
- "testing"
- )
// testMsg is the payload pushed through the dispatcher by the tests in this
// file.
type testMsg struct {
	// ID is the sequence number assigned by the sender.
	ID int
}
- // golang chan pool unit test
- func TestChanPool(t *testing.T) {
- // parameters
- maxWorkers := 4 // max worker chan number
- msgCount := 1000 // message count
- // init a concurrent counter map
- wg := sync.WaitGroup{}
- wg.Add(msgCount)
- counter := NewCountMap()
- msgQueue := make(chan interface{}, 0) // input chan
- // message customer start
- NewChanDispatcher(msgQueue, maxWorkers).Run(
- func(workerId int, msg interface{}) {
- if n, ok := msg.(*testMsg); ok {
- fmt.Printf("worker %d received msg %d\n", workerId, n.ID)
- counter.Set(workerId, counter.Get(workerId)+1)
- wg.Done()
- }
- },
- )
- // send some messages
- for i := 0; i < msgCount; i++ {
- msgQueue <- &testMsg{ID: i}
- }
- // result print
- wg.Wait()
- println("")
- for k, v := range counter.Data {
- fmt.Printf("worker %d received msg total count is %d\n", k, v)
- }
- }
// countMap is a map of int counters paired with the mutex that protects it.
// The Get and Set methods acquire Lock around every access; code that touches
// Data directly is not covered by that locking.
type countMap struct {
	Lock sync.Mutex  // guards Data inside Get and Set
	Data map[int]int // counter values by key
}
- // NewCountMap() get new concurrent counter map instance
- func NewCountMap() *countMap {
- return &countMap{
- Data: make(map[int]int),
- }
- }
- // Get() get counter value from map
- func (d *countMap) Get(k int) int {
- d.Lock.Lock()
- defer d.Lock.Unlock()
- return d.Data[k]
- }
- // Set() set value to counter map
- func (d *countMap) Set(k, v int) {
- d.Lock.Lock()
- defer d.Lock.Unlock()
- d.Data[k] = v
- }
|