123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102 |
- package queue
- import (
- "fmt"
- "git.aionnect.com/aionnect/go-common/utils"
- "testing"
- "time"
- )
- var (
- testQueue chan interface{}
- )
- func init() {
- testQueue = make(chan interface{}, 500)
- }
- type TestMessage struct {
- ID utils.Long
- Count int
- }
- func (m *TestMessage) BufferID() interface{} {
- return m.ID
- }
- func (m *TestMessage) Reduce(oldVal IBufferItem) IBufferItem {
- return m // 此行示例即不管旧值,直接返回新值,Push后的效果即新值直接替换旧值
- // 或者实现累加之类的逻辑
- //m.Count = oldVal.(*TestMessage).Count + m.Count
- //return m
- }
- func TestBufferPostman_Push(t *testing.T) {
- // 初始化缓冲投递员
- bufPost := NewBufferPostman(5, 5*time.Second, testQueue)
- // 启动并发的消息消费者chan池
- NewChanDispatcher(testQueue, 4).Run(
- func(workerId int, msg interface{}) {
- if i, ok := msg.(IBufferItem); ok {
- fmt.Printf("consumer %d recevie message %d\n", workerId, i.BufferID())
- }
- },
- )
- // 准备一个测试去重用的id列表
- var ids []utils.Long
- for i := 0; i < 17; i++ {
- ids = append(ids, utils.Long(i))
- }
- // 消息并发压进缓冲
- // 整个数据流:消息加进缓冲的map -> 超时或超限时,取出全部消息推进中转chan -> 中转chan消费者处理消息
- // 上述数据流入口,也可以是消费其它chan或消息中间件的消息,然后再加进map
- // 中转chan消费者不一定要用并发chan池,例如,也可以根据id散列,启动固定的几个消息消费者 -- 这对向数据库写数据更加合适,不易因为并发造成死锁
- for g := 0; g < 3; g++ {
- go func() {
- // 测试超时和超限机制
- for i := 0; i < 50; i++ {
- time.Sleep(100 * time.Millisecond)
- var id utils.Long = 0
- if nil != ids && len(ids) > 0 {
- id = ids[i%len(ids)]
- }
- bufPost.Push(&TestMessage{
- ID: id,
- Count: i,
- })
- }
- // 当触发一次无数据时,超时定时器应该会停止
- }()
- }
- // 测试超时定时器唤醒机制
- go func() {
- time.Sleep(30 * time.Second)
- println(`30 seconds later`)
- bufPost.Push(&TestMessage{
- ID: ids[0],
- Count: 0,
- })
- time.Sleep(10 * time.Second)
- for g := 0; g < 3; g++ {
- go func() {
- for i := 0; i < 500; i++ {
- time.Sleep(100 * time.Millisecond)
- var id utils.Long = 0
- if nil != ids && len(ids) > 0 {
- id = ids[i%len(ids)]
- }
- bufPost.Push(&TestMessage{
- ID: id,
- Count: i,
- })
- }
- }()
- }
- }()
- quit := make(chan bool)
- <-quit
- }
|