123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960 |
- package queue
- import (
- "fmt"
- "sync"
- "testing"
- )
- // buffered golang chan members release unit test
- func TestQueueClear(t *testing.T) {
- counter := &counter{} // init a concurrent counter
- clear := &Clear{} // init clear
- msgQueue := make(chan interface{}, 1000) // buffered chan
- // message customer start
- NewChanDispatcher(msgQueue, 4).Run(
- func(workerId int, msg interface{}) {
- if clear.Working() {
- println("ignored", counter.Add()-1)
- } else {
- if n, ok := msg.(*testMsg); ok {
- fmt.Printf("worker %d received msg %d\n", workerId, n.ID)
- }
- }
- },
- )
- // send some messages
- for i := 0; i < 50; i++ {
- msgQueue <- &testMsg{ID: i}
- }
- // but "clean" immediately
- clear.Clean(msgQueue) // sync block loop
- // wait clean over then send another messages
- for i := 0; i < 20; i++ {
- msgQueue <- &testMsg{ID: i}
- }
- quit := make(chan bool)
- <-quit
- }
// counter is an integer counter whose increments are serialized by a mutex.
// The zero value is ready to use and starts at 0.
type counter struct {
	Number int        // current count; Add modifies it while holding Lock
	Lock   sync.Mutex // held by Add around the increment and read of Number
}

// Add increments the counter by one and returns the value after the
// increment.
func (c *counter) Add() int {
	c.Lock.Lock()
	c.Number++
	current := c.Number // snapshot taken while the lock is still held
	c.Lock.Unlock()
	return current
}
|