marion 5 tahun lalu
induk
melakukan
9cba78b815
1 mengubah file dengan 11 tambahan dan 7 penghapusan
  1. 11 7
      utils/queue/chan_pool.go

+ 11 - 7
utils/queue/chan_pool.go

@@ -39,8 +39,10 @@ func (w *ChanWorker) Start(callback func(workerId int, msg interface{})) {
 		defer utils.DefaultGoroutineRecover(nil, `chan池工作对象消息处理`)
 		for {
 			select {
-			case msg := <-w.JobChannel:
-				callback(w.ID, msg)
+			case msg, ok := <-w.JobChannel:
+				if ok {
+					callback(w.ID, msg)
+				}
 			case <-w.quit:
 				return
 			}
@@ -110,12 +112,14 @@ func (d *ChanDispatcher) dispatch() {
 		defer utils.DefaultGoroutineRecover(nil, `chan池调度`)
 		for {
 			select {
-			case msg := <-d.MsgQueue:
-				// 从工作管道池中尝试取出一个空闲(未阻塞)的工作管道,无空闲工作管道时阻塞
-				jobChannel := <-d.WorkerPool
+			case msg, ok := <-d.MsgQueue:
+				if ok {
+					// 从工作管道池中尝试取出一个空闲(未阻塞)的工作管道,无空闲工作管道时阻塞
+					jobChannel := <-d.WorkerPool
 
-				// 将一条消息发送给当前工作管道
-				jobChannel <- msg
+					// 将一条消息发送给当前工作管道
+					jobChannel <- msg
+				}
 			}
 		}
 	}(d)