|
@@ -53,6 +53,13 @@ func (w *ChanWorker) Start(callback func(workerId int, msg interface{})) {
|
|
|
w.closeWait()
|
|
|
}
|
|
|
|
|
|
+func (w *ChanWorker) Stop() {
|
|
|
+ go func(w *ChanWorker) {
|
|
|
+ defer utils.DefaultGoroutineRecover(w.log, `chan池工作对象关闭`)
|
|
|
+ w.quit <- true
|
|
|
+ }(w)
|
|
|
+}
|
|
|
+
|
|
|
func (w *ChanWorker) closeWait() {
|
|
|
go func(w *ChanWorker) {
|
|
|
defer utils.DefaultGoroutineRecover(w.log, `chan池关闭`)
|
|
@@ -62,7 +69,7 @@ func (w *ChanWorker) closeWait() {
|
|
|
for {
|
|
|
select {
|
|
|
case <-c:
|
|
|
- w.quit <- true
|
|
|
+ w.Stop()
|
|
|
}
|
|
|
}
|
|
|
}(w)
|
|
@@ -112,11 +119,13 @@ func (d *ChanDispatcher) dispatch() {
|
|
|
for {
|
|
|
select {
|
|
|
case msg := <-d.MsgQueue:
|
|
|
- // 从工作管道池中尝试取出一个空闲(未阻塞)的工作管道,无空闲工作管道时阻塞
|
|
|
- jobChannel := <-d.WorkerPool
|
|
|
+ go func(d *ChanDispatcher, msg interface{}) {
|
|
|
+ // 从工作管道池中尝试取出一个空闲(未阻塞)的工作管道,无空闲工作管道时阻塞
|
|
|
+ jobChannel := <-d.WorkerPool
|
|
|
|
|
|
- // 将一条消息发送给当前工作管道
|
|
|
- jobChannel <- msg
|
|
|
+ // 将一条消息发送给当前工作管道
|
|
|
+ jobChannel <- msg
|
|
|
+ }(d, msg)
|
|
|
}
|
|
|
}
|
|
|
}(d)
|