package queue

import (
	"fmt"
	"git.aionnect.com/aionnect/go-common/utils"
	"testing"
	"time"
)

var (
	testQueue chan interface{}
)

func init() {
	testQueue = make(chan interface{}, 500)
}

type TestMessage struct {
	ID    utils.Long
	Count int
}

func (m *TestMessage) BufferID() interface{} {
	return m.ID
}

func (m *TestMessage) Reduce(oldVal IBufferItem) IBufferItem {
	return m // 此行示例即不管旧值,直接返回新值,Push后的效果即新值直接替换旧值
	// 或者实现累加之类的逻辑
	//m.Count = oldVal.(*TestMessage).Count + m.Count
	//return m
}

func TestBufferPostman_Push(t *testing.T) {
	// 初始化缓冲投递员
	bufPost := NewBufferPostman(5, 5*time.Second, testQueue)

	// 启动并发的消息消费者chan池
	NewChanDispatcher(testQueue, 4).Run(
		func(workerId int, msg interface{}) {
			if i, ok := msg.(IBufferItem); ok {
				fmt.Printf("consumer %d recevie message %d\n", workerId, i.BufferID())
			}
		},
	)

	// 准备一个测试去重用的id列表
	var ids []utils.Long
	for i := 0; i < 17; i++ {
		ids = append(ids, utils.Long(i))
	}

	// 消息并发压进缓冲
	// 整个数据流:消息加进缓冲的map -> 超时或超限时,取出全部消息推进中转chan -> 中转chan消费者处理消息
	// 上述数据流入口,也可以是消费其它chan或消息中间件的消息,然后再加进map
	// 中转chan消费者不一定要用并发chan池,例如,也可以根据id散列,启动固定的几个消息消费者 -- 这对向数据库写数据更加合适,不易因为并发造成死锁
	for g := 0; g < 3; g++ {
		go func() {
			// 测试超时和超限机制
			for i := 0; i < 50; i++ {
				time.Sleep(100 * time.Millisecond)
				var id utils.Long = 0
				if nil != ids && len(ids) > 0 {
					id = ids[i%len(ids)]
				}
				bufPost.Push(&TestMessage{
					ID:    id,
					Count: i,
				})
			}
			// 当触发一次无数据时,超时定时器应该会停止
		}()
	}
	// 测试超时定时器唤醒机制
	go func() {
		time.Sleep(30 * time.Second)
		println(`30 seconds later`)
		bufPost.Push(&TestMessage{
			ID:    ids[0],
			Count: 0,
		})
		time.Sleep(10 * time.Second)
		for g := 0; g < 3; g++ {
			go func() {
				for i := 0; i < 500; i++ {
					time.Sleep(100 * time.Millisecond)
					var id utils.Long = 0
					if nil != ids && len(ids) > 0 {
						id = ids[i%len(ids)]
					}
					bufPost.Push(&TestMessage{
						ID:    id,
						Count: i,
					})
				}
			}()
		}
	}()

	quit := make(chan bool)
	<-quit
}