Преглед на файлове

本地去重消息缓冲

marion преди 5 години
родител
ревизия
4c6d4b504f
променени са 2 файла, в които са добавени 314 реда и са изтрити 0 реда
  1. 212 0
      utils/queue/buffer_postman.go
  2. 102 0
      utils/queue/buffer_postman_test.go

+ 212 - 0
utils/queue/buffer_postman.go

@@ -0,0 +1,212 @@
+package queue
+
+import (
+	"sync"
+	"time"
+)
+
+// 缓冲消息接口
+type IBufferItem interface {
+	BufferID() interface{}                 // 去重用的键,包括后续消费消息,如果需要也可以根据这个键做散列处理
+	Reduce(oldVal IBufferItem) IBufferItem // 接口实现类中实现此方法,以实现累加之类的多态的业务逻辑,当然最简单不做其他处理直接返回新的对象值自身也行
+}
+
+// 缓冲键值表
+type BufferMap struct {
+	data map[interface{}]IBufferItem
+	lock *sync.Mutex
+}
+
+// 新建缓冲键值表对象
+func NewBufferMap() *BufferMap {
+	return &BufferMap{
+		data: make(map[interface{}]IBufferItem),
+		lock: new(sync.Mutex),
+	}
+}
+
+// 置入键值
+func (m *BufferMap) Push(item IBufferItem) int {
+	if nil == item {
+		return 0
+	}
+	m.lock.Lock()
+	defer m.lock.Unlock()
+	oldVal := m.data[item.BufferID()]
+	m.data[item.BufferID()] = item.Reduce(oldVal)
+	return len(m.data)
+}
+
+// 读取键值
+func (m *BufferMap) Get(id interface{}) IBufferItem {
+	if nil == id {
+		return nil
+	}
+	m.lock.Lock()
+	defer m.lock.Unlock()
+	return m.data[id]
+}
+
+// 读取并移除键值
+func (m *BufferMap) Pop(id interface{}) IBufferItem {
+	if nil == id {
+		return nil
+	}
+	m.lock.Lock()
+	defer m.lock.Unlock()
+	res := m.data[id]
+	delete(m.data, id)
+	return res
+}
+
+// 读取全部键
+func (m *BufferMap) Keys() []interface{} {
+	m.lock.Lock()
+	defer m.lock.Unlock()
+	if len(m.data) == 0 {
+		return nil
+	}
+	var res []interface{}
+	for k := range m.data {
+		res = append(res, k)
+	}
+	return res
+}
+
+// 读取全部值
+func (m *BufferMap) Values() []IBufferItem {
+	m.lock.Lock()
+	defer m.lock.Unlock()
+	if len(m.data) == 0 {
+		return nil
+	}
+	var res []IBufferItem
+	for _, v := range m.data {
+		res = append(res, v)
+	}
+	return res
+}
+
+// 读取并移除全部键值
+func (m *BufferMap) PopAll() []IBufferItem {
+	m.lock.Lock()
+	defer m.lock.Unlock()
+	if len(m.data) == 0 {
+		return nil
+	}
+	var res []IBufferItem
+	for _, v := range m.data {
+		res = append(res, v)
+	}
+	m.data = make(map[interface{}]IBufferItem)
+	return res
+}
+
+// 获取大小
+func (m *BufferMap) Size() int {
+	m.lock.Lock()
+	defer m.lock.Unlock()
+	return len(m.data)
+}
+
+// 移除键值
+func (m *BufferMap) Remove(id interface{}) {
+	if nil == id {
+		return
+	}
+	m.lock.Lock()
+	defer m.lock.Unlock()
+	if len(m.data) == 0 {
+		return
+	}
+	delete(m.data, id)
+}
+
+// 清空键值表
+func (m *BufferMap) Clear() {
+	m.lock.Lock()
+	defer m.lock.Unlock()
+	if len(m.data) == 0 {
+		return
+	}
+	m.data = make(map[interface{}]IBufferItem)
+}
+
+// 缓冲投递员
+type BufferPostman struct {
+	limit       int
+	duration    time.Duration
+	buffer      *BufferMap
+	timer       *time.Timer
+	isTimerStop bool
+	target      chan interface{}
+}
+
+// 新建缓冲投递员对象
+func NewBufferPostman(limit int, duration time.Duration, target chan interface{}) *BufferPostman {
+	p := &BufferPostman{
+		limit:    limit,
+		duration: duration,
+		buffer:   NewBufferMap(),
+		target:   target,
+	}
+	if duration > 0 {
+		p.timer = time.NewTimer(duration)
+		go func() {
+			for {
+				select {
+				case <-p.timer.C: // 超时
+					p.isTimerStop = true
+					p.deliver()
+				}
+			}
+		}()
+	}
+	return p
+}
+
+// 置入消息
+func (p *BufferPostman) Push(item IBufferItem) {
+	size := p.buffer.Push(item)
+	if p.isTimerStop { // 唤醒定时器
+		p.resetTimer()
+	}
+	if p.limit > 0 && size >= p.limit { // 超限
+		p.deliver()
+	}
+}
+
+// 投递消息
+func (p *BufferPostman) deliver() {
+	data := p.buffer.PopAll()
+	if nil != data && len(data) > 0 {
+		// 仅供测试时用的日志记录
+		//if eventType == 1 {
+		//	fmt.Printf("deliver %d messages when timeout\n", len(data))
+		//} else if eventType == 2 {
+		//	fmt.Printf("deliver %d messages when full\n", len(data))
+		//}
+
+		// 将消息推进中转chan
+		go func(p *BufferPostman, data []IBufferItem) {
+			for i := 0; i < len(data); i++ {
+				p.target <- data[i]
+			}
+		}(p, data)
+
+		// 触发时有数据才会重置定时器
+		p.resetTimer()
+	}
+	// 没数据,定时器就暂停了,等待新数据进入时再唤醒
+}
+
+// 重置定时器
+func (p *BufferPostman) resetTimer() {
+	if nil != p.timer && p.duration > 0 {
+		if len(p.timer.C) > 0 {
+			<-p.timer.C
+		}
+		p.timer.Reset(p.duration)
+		p.isTimerStop = false
+	}
+}

+ 102 - 0
utils/queue/buffer_postman_test.go

@@ -0,0 +1,102 @@
+package queue
+
+import (
+	"fmt"
+	"git.haoqitour.com/haoqi/go-common/utils"
+	"testing"
+	"time"
+)
+
+var (
+	testQueue chan interface{}
+)
+
+func init() {
+	testQueue = make(chan interface{}, 500)
+}
+
+type TestMessage struct {
+	ID    utils.Long
+	Count int
+}
+
+func (m *TestMessage) BufferID() interface{} {
+	return m.ID
+}
+
+func (m *TestMessage) Reduce(oldVal IBufferItem) IBufferItem {
+	return m // 此行示例即不管旧值,直接返回新值,Push后的效果即新值直接替换旧值
+	// 或者实现累加之类的逻辑
+	//m.Count = oldVal.(*TestMessage).Count + m.Count
+	//return m
+}
+
+func TestBufferPostman_Push(t *testing.T) {
+	// 初始化缓冲投递员
+	bufPost := NewBufferPostman(5, 5*time.Second, testQueue)
+
+	// 启动并发的消息消费者chan池
+	NewChanDispatcher(testQueue, 4).Run(
+		func(workerId int, msg interface{}) {
+			if i, ok := msg.(IBufferItem); ok {
+				fmt.Printf("consumer %d recevie message %d\n", workerId, i.BufferID())
+			}
+		},
+	)
+
+	// 准备一个测试去重用的id列表
+	var ids []utils.Long
+	for i := 0; i < 17; i++ {
+		ids = append(ids, utils.NextId())
+	}
+
+	// 消息并发压进缓冲
+	// 整个数据流:消息加进缓冲的map -> 超时或超限时,取出全部消息推进中转chan -> 中转chan消费者处理消息
+	// 上述数据流入口,也可以是消费其它chan或消息中间件的消息,然后再加进map
+	// 中转chan消费者不一定要用并发chan池,例如,也可以根据id散列,启动固定的几个消息消费者 -- 这对向数据库写数据更加合适,不易因为并发造成死锁
+	for g := 0; g < 3; g++ {
+		go func() {
+			// 测试超时和超限机制
+			for i := 0; i < 50; i++ {
+				time.Sleep(100 * time.Millisecond)
+				var id utils.Long = 0
+				if nil != ids && len(ids) > 0 {
+					id = ids[i%len(ids)]
+				}
+				bufPost.Push(&TestMessage{
+					ID:    id,
+					Count: i,
+				})
+			}
+			// 当触发一次无数据时,超时定时器应该会停止
+		}()
+	}
+	// 测试超时定时器唤醒机制
+	go func() {
+		time.Sleep(30 * time.Second)
+		println(`30 seconds later`)
+		bufPost.Push(&TestMessage{
+			ID:    ids[0],
+			Count: 0,
+		})
+		time.Sleep(10 * time.Second)
+		for g := 0; g < 3; g++ {
+			go func() {
+				for i := 0; i < 500; i++ {
+					time.Sleep(100 * time.Millisecond)
+					var id utils.Long = 0
+					if nil != ids && len(ids) > 0 {
+						id = ids[i%len(ids)]
+					}
+					bufPost.Push(&TestMessage{
+						ID:    id,
+						Count: i,
+					})
+				}
+			}()
+		}
+	}()
+
+	quit := make(chan bool)
+	<-quit
+}