marion 5 years ago
parent
commit
cc4914e31d
1 changed files with 6 additions and 21 deletions
  1. 6 21
      utils/queue/chan_pool.go

+ 6 - 21
utils/queue/chan_pool.go

@@ -15,13 +15,8 @@ type ChanWorker struct {
 	quit       chan bool             // 退出消息
 }
 
-func NewChanWorker(workerId, capacity int, workerPool chan chan interface{}) *ChanWorker {
-	var jobChannel chan interface{}
-	if capacity < 0 {
-		jobChannel = make(chan interface{})
-	} else {
-		jobChannel = make(chan interface{}, capacity)
-	}
+func NewChanWorker(workerId int, workerPool chan chan interface{}) *ChanWorker {
+	jobChannel := make(chan interface{})
 
 	return &ChanWorker{
 		ID:         workerId,
@@ -39,7 +34,7 @@ func (w *ChanWorker) Start(callback func(workerId int, msg interface{})) {
 			w.WorkerPool <- w.JobChannel
 
 			select {
-			case msg, ok := <-w.JobChannel:
+			case msg, ok := <-w.JobChannel: // 无消息时阻塞
 				if ok {
 					callback(w.ID, msg)
 				}
@@ -77,7 +72,6 @@ type ChanDispatcher struct {
 	MsgQueue   chan interface{}      // 消息输入管道
 	WorkerPool chan chan interface{} // 工作管道池
 	maxWorkers int                   // 最大工作对象数
-	capacity   int                   // 工作管道消息缓冲大小
 }
 
 func NewChanDispatcher(msgQueue chan interface{}, maxWorkers int) *ChanDispatcher {
@@ -85,22 +79,12 @@ func NewChanDispatcher(msgQueue chan interface{}, maxWorkers int) *ChanDispatche
 		MsgQueue:   msgQueue,
 		WorkerPool: make(chan chan interface{}, maxWorkers),
 		maxWorkers: maxWorkers,
-		capacity:   -1,
-	}
-}
-
-func NewChanDispatcherWithCapacity(msgQueue chan interface{}, maxWorkers, capacity int) *ChanDispatcher {
-	return &ChanDispatcher{
-		MsgQueue:   msgQueue,
-		WorkerPool: make(chan chan interface{}, maxWorkers),
-		maxWorkers: maxWorkers,
-		capacity:   capacity,
 	}
 }
 
 func (d *ChanDispatcher) Run(callback func(workerId int, msg interface{})) {
 	for i := 0; i < d.maxWorkers; i++ {
-		worker := NewChanWorker(i, d.capacity, d.WorkerPool)
+		worker := NewChanWorker(i, d.WorkerPool)
 		worker.Start(callback)
 	}
 
@@ -114,7 +98,8 @@ func (d *ChanDispatcher) dispatch() {
 			select {
 			case msg, ok := <-d.MsgQueue:
 				if ok {
-					// 从工作管道池中尝试取出一个空闲的工作管道(每次取用工作管道会从池中取出去,消息处理完再放回池子,所以池子中的都是空闲的),无空闲工作管道(池子中无消息)时阻塞
+					// 从工作管道池中尝试取出一个空闲的工作管道(每次取用工作管道会从池中取出去,消息处理完再放回池子,所以池子中的都是空闲的)
+					// 无空闲工作管道(池子中无消息)时阻塞
 					jobChannel, isOpen := <-d.WorkerPool
 					if isOpen {
 						// 将一条消息发送给成功取出的工作管道