Browse Source

queue clear

marion 4 years ago
parent
commit
c24e8a4087
4 changed files with 140 additions and 28 deletions
  1. 24 17
      utils/queue/chan_pool.go
  2. 15 11
      utils/queue/chan_pool_test.go
  3. 41 0
      utils/queue/queue_clear.go
  4. 60 0
      utils/queue/queue_clear_test.go

+ 24 - 17
utils/queue/chan_pool.go

@@ -7,14 +7,15 @@ import (
 	"syscall"
 )
 
-// 工作对象封装
+// chan worker object
 type ChanWorker struct {
-	ID         int                   // 工作对象编号
-	WorkerPool chan chan interface{} // 工作管道池,实例化时由调度器传入
-	JobChannel chan interface{}      // 工作管道
-	quit       chan bool             // 退出消息
+	ID         int                   // worker ID
+	WorkerPool chan chan interface{} // refer to chan worker pool
+	JobChannel chan interface{}      // job message chan
+	quit       chan bool             // quit chan
 }
 
+// NewChanWorker() get new chan worker object instance
 func NewChanWorker(workerId int, workerPool chan chan interface{}) *ChanWorker {
 	jobChannel := make(chan interface{})
 
@@ -26,15 +27,16 @@ func NewChanWorker(workerId int, workerPool chan chan interface{}) *ChanWorker {
 	}
 }
 
