buffer_postman.go 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212
  1. package queue
  2. import (
  3. "sync"
  4. "time"
  5. )
  6. // 缓冲消息接口
  7. type IBufferItem interface {
  8. BufferID() interface{} // 去重用的键,包括后续消费消息,如果需要也可以根据这个键做散列处理
  9. Reduce(oldVal IBufferItem) IBufferItem // 接口实现类中实现此方法,以实现累加之类的多态的业务逻辑,当然最简单不做其他处理直接返回新的对象值自身也行
  10. }
  11. // 缓冲键值表
  12. type BufferMap struct {
  13. data map[interface{}]IBufferItem
  14. lock *sync.Mutex
  15. }
  16. // 新建缓冲键值表对象
  17. func NewBufferMap() *BufferMap {
  18. return &BufferMap{
  19. data: make(map[interface{}]IBufferItem),
  20. lock: new(sync.Mutex),
  21. }
  22. }
  23. // 置入键值
  24. func (m *BufferMap) Push(item IBufferItem) int {
  25. if nil == item {
  26. return 0
  27. }
  28. m.lock.Lock()
  29. defer m.lock.Unlock()
  30. oldVal := m.data[item.BufferID()]
  31. m.data[item.BufferID()] = item.Reduce(oldVal)
  32. return len(m.data)
  33. }
  34. // 读取键值
  35. func (m *BufferMap) Get(id interface{}) IBufferItem {
  36. if nil == id {
  37. return nil
  38. }
  39. m.lock.Lock()
  40. defer m.lock.Unlock()
  41. return m.data[id]
  42. }
  43. // 读取并移除键值
  44. func (m *BufferMap) Pop(id interface{}) IBufferItem {
  45. if nil == id {
  46. return nil
  47. }
  48. m.lock.Lock()
  49. defer m.lock.Unlock()
  50. res := m.data[id]
  51. delete(m.data, id)
  52. return res
  53. }
  54. // 读取全部键
  55. func (m *BufferMap) Keys() []interface{} {
  56. m.lock.Lock()
  57. defer m.lock.Unlock()
  58. if len(m.data) == 0 {
  59. return nil
  60. }
  61. var res []interface{}
  62. for k := range m.data {
  63. res = append(res, k)
  64. }
  65. return res
  66. }
  67. // 读取全部值
  68. func (m *BufferMap) Values() []IBufferItem {
  69. m.lock.Lock()
  70. defer m.lock.Unlock()
  71. if len(m.data) == 0 {
  72. return nil
  73. }
  74. var res []IBufferItem
  75. for _, v := range m.data {
  76. res = append(res, v)
  77. }
  78. return res
  79. }
  80. // 读取并移除全部键值
  81. func (m *BufferMap) PopAll() []IBufferItem {
  82. m.lock.Lock()
  83. defer m.lock.Unlock()
  84. if len(m.data) == 0 {
  85. return nil
  86. }
  87. var res []IBufferItem
  88. for _, v := range m.data {
  89. res = append(res, v)
  90. }
  91. m.data = make(map[interface{}]IBufferItem)
  92. return res
  93. }
  94. // 获取大小
  95. func (m *BufferMap) Size() int {
  96. m.lock.Lock()
  97. defer m.lock.Unlock()
  98. return len(m.data)
  99. }
  100. // 移除键值
  101. func (m *BufferMap) Remove(id interface{}) {
  102. if nil == id {
  103. return
  104. }
  105. m.lock.Lock()
  106. defer m.lock.Unlock()
  107. if len(m.data) == 0 {
  108. return
  109. }
  110. delete(m.data, id)
  111. }
  112. // 清空键值表
  113. func (m *BufferMap) Clear() {
  114. m.lock.Lock()
  115. defer m.lock.Unlock()
  116. if len(m.data) == 0 {
  117. return
  118. }
  119. m.data = make(map[interface{}]IBufferItem)
  120. }
  121. // 缓冲投递员
  122. type BufferPostman struct {
  123. limit int
  124. duration time.Duration
  125. buffer *BufferMap
  126. timer *time.Timer
  127. isTimerStop bool
  128. target chan interface{}
  129. }
  130. // 新建缓冲投递员对象
  131. func NewBufferPostman(limit int, duration time.Duration, target chan interface{}) *BufferPostman {
  132. p := &BufferPostman{
  133. limit: limit,
  134. duration: duration,
  135. buffer: NewBufferMap(),
  136. target: target,
  137. }
  138. if duration > 0 {
  139. p.timer = time.NewTimer(duration)
  140. go func() {
  141. for {
  142. select {
  143. case <-p.timer.C: // 超时
  144. p.isTimerStop = true
  145. p.deliver()
  146. }
  147. }
  148. }()
  149. }
  150. return p
  151. }
  152. // 置入消息
  153. func (p *BufferPostman) Push(item IBufferItem) {
  154. size := p.buffer.Push(item)
  155. if p.isTimerStop { // 唤醒定时器
  156. p.resetTimer()
  157. }
  158. if p.limit > 0 && size >= p.limit { // 超限
  159. p.deliver()
  160. }
  161. }
  162. // 投递消息
  163. func (p *BufferPostman) deliver() {
  164. data := p.buffer.PopAll()
  165. if nil != data && len(data) > 0 {
  166. // 仅供测试时用的日志记录
  167. //if eventType == 1 {
  168. // fmt.Printf("deliver %d messages when timeout\n", len(data))
  169. //} else if eventType == 2 {
  170. // fmt.Printf("deliver %d messages when full\n", len(data))
  171. //}
  172. // 将消息推进中转chan
  173. go func(p *BufferPostman, data []IBufferItem) {
  174. for i := 0; i < len(data); i++ {
  175. p.target <- data[i]
  176. }
  177. }(p, data)
  178. // 触发时有数据才会重置定时器
  179. p.resetTimer()
  180. }
  181. // 没数据,定时器就暂停了,等待新数据进入时再唤醒
  182. }
  183. // 重置定时器
  184. func (p *BufferPostman) resetTimer() {
  185. if nil != p.timer && p.duration > 0 {
  186. if len(p.timer.C) > 0 {
  187. <-p.timer.C
  188. }
  189. p.timer.Reset(p.duration)
  190. p.isTimerStop = false
  191. }
  192. }