Browse Source

chan pool性能优化

marion 4 years ago
parent
commit
97e708e007
1 changed files with 27 additions and 23 deletions
  1. 27 23
      utils/queue/chan_pool.go

+ 27 - 23
utils/queue/chan_pool.go

@@ -43,28 +43,6 @@ func (w *ChanWorker) Start(callback func(workerId int, msg interface{})) {
 			}
 		}
 	}(w, callback)
-
-	w.closeWait()
-}
-
-func (w *ChanWorker) closeWait() {
-	go func(w *ChanWorker) {
-		defer utils.DefaultGoroutineRecover(nil, `chan池关闭`)
-		var c chan os.Signal
-		var s os.Signal
-		c = make(chan os.Signal, 1)
-		signal.Notify(c, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT, syscall.SIGKILL)
-		for {
-			s = <-c
-			switch s {
-			case syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT, syscall.SIGKILL:
-				w.quit <- true
-				return
-			default:
-				return
-			}
-		}
-	}(w)
 }
 
 // 调度对象
@@ -72,6 +50,7 @@ type ChanDispatcher struct {
 	MsgQueue   chan interface{}      // 消息输入管道
 	WorkerPool chan chan interface{} // 工作管道池
 	maxWorkers int                   // 最大工作对象数
+	workers    []*ChanWorker         // 工作对象列表
 }
 
 func NewChanDispatcher(msgQueue chan interface{}, maxWorkers int) *ChanDispatcher {
@@ -85,9 +64,10 @@ func NewChanDispatcher(msgQueue chan interface{}, maxWorkers int) *ChanDispatche
 func (d *ChanDispatcher) Run(callback func(workerId int, msg interface{})) {
 	for i := 0; i < d.maxWorkers; i++ {
 		worker := NewChanWorker(i, d.WorkerPool)
+		d.workers = append(d.workers, worker)
 		worker.Start(callback)
 	}
-
+	d.closeWait()
 	d.dispatch()
 }
 
@@ -110,3 +90,27 @@ func (d *ChanDispatcher) dispatch() {
 		}
 	}(d)
 }
+
+func (d *ChanDispatcher) closeWait() {
+	go func(d *ChanDispatcher) {
+		defer utils.DefaultGoroutineRecover(nil, `chan池关闭`)
+		var c chan os.Signal
+		var s os.Signal
+		c = make(chan os.Signal, 1)
+		signal.Notify(c, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT, syscall.SIGKILL)
+		for {
+			s = <-c
+			switch s {
+			case syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT, syscall.SIGKILL:
+				if nil != d.workers && len(d.workers) > 0 {
+					for i := 0; i < len(d.workers); i++ {
+						d.workers[i].quit <- true
+					}
+				}
+				return
+			default:
+				return
+			}
+		}
+	}(d)
+}