|
@@ -141,6 +141,7 @@ type BufferPostman struct {
|
|
|
timer *time.Timer
|
|
|
isTimerStop bool
|
|
|
target chan interface{}
|
|
|
+ lock *sync.Mutex
|
|
|
}
|
|
|
|
|
|
// 新建缓冲投递员对象
|
|
@@ -150,6 +151,7 @@ func NewBufferPostman(limit int, duration time.Duration, target chan interface{}
|
|
|
duration: duration,
|
|
|
Buffer: NewBufferMap(),
|
|
|
target: target,
|
|
|
+ lock: new(sync.Mutex),
|
|
|
}
|
|
|
if duration > 0 {
|
|
|
p.timer = time.NewTimer(duration)
|
|
@@ -170,11 +172,15 @@ func NewBufferPostman(limit int, duration time.Duration, target chan interface{}
|
|
|
// 置入消息
|
|
|
func (p *BufferPostman) Push(item IBufferItem) {
|
|
|
size := p.Buffer.Push(item)
|
|
|
- if p.isTimerStop { // 唤醒定时器
|
|
|
- p.resetTimer()
|
|
|
- }
|
|
|
- if p.limit > 0 && size >= p.limit { // 超限
|
|
|
- p.deliver()
|
|
|
+ if p.isTimerStop || (p.limit > 0 && size >= p.limit) {
|
|
|
+ p.lock.Lock()
|
|
|
+ defer p.lock.Unlock()
|
|
|
+ if p.isTimerStop { // 唤醒定时器
|
|
|
+ p.resetTimer()
|
|
|
+ }
|
|
|
+ if p.limit > 0 && size >= p.limit { // 超限
|
|
|
+ p.deliver()
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
|