|
@@ -1,6 +1,7 @@
|
|
|
package queue
|
|
|
|
|
|
import (
|
|
|
+ "fmt"
|
|
|
"git.haoqitour.com/haoqi/go-common/utils"
|
|
|
"sync"
|
|
|
"time"
|
|
@@ -159,7 +160,7 @@ func NewBufferPostman(limit int, duration time.Duration, target chan interface{}
|
|
|
select {
|
|
|
case <-p.timer.C: // 超时
|
|
|
p.isTimerStop = true
|
|
|
- p.deliver()
|
|
|
+ p.deliver(1)
|
|
|
}
|
|
|
}
|
|
|
}(p)
|
|
@@ -174,20 +175,20 @@ func (p *BufferPostman) Push(item IBufferItem) {
|
|
|
p.resetTimer()
|
|
|
}
|
|
|
if p.limit > 0 && size >= p.limit { // 超限
|
|
|
- p.deliver()
|
|
|
+ p.deliver(2)
|
|
|
}
|
|
|
}
|
|
|
|
|
|
// 投递消息
|
|
|
-func (p *BufferPostman) deliver() {
|
|
|
+func (p *BufferPostman) deliver(eventType int8) {
|
|
|
data := p.Buffer.PopAll()
|
|
|
if nil != data && len(data) > 0 {
|
|
|
// 仅供测试时用的日志记录
|
|
|
- //if eventType == 1 {
|
|
|
- // fmt.Printf("deliver %d messages when timeout\n", len(data))
|
|
|
- //} else if eventType == 2 {
|
|
|
- // fmt.Printf("deliver %d messages when full\n", len(data))
|
|
|
- //}
|
|
|
+ if eventType == 1 {
|
|
|
+ fmt.Printf("deliver %d messages when timeout\n", len(data))
|
|
|
+ } else if eventType == 2 {
|
|
|
+ fmt.Printf("deliver %d messages when full\n", len(data))
|
|
|
+ }
|
|
|
|
|
|
// 将消息推进中转chan
|
|
|
go func(p *BufferPostman, data []IBufferItem) {
|