buffer_postman.go 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221
  1. package queue
  2. import (
  3. "git.aionnect.com/aionnect/go-common/utils"
  4. "sync"
  5. "time"
  6. )
  // IBufferItem is the contract a message must satisfy to be buffered.
  type IBufferItem interface {
  	// BufferID returns the key used to de-duplicate buffered items. Per the
  	// original author's note, consumers may also hash on this key later on
  	// if they need to.
  	BufferID() interface{}
  	// Reduce combines the receiver (the incoming item) with oldVal, the item
  	// previously stored under the same BufferID, and returns the item to keep.
  	// Implementations put their own merge logic here (for example
  	// accumulation); the simplest implementation just returns the new item
  	// itself.
  	Reduce(oldVal IBufferItem) IBufferItem
  }
  // BufferMap is a keyed buffer of IBufferItem values. Its methods acquire lock
  // before touching data.
  type BufferMap struct {
  	data map[interface{}]IBufferItem // buffered items, keyed by IBufferItem.BufferID()
  	lock *sync.Mutex                 // held by the methods while they read or modify data
  }
  17. // 新建缓冲键值表对象
  18. func NewBufferMap() *BufferMap {
  19. return &BufferMap{
  20. data: make(map[interface{}]IBufferItem),
  21. lock: new(sync.Mutex),
  22. }
  23. }
  24. // 置入键值
  25. func (m *BufferMap) Push(item IBufferItem) int {
  26. if nil == item {
  27. return 0
  28. }
  29. m.lock.Lock()
  30. defer m.lock.Unlock()
  31. oldVal := m.data[item.BufferID()]
  32. m.data[item.BufferID()] = item.Reduce(oldVal)
  33. return len(m.data)
  34. }
  35. // 读取键值
  36. func (m *BufferMap) Get(id interface{}) IBufferItem {
  37. if nil == id {
  38. return nil
  39. }
  40. m.lock.Lock()
  41. defer m.lock.Unlock()
  42. return m.data[id]
  43. }
  44. // 读取并移除键值
  45. func (m *BufferMap) Pop(id interface{}) IBufferItem {
  46. if nil == id {
  47. return nil
  48. }
  49. m.lock.Lock()
  50. defer m.lock.Unlock()
  51. res := m.data[id]
  52. delete(m.data, id)
  53. return res
  54. }
  55. // 读取全部键
  56. func (m *BufferMap) Keys() []interface{} {
  57. m.lock.Lock()
  58. defer m.lock.Unlock()
  59. if len(m.data) == 0 {
  60. return nil
  61. }
  62. var res []interface{}
  63. for k := range m.data {
  64. res = append(res, k)
  65. }
  66. return res
  67. }
  68. // 读取全部值
  69. func (m *BufferMap) Values() []IBufferItem {
  70. m.lock.Lock()
  71. defer m.lock.Unlock()
  72. if len(m.data) == 0 {
  73. return nil
  74. }
  75. var res []IBufferItem
  76. for _, v := range m.data {
  77. res = append(res, v)
  78. }
  79. return res
  80. }
  81. // 读取并移除全部键值
  82. func (m *BufferMap) PopAll() []IBufferItem {
  83. m.lock.Lock()
  84. defer m.lock.Unlock()
  85. if len(m.data) == 0 {
  86. return nil
  87. }
  88. var res []IBufferItem
  89. for _, v := range m.data {
  90. res = append(res, v)
  91. }
  92. m.data = make(map[interface{}]IBufferItem)
  93. return res
  94. }
  95. // 获取大小
  96. func (m *BufferMap) Size() int {
  97. m.lock.Lock()
  98. defer m.lock.Unlock()
  99. return len(m.data)
  100. }
  101. // 移除键值
  102. func (m *BufferMap) Remove(id interface{}) {
  103. if nil == id {
  104. return
  105. }
  106. m.lock.Lock()
  107. defer m.lock.Unlock()
  108. if len(m.data) == 0 {
  109. return
  110. }
  111. delete(m.data, id)
  112. }
  113. // 清空键值表
  114. func (m *BufferMap) Clear() {
  115. m.lock.Lock()
  116. defer m.lock.Unlock()
  117. if len(m.data) == 0 {
  118. return
  119. }
  120. m.data = make(map[interface{}]IBufferItem)
  121. }
  122. // 缓冲投递员
  123. type BufferPostman struct {
  124. limit int
  125. duration time.Duration
  126. Buffer *BufferMap
  127. timer *time.Timer
  128. isTimerStop bool
  129. target chan interface{}
  130. lock *sync.Mutex
  131. }
  132. // 新建缓冲投递员对象
  133. func NewBufferPostman(limit int, duration time.Duration, target chan interface{}) *BufferPostman {
  134. p := &BufferPostman{
  135. limit: limit,
  136. duration: duration,
  137. Buffer: NewBufferMap(),
  138. target: target,
  139. lock: new(sync.Mutex),
  140. }
  141. if duration > 0 {
  142. p.timer = time.NewTimer(duration)
  143. go func(p *BufferPostman) {
  144. defer utils.DefaultGoroutineRecover(nil, `缓冲投递超时消息`)
  145. for {
  146. select {
  147. case <-p.timer.C: // 超时
  148. p.isTimerStop = true
  149. p.deliver()
  150. }
  151. }
  152. }(p)
  153. }
  154. return p
  155. }
  156. // 置入消息
  157. func (p *BufferPostman) Push(item IBufferItem) {
  158. size := p.Buffer.Push(item)
  159. if p.isTimerStop || (p.limit > 0 && size >= p.limit) {
  160. p.lock.Lock()
  161. defer p.lock.Unlock()
  162. if p.isTimerStop { // 唤醒定时器
  163. p.resetTimer()
  164. }
  165. if p.limit > 0 && size >= p.limit { // 超限
  166. p.deliver()
  167. }
  168. }
  169. }
  170. // 投递消息
  171. func (p *BufferPostman) deliver() {
  172. data := p.Buffer.PopAll()
  173. if nil != data && len(data) > 0 {
  174. // 仅供测试时用的日志记录
  175. //if eventType == 1 {
  176. // fmt.Printf("deliver %d messages when timeout\n", len(data))
  177. //} else if eventType == 2 {
  178. // fmt.Printf("deliver %d messages when full\n", len(data))
  179. //}
  180. // 将消息推进中转chan
  181. go func(p *BufferPostman, data []IBufferItem) {
  182. defer utils.DefaultGoroutineRecover(nil, `缓冲投递超限消息`)
  183. for i := 0; i < len(data); i++ {
  184. p.target <- data[i]
  185. }
  186. }(p, data)
  187. // 触发时有数据才会重置定时器
  188. p.resetTimer()
  189. }
  190. // 没数据,定时器就暂停了,等待新数据进入时再唤醒
  191. }
  192. // 重置定时器
  193. func (p *BufferPostman) resetTimer() {
  194. if nil != p.timer && p.duration > 0 {
  195. if len(p.timer.C) > 0 {
  196. <-p.timer.C
  197. }
  198. p.timer.Reset(p.duration)
  199. p.isTimerStop = false
  200. }
  201. }