package queue import ( "fmt" "sync" "testing" ) // buffered golang chan members release unit test func TestQueueClear(t *testing.T) { counter := &counter{} // init a concurrent counter clear := &Clear{} // init clear msgQueue := make(chan interface{}, 1000) // buffered chan // message customer start NewChanDispatcher(msgQueue, 4).Run( func(workerId int, msg interface{}) { if clear.Working() { println("ignored", counter.Add()-1) } else { if n, ok := msg.(*testMsg); ok { fmt.Printf("worker %d received msg %d\n", workerId, n.ID) } } }, ) // send some messages for i := 0; i < 50; i++ { msgQueue <- &testMsg{ID: i} } // but "clean" immediately clear.Clean(msgQueue) // sync block loop // wait clean over then send another messages for i := 0; i < 20; i++ { msgQueue <- &testMsg{ID: i} } quit := make(chan bool) <-quit } // concurrent counter type counter struct { Number int Lock sync.Mutex } // Add() add counter and return new value func (d *counter) Add() int { d.Lock.Lock() defer d.Lock.Unlock() d.Number++ return d.Number }