buffer_postman.go 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216
  1. package queue
  2. import (
  3. "fmt"
  4. "git.haoqitour.com/haoqi/go-common/utils"
  5. "sync"
  6. "time"
  7. )
  8. // 缓冲消息接口
  9. type IBufferItem interface {
  10. BufferID() interface{} // 去重用的键,包括后续消费消息,如果需要也可以根据这个键做散列处理
  11. Reduce(oldVal IBufferItem) IBufferItem // 接口实现类中实现此方法,以实现累加之类的多态的业务逻辑,当然最简单不做其他处理直接返回新的对象值自身也行
  12. }
  13. // 缓冲键值表
  14. type BufferMap struct {
  15. data map[interface{}]IBufferItem
  16. lock *sync.Mutex
  17. }
  18. // 新建缓冲键值表对象
  19. func NewBufferMap() *BufferMap {
  20. return &BufferMap{
  21. data: make(map[interface{}]IBufferItem),
  22. lock: new(sync.Mutex),
  23. }
  24. }
  25. // 置入键值
  26. func (m *BufferMap) Push(item IBufferItem) int {
  27. if nil == item {
  28. return 0
  29. }
  30. m.lock.Lock()
  31. defer m.lock.Unlock()
  32. oldVal := m.data[item.BufferID()]
  33. m.data[item.BufferID()] = item.Reduce(oldVal)
  34. return len(m.data)
  35. }
  36. // 读取键值
  37. func (m *BufferMap) Get(id interface{}) IBufferItem {
  38. if nil == id {
  39. return nil
  40. }
  41. m.lock.Lock()
  42. defer m.lock.Unlock()
  43. return m.data[id]
  44. }
  45. // 读取并移除键值
  46. func (m *BufferMap) Pop(id interface{}) IBufferItem {
  47. if nil == id {
  48. return nil
  49. }
  50. m.lock.Lock()
  51. defer m.lock.Unlock()
  52. res := m.data[id]
  53. delete(m.data, id)
  54. return res
  55. }
  56. // 读取全部键
  57. func (m *BufferMap) Keys() []interface{} {
  58. m.lock.Lock()
  59. defer m.lock.Unlock()
  60. if len(m.data) == 0 {
  61. return nil
  62. }
  63. var res []interface{}
  64. for k := range m.data {
  65. res = append(res, k)
  66. }
  67. return res
  68. }
  69. // 读取全部值
  70. func (m *BufferMap) Values() []IBufferItem {
  71. m.lock.Lock()
  72. defer m.lock.Unlock()
  73. if len(m.data) == 0 {
  74. return nil
  75. }
  76. var res []IBufferItem
  77. for _, v := range m.data {
  78. res = append(res, v)
  79. }
  80. return res
  81. }
  82. // 读取并移除全部键值
  83. func (m *BufferMap) PopAll() []IBufferItem {
  84. m.lock.Lock()
  85. defer m.lock.Unlock()
  86. if len(m.data) == 0 {
  87. return nil
  88. }
  89. var res []IBufferItem
  90. for _, v := range m.data {
  91. res = append(res, v)
  92. }
  93. m.data = make(map[interface{}]IBufferItem)
  94. return res
  95. }
  96. // 获取大小
  97. func (m *BufferMap) Size() int {
  98. m.lock.Lock()
  99. defer m.lock.Unlock()
  100. return len(m.data)
  101. }
  102. // 移除键值
  103. func (m *BufferMap) Remove(id interface{}) {
  104. if nil == id {
  105. return
  106. }
  107. m.lock.Lock()
  108. defer m.lock.Unlock()
  109. if len(m.data) == 0 {
  110. return
  111. }
  112. delete(m.data, id)
  113. }
  114. // 清空键值表
  115. func (m *BufferMap) Clear() {
  116. m.lock.Lock()
  117. defer m.lock.Unlock()
  118. if len(m.data) == 0 {
  119. return
  120. }
  121. m.data = make(map[interface{}]IBufferItem)
  122. }
  123. // 缓冲投递员
  124. type BufferPostman struct {
  125. limit int
  126. duration time.Duration
  127. Buffer *BufferMap
  128. timer *time.Timer
  129. isTimerStop bool
  130. target chan interface{}
  131. }
  132. // 新建缓冲投递员对象
  133. func NewBufferPostman(limit int, duration time.Duration, target chan interface{}) *BufferPostman {
  134. p := &BufferPostman{
  135. limit: limit,
  136. duration: duration,
  137. Buffer: NewBufferMap(),
  138. target: target,
  139. }
  140. if duration > 0 {
  141. p.timer = time.NewTimer(duration)
  142. go func(p *BufferPostman) {
  143. defer utils.DefaultGoroutineRecover(nil, `缓冲投递超时消息`)
  144. for {
  145. select {
  146. case <-p.timer.C: // 超时
  147. p.isTimerStop = true
  148. p.deliver(1)
  149. }
  150. }
  151. }(p)
  152. }
  153. return p
  154. }
  155. // 置入消息
  156. func (p *BufferPostman) Push(item IBufferItem) {
  157. size := p.Buffer.Push(item)
  158. if p.isTimerStop { // 唤醒定时器
  159. p.resetTimer()
  160. }
  161. if p.limit > 0 && size >= p.limit { // 超限
  162. p.deliver(2)
  163. }
  164. }
  165. // 投递消息
  166. func (p *BufferPostman) deliver(eventType int8) {
  167. data := p.Buffer.PopAll()
  168. if nil != data && len(data) > 0 {
  169. // 仅供测试时用的日志记录
  170. if eventType == 1 {
  171. fmt.Printf("deliver %d messages when timeout\n", len(data))
  172. } else if eventType == 2 {
  173. fmt.Printf("deliver %d messages when full\n", len(data))
  174. }
  175. // 将消息推进中转chan
  176. go func(p *BufferPostman, data []IBufferItem) {
  177. defer utils.DefaultGoroutineRecover(nil, `缓冲投递超限消息`)
  178. for i := 0; i < len(data); i++ {
  179. p.target <- data[i]
  180. }
  181. }(p, data)
  182. // 触发时有数据才会重置定时器
  183. p.resetTimer()
  184. }
  185. // 没数据,定时器就暂停了,等待新数据进入时再唤醒
  186. }
  187. // 重置定时器
  188. func (p *BufferPostman) resetTimer() {
  189. if nil != p.timer && p.duration > 0 {
  190. if len(p.timer.C) > 0 {
  191. <-p.timer.C
  192. }
  193. p.timer.Reset(p.duration)
  194. p.isTimerStop = false
  195. }
  196. }