// File: queue_clear_test.go

package queue

import (
	"fmt"
	"sync"
	"testing"
)
  7. // buffered golang chan members release unit test
  8. func TestQueueClear(t *testing.T) {
  9. counter := &counter{} // init a concurrent counter
  10. clear := &Clear{} // init clear
  11. msgQueue := make(chan interface{}, 1000) // buffered chan
  12. // message customer start
  13. NewChanDispatcher(msgQueue, 4).Run(
  14. func(workerId int, msg interface{}) {
  15. if clear.Working() {
  16. println("ignored", counter.Add()-1)
  17. } else {
  18. if n, ok := msg.(*testMsg); ok {
  19. fmt.Printf("worker %d received msg %d\n", workerId, n.ID)
  20. }
  21. }
  22. },
  23. )
  24. // send some messages
  25. for i := 0; i < 50; i++ {
  26. msgQueue <- &testMsg{ID: i}
  27. }
  28. // but "clean" immediately
  29. clear.Clean(msgQueue) // sync block loop
  30. // wait clean over then send another messages
  31. for i := 0; i < 20; i++ {
  32. msgQueue <- &testMsg{ID: i}
  33. }
  34. quit := make(chan bool)
  35. <-quit
  36. }
  37. // concurrent counter
  38. type counter struct {
  39. Number int
  40. Lock sync.Mutex
  41. }
  42. // Add() add counter and return new value
  43. func (d *counter) Add() int {
  44. d.Lock.Lock()
  45. defer d.Lock.Unlock()
  46. d.Number++
  47. return d.Number
  48. }