marion %!s(int64=5) %!d(string=hai) anos
pai
achega
aa4643a207
Modificáronse 2 ficheiros con 188 adicións e 0 borrados
  1. 117 0
      utils/queue/chan_pool.go
  2. 71 0
      utils/queue/chan_pool_test.go

+ 117 - 0
utils/queue/chan_pool.go

@@ -0,0 +1,117 @@
+package queue
+
+import (
+	"os"
+	"os/signal"
+	"syscall"
+)
+
+// 工作对象封装
+type ChanWorker struct {
+	ID         int                   // 工作对象编号
+	WorkerPool chan chan interface{} // 工作管道池,实例化时由调度器传入
+	JobChannel chan interface{}      // 工作管道
+	quit       chan bool             // 退出消息
+}
+
+func NewChanWorker(workerId, capacity int, workerPool chan chan interface{}) *ChanWorker {
+	var jobChannel chan interface{}
+	if capacity < 0 {
+		jobChannel = make(chan interface{})
+	} else {
+		jobChannel = make(chan interface{}, capacity)
+	}
+
+	return &ChanWorker{
+		ID:         workerId,
+		WorkerPool: workerPool,
+		JobChannel: jobChannel,
+		quit:       make(chan bool),
+	}
+}
+
+func (w *ChanWorker) Start(callback func(workerId int, msg interface{})) {
+	go func(w *ChanWorker, callback func(workerId int, msg interface{})) {
+		for {
+			// 新工作管道加入工作管道池
+			w.WorkerPool <- w.JobChannel
+
+			select {
+			case msg := <-w.JobChannel:
+				callback(w.ID, msg)
+			case <-w.quit:
+				return
+			}
+		}
+	}(w, callback)
+
+	go w.closeWait()
+}
+
+func (w *ChanWorker) Stop() {
+	go func(w *ChanWorker) {
+		w.quit <- true
+	}(w)
+}
+
+func (w *ChanWorker) closeWait() {
+	var c chan os.Signal
+	c = make(chan os.Signal, 1)
+	signal.Notify(c, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT)
+	for {
+		select {
+		case <-c:
+			w.Stop()
+		}
+	}
+}
+
+// 调度对象
+type ChanDispatcher struct {
+	MsgQueue   chan interface{}      // 消息输入管道
+	WorkerPool chan chan interface{} // 工作管道池
+	maxWorkers int                   // 最大工作对象数
+	capacity   int                   // 工作管道消息缓冲大小
+}
+
+func NewChanDispatcher(msgQueue chan interface{}, maxWorkers int) *ChanDispatcher {
+	return &ChanDispatcher{
+		MsgQueue:   msgQueue,
+		WorkerPool: make(chan chan interface{}, maxWorkers),
+		maxWorkers: maxWorkers,
+		capacity:   -1,
+	}
+}
+
+func NewChanDispatcherWithCapacity(msgQueue chan interface{}, maxWorkers, capacity int) *ChanDispatcher {
+	return &ChanDispatcher{
+		MsgQueue:   msgQueue,
+		WorkerPool: make(chan chan interface{}, maxWorkers),
+		maxWorkers: maxWorkers,
+		capacity:   capacity,
+	}
+}
+
+func (d *ChanDispatcher) Run(callback func(workerId int, msg interface{})) {
+	for i := 0; i < d.maxWorkers; i++ {
+		worker := NewChanWorker(i, d.capacity, d.WorkerPool)
+		worker.Start(callback)
+	}
+
+	go d.dispatch()
+}
+
+func (d *ChanDispatcher) dispatch() {
+	for {
+		select {
+		case msg := <-d.MsgQueue:
+			go func(d *ChanDispatcher, msg interface{}) {
+				// 从工作管道池中尝试取出一个空闲(未阻塞)的工作管道,无空闲工作管道时阻塞
+				jobChannel := <-d.WorkerPool
+
+				// 将一条消息发送给当前工作管道
+				jobChannel <- msg
+			}(d, msg)
+		}
+	}
+}

+ 71 - 0
utils/queue/chan_pool_test.go

@@ -0,0 +1,71 @@
+package queue
+
+import (
+	"fmt"
+	"sync"
+	"testing"
+)
+
+type testMsg struct {
+	ID int
+}
+
+// "线程池"(golang管道池)单元测试
+func TestChanPool(t *testing.T) {
+	// 参数
+	maxWorkers := 4 // 最大工作管道数
+	msgCount := 100 // 消息数量
+
+	// 非主逻辑,计数器初始化
+	wg := sync.WaitGroup{} // 同步计数器
+	wg.Add(msgCount)
+	counter := NewCountMap() // 并发安全统计计数器
+
+	// 启动工作管道池调度器
+	msgQueue := make(chan interface{}, 0) // 输入队列
+	NewChanDispatcher(msgQueue, maxWorkers).Run(
+		//NewChanDispatcherWithCapacity(msgQueue, maxWorkers, 5).Run(
+		func(workerId int, msg interface{}) {
+			if n, ok := msg.(*testMsg); ok {
+				fmt.Printf("worker %d received msg %d\n", workerId, n.ID)
+				counter.Set(workerId, counter.Get(workerId)+1)
+				wg.Done()
+			}
+		},
+	)
+
+	// 发消息给输入队列
+	for i := 0; i < msgCount; i++ {
+		msgQueue <- &testMsg{ID: i}
+	}
+
+	// 非主逻辑,计数器打印
+	wg.Wait()
+	println("")
+	for k, v := range counter.Data {
+		fmt.Printf("worker %d received msg total count is %d\n", k, v)
+	}
+}
+
+type countMap struct {
+	Data map[int]int
+	Lock sync.Mutex
+}
+
+func NewCountMap() *countMap {
+	return &countMap{
+		Data: make(map[int]int),
+	}
+}
+
+func (d *countMap) Get(k int) int {
+	d.Lock.Lock()
+	defer d.Lock.Unlock()
+	return d.Data[k]
+}
+
+func (d *countMap) Set(k, v int) {
+	d.Lock.Lock()
+	defer d.Lock.Unlock()
+	d.Data[k] = v
+}