+// Start() worker handling
 func (w *ChanWorker) Start(callback func(workerId int, msg interface{})) {
 	go func(w *ChanWorker, callback func(workerId int, msg interface{})) {
-		defer utils.DefaultGoroutineRecover(nil, `chan池工作对象消息处理`)
+		defer utils.DefaultGoroutineRecover(nil, `chan pool worker handling`)
 		for {
-			// 新工作管道或每次取用工作管道后,加入工作管道池
+			// job chan add to pool,when create or used
 			w.WorkerPool <- w.JobChannel
 
 			select {
-			case msg, ok := <-w.JobChannel: // 无消息时阻塞
+			case msg, ok := <-w.JobChannel: // block when empty
 				if ok {
 					callback(w.ID, msg)
 				}
@@ -45,14 +47,15 @@ func (w *ChanWorker) Start(callback func(workerId int, msg interface{})) {
 	}(w, callback)
 }
 
-// 调度对象
+// chan worker pool dispatcher object
 type ChanDispatcher struct {
-	MsgQueue   chan interface{}      // 消息输入管道
-	WorkerPool chan chan interface{} // 工作管道池
-	maxWorkers int                   // 最大工作对象数
-	workers    []*ChanWorker         // 工作对象列表
+	MsgQueue   chan interface{}      // message input chan
+	WorkerPool chan chan interface{} // worker chan pool
+	maxWorkers int                   // max concurrent chan count
+	workers    []*ChanWorker         // chan worker object list
 }
 
+// NewChanDispatcher() get new chan worker pool dispatcher object instance
 func NewChanDispatcher(msgQueue chan interface{}, maxWorkers int) *ChanDispatcher {
 	return &ChanDispatcher{
 		MsgQueue:   msgQueue,
@@ -61,6 +64,7 @@ func NewChanDispatcher(msgQueue chan interface{}, maxWorkers int) *ChanDispatche
 	}
 }
 
+// Run() start concurrent message customers base on chan worker pool dispatcher
 func (d *ChanDispatcher) Run(callback func(workerId int, msg interface{})) {
 	for i := 0; i < d.maxWorkers; i++ {
 		worker := NewChanWorker(i, d.WorkerPool)
@@ -71,18 +75,20 @@ func (d *ChanDispatcher) Run(callback func(workerId int, msg interface{})) {
 	d.dispatch()
 }
 
+// dispatch() dispatch handling
 func (d *ChanDispatcher) dispatch() {
 	go func(d *ChanDispatcher) {
-		defer utils.DefaultGoroutineRecover(nil, `chan池调度`)
+		defer utils.DefaultGoroutineRecover(nil, `chan pool dispatch handling`)
 		for {
 			select {
 			case msg, ok := <-d.MsgQueue:
 				if ok {
-					// 从工作管道池中尝试取出一个空闲的工作管道(每次取用工作管道会从池中取出去,消息处理完再放回池子,所以池子中的都是空闲的)
-					// 无空闲工作管道(池子中无消息)时阻塞
+					// try to get a free worker chan from pool
+					// (all worker chan in pool is free,because take it out when use,and put it back when used)
+					// block when all worker channels are busy(pool is empty)
 					jobChannel, isOpen := <-d.WorkerPool
 					if isOpen {
-						// 将一条消息发送给成功取出的工作管道
+						// send a message to job chan
 						jobChannel <- msg
 					}
 				}
@@ -91,6 +97,7 @@ func (d *ChanDispatcher) dispatch() {
 	}(d)
 }
 
+// closeWait() release dispatcher resource when signal close
 func (d *ChanDispatcher) closeWait() {
 	go func(d *ChanDispatcher) {
 		defer utils.DefaultGoroutineRecover(nil, `chan池关闭`)

+ 15 - 11
utils/queue/chan_pool_test.go

@@ -10,19 +10,19 @@ type testMsg struct {
 	ID int
 }
 
-// "线程池"(golang管道池)单元测试
+// golang chan pool unit test
 func TestChanPool(t *testing.T) {
-	// 参数
-	maxWorkers := 4  // 最大工作管道数
-	msgCount := 1000 // 消息数量
+	// parameters
+	maxWorkers := 4  // max worker chan number
+	msgCount := 1000 // message count
 
-	// 非主逻辑,计数器初始化
-	wg := sync.WaitGroup{} // 同步计数器
+	// init a concurrent counter map
+	wg := sync.WaitGroup{}
 	wg.Add(msgCount)
-	counter := NewCountMap() // 并发安全统计计数器
+	counter := NewCountMap()
 
-	// 启动工作管道池调度器
-	msgQueue := make(chan interface{}, 0) // 输入队列
+	msgQueue := make(chan interface{}, 0) // input chan
+	// message customer start
 	NewChanDispatcher(msgQueue, maxWorkers).Run(
 		func(workerId int, msg interface{}) {
 			if n, ok := msg.(*testMsg); ok {
@@ -33,12 +33,12 @@ func TestChanPool(t *testing.T) {
 		},
 	)
 
-	// 发消息给输入队列
+	// send some messages
 	for i := 0; i < msgCount; i++ {
 		msgQueue <- &testMsg{ID: i}
 	}
 
-	// 非主逻辑,计数器打印
+	// result print
 	wg.Wait()
 	println("")
 	for k, v := range counter.Data {
@@ -46,23 +46,27 @@ func TestChanPool(t *testing.T) {
 	}
 }
 
+// concurrent counter map
 type countMap struct {
 	Data map[int]int
 	Lock sync.Mutex
 }
 
+// NewCountMap() get new concurrent counter map instance
 func NewCountMap() *countMap {
 	return &countMap{
 		Data: make(map[int]int),
 	}
 }
 
+// Get() get counter value from map
 func (d *countMap) Get(k int) int {
 	d.Lock.Lock()
 	defer d.Lock.Unlock()
 	return d.Data[k]
 }
 
+// Set() set value to counter map
 func (d *countMap) Set(k, v int) {
 	d.Lock.Lock()
 	defer d.Lock.Unlock()

+ 41 - 0
utils/queue/queue_clear.go

@@ -0,0 +1,41 @@
+package queue
+
+import (
+	"sync"
+	"time"
+)
+
+// buffered golang chan members release
+type Clear struct {
+	flag bool
+	lock sync.RWMutex
+}
+
+// Clean() begin and wait for remaining chan members were released
+func (i *Clear) Clean(queue chan interface{}) {
+	i.set(true)
+
+	for {
+		if len(queue) == 0 {
+			time.Sleep(1 * time.Millisecond) // set a little gap between Clean() and clear.Working() off
+			i.set(false)
+			break
+		}
+	}
+}
+
+// set() set the Clear working flag
+func (i *Clear) set(val bool) {
+	i.lock.Lock()
+	defer i.lock.Unlock()
+
+	i.flag = val
+}
+
+// Working() return whether the Clear is working
+func (i *Clear) Working() bool {
+	i.lock.RLock()
+	defer i.lock.RUnlock()
+
+	return i.flag
+}

+ 60 - 0
utils/queue/queue_clear_test.go

@@ -0,0 +1,60 @@
+package queue
+
+import (
+	"fmt"
+	"sync"
+	"testing"
+)
+
+// buffered golang chan members release unit test
+func TestQueueClear(t *testing.T) {
+	counter := &counter{} // init a concurrent counter
+
+	clear := &Clear{} // init clear
+
+	msgQueue := make(chan interface{}, 1000) // buffered chan
+	// message customer start
+	NewChanDispatcher(msgQueue, 4).Run(
+		func(workerId int, msg interface{}) {
+			if clear.Working() {
+				println("ignored", counter.Add()-1)
+			} else {
+				if n, ok := msg.(*testMsg); ok {
+					fmt.Printf("worker %d received msg %d\n", workerId, n.ID)
+				}
+			}
+		},
+	)
+
+	// send some messages
+	for i := 0; i < 50; i++ {
+		msgQueue <- &testMsg{ID: i}
+	}
+
+	// but "clean" immediately
+	clear.Clean(msgQueue) // sync block loop
+
+	// wait clean over then send another messages
+	for i := 0; i < 20; i++ {
+		msgQueue <- &testMsg{ID: i}
+	}
+
+	quit := make(chan bool)
+	<-quit
+}
+
+// concurrent counter
+type counter struct {
+	Number int
+	Lock   sync.Mutex
+}
+
+// Add() add counter and return new value
+func (d *counter) Add() int {
+	d.Lock.Lock()
+	defer d.Lock.Unlock()
+
+	d.Number++
+
+	return d.Number
+}