123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221 |
- package queue
- import (
- "git.aionnect.com/aionnect/go-common/utils"
- "sync"
- "time"
- )
// IBufferItem is the contract for a message that can be stored in a BufferMap.
type IBufferItem interface {
	// BufferID returns the key used to deduplicate buffered items. Consumers
	// further downstream may also use it, for example as a hashing key.
	BufferID() interface{}

	// Reduce is called on the incoming item with oldVal, the item currently
	// stored under the same BufferID (nil when nothing is stored yet), and
	// returns the value that should be kept. Implementations put their own
	// merge logic here, such as accumulation; the simplest implementation
	// just returns the receiver.
	Reduce(oldVal IBufferItem) IBufferItem
}
- // 缓冲键值表
- type BufferMap struct {
- data map[interface{}]IBufferItem
- lock *sync.Mutex
- }
- // 新建缓冲键值表对象
- func NewBufferMap() *BufferMap {
- return &BufferMap{
- data: make(map[interface{}]IBufferItem),
- lock: new(sync.Mutex),
- }
- }
- // 置入键值
- func (m *BufferMap) Push(item IBufferItem) int {
- if nil == item {
- return 0
- }
- m.lock.Lock()
- defer m.lock.Unlock()
- oldVal := m.data[item.BufferID()]
- m.data[item.BufferID()] = item.Reduce(oldVal)
- return len(m.data)
- }
- // 读取键值
- func (m *BufferMap) Get(id interface{}) IBufferItem {
- if nil == id {
- return nil
- }
- m.lock.Lock()
- defer m.lock.Unlock()
- return m.data[id]
- }
- // 读取并移除键值
- func (m *BufferMap) Pop(id interface{}) IBufferItem {
- if nil == id {
- return nil
- }
- m.lock.Lock()
- defer m.lock.Unlock()
- res := m.data[id]
- delete(m.data, id)
- return res
- }
- // 读取全部键
- func (m *BufferMap) Keys() []interface{} {
- m.lock.Lock()
- defer m.lock.Unlock()
- if len(m.data) == 0 {
- return nil
- }
- var res []interface{}
- for k := range m.data {
- res = append(res, k)
- }
- return res
- }
- // 读取全部值
- func (m *BufferMap) Values() []IBufferItem {
- m.lock.Lock()
- defer m.lock.Unlock()
- if len(m.data) == 0 {
- return nil
- }
- var res []IBufferItem
- for _, v := range m.data {
- res = append(res, v)
- }
- return res
- }
- // 读取并移除全部键值
- func (m *BufferMap) PopAll() []IBufferItem {
- m.lock.Lock()
- defer m.lock.Unlock()
- if len(m.data) == 0 {
- return nil
- }
- var res []IBufferItem
- for _, v := range m.data {
- res = append(res, v)
- }
- m.data = make(map[interface{}]IBufferItem)
- return res
- }
- // 获取大小
- func (m *BufferMap) Size() int {
- m.lock.Lock()
- defer m.lock.Unlock()
- return len(m.data)
- }
- // 移除键值
- func (m *BufferMap) Remove(id interface{}) {
- if nil == id {
- return
- }
- m.lock.Lock()
- defer m.lock.Unlock()
- if len(m.data) == 0 {
- return
- }
- delete(m.data, id)
- }
- // 清空键值表
- func (m *BufferMap) Clear() {
- m.lock.Lock()
- defer m.lock.Unlock()
- if len(m.data) == 0 {
- return
- }
- m.data = make(map[interface{}]IBufferItem)
- }
- // 缓冲投递员
- type BufferPostman struct {
- limit int
- duration time.Duration
- Buffer *BufferMap
- timer *time.Timer
- isTimerStop bool
- target chan interface{}
- lock *sync.Mutex
- }
- // 新建缓冲投递员对象
- func NewBufferPostman(limit int, duration time.Duration, target chan interface{}) *BufferPostman {
- p := &BufferPostman{
- limit: limit,
- duration: duration,
- Buffer: NewBufferMap(),
- target: target,
- lock: new(sync.Mutex),
- }
- if duration > 0 {
- p.timer = time.NewTimer(duration)
- go func(p *BufferPostman) {
- defer utils.DefaultGoroutineRecover(nil, `缓冲投递超时消息`)
- for {
- select {
- case <-p.timer.C: // 超时
- p.isTimerStop = true
- p.deliver()
- }
- }
- }(p)
- }
- return p
- }
- // 置入消息
- func (p *BufferPostman) Push(item IBufferItem) {
- size := p.Buffer.Push(item)
- if p.isTimerStop || (p.limit > 0 && size >= p.limit) {
- p.lock.Lock()
- defer p.lock.Unlock()
- if p.isTimerStop { // 唤醒定时器
- p.resetTimer()
- }
- if p.limit > 0 && size >= p.limit { // 超限
- p.deliver()
- }
- }
- }
- // 投递消息
- func (p *BufferPostman) deliver() {
- data := p.Buffer.PopAll()
- if nil != data && len(data) > 0 {
- // 仅供测试时用的日志记录
- //if eventType == 1 {
- // fmt.Printf("deliver %d messages when timeout\n", len(data))
- //} else if eventType == 2 {
- // fmt.Printf("deliver %d messages when full\n", len(data))
- //}
- // 将消息推进中转chan
- go func(p *BufferPostman, data []IBufferItem) {
- defer utils.DefaultGoroutineRecover(nil, `缓冲投递超限消息`)
- for i := 0; i < len(data); i++ {
- p.target <- data[i]
- }
- }(p, data)
- // 触发时有数据才会重置定时器
- p.resetTimer()
- }
- // 没数据,定时器就暂停了,等待新数据进入时再唤醒
- }
- // 重置定时器
- func (p *BufferPostman) resetTimer() {
- if nil != p.timer && p.duration > 0 {
- if len(p.timer.C) > 0 {
- <-p.timer.C
- }
- p.timer.Reset(p.duration)
- p.isTimerStop = false
- }
- }
|