buffer_postman_test.go 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102
  1. package queue
  2. import (
  3. "fmt"
  4. "git.aionnect.com/aionnect/go-common/utils"
  5. "testing"
  6. "time"
  7. )
  8. var (
  9. testQueue chan interface{}
  10. )
  11. func init() {
  12. testQueue = make(chan interface{}, 500)
  13. }
  14. type TestMessage struct {
  15. ID utils.Long
  16. Count int
  17. }
  18. func (m *TestMessage) BufferID() interface{} {
  19. return m.ID
  20. }
  21. func (m *TestMessage) Reduce(oldVal IBufferItem) IBufferItem {
  22. return m // 此行示例即不管旧值,直接返回新值,Push后的效果即新值直接替换旧值
  23. // 或者实现累加之类的逻辑
  24. //m.Count = oldVal.(*TestMessage).Count + m.Count
  25. //return m
  26. }
  27. func TestBufferPostman_Push(t *testing.T) {
  28. // 初始化缓冲投递员
  29. bufPost := NewBufferPostman(5, 5*time.Second, testQueue)
  30. // 启动并发的消息消费者chan池
  31. NewChanDispatcher(testQueue, 4).Run(
  32. func(workerId int, msg interface{}) {
  33. if i, ok := msg.(IBufferItem); ok {
  34. fmt.Printf("consumer %d recevie message %d\n", workerId, i.BufferID())
  35. }
  36. },
  37. )
  38. // 准备一个测试去重用的id列表
  39. var ids []utils.Long
  40. for i := 0; i < 17; i++ {
  41. ids = append(ids, utils.Long(i))
  42. }
  43. // 消息并发压进缓冲
  44. // 整个数据流:消息加进缓冲的map -> 超时或超限时,取出全部消息推进中转chan -> 中转chan消费者处理消息
  45. // 上述数据流入口,也可以是消费其它chan或消息中间件的消息,然后再加进map
  46. // 中转chan消费者不一定要用并发chan池,例如,也可以根据id散列,启动固定的几个消息消费者 -- 这对向数据库写数据更加合适,不易因为并发造成死锁
  47. for g := 0; g < 3; g++ {
  48. go func() {
  49. // 测试超时和超限机制
  50. for i := 0; i < 50; i++ {
  51. time.Sleep(100 * time.Millisecond)
  52. var id utils.Long = 0
  53. if nil != ids && len(ids) > 0 {
  54. id = ids[i%len(ids)]
  55. }
  56. bufPost.Push(&TestMessage{
  57. ID: id,
  58. Count: i,
  59. })
  60. }
  61. // 当触发一次无数据时,超时定时器应该会停止
  62. }()
  63. }
  64. // 测试超时定时器唤醒机制
  65. go func() {
  66. time.Sleep(30 * time.Second)
  67. println(`30 seconds later`)
  68. bufPost.Push(&TestMessage{
  69. ID: ids[0],
  70. Count: 0,
  71. })
  72. time.Sleep(10 * time.Second)
  73. for g := 0; g < 3; g++ {
  74. go func() {
  75. for i := 0; i < 500; i++ {
  76. time.Sleep(100 * time.Millisecond)
  77. var id utils.Long = 0
  78. if nil != ids && len(ids) > 0 {
  79. id = ids[i%len(ids)]
  80. }
  81. bufPost.Push(&TestMessage{
  82. ID: id,
  83. Count: i,
  84. })
  85. }
  86. }()
  87. }
  88. }()
  89. quit := make(chan bool)
  90. <-quit
  91. }