package queue import ( "os" "os/signal" "syscall" ) // 工作对象封装 type ChanWorker struct { ID int // 工作对象编号 WorkerPool chan chan interface{} // 工作管道池,实例化时由调度器传入 JobChannel chan interface{} // 工作管道 quit chan bool // 退出消息 } func NewChanWorker(workerId, capacity int, workerPool chan chan interface{}) *ChanWorker { var jobChannel chan interface{} if capacity < 0 { jobChannel = make(chan interface{}) } else { jobChannel = make(chan interface{}, capacity) } return &ChanWorker{ ID: workerId, WorkerPool: workerPool, JobChannel: jobChannel, quit: make(chan bool), } } func (w *ChanWorker) Start(callback func(workerId int, msg interface{})) { go func(w *ChanWorker, callback func(workerId int, msg interface{})) { for { // 新工作管道加入工作管道池 w.WorkerPool <- w.JobChannel select { case msg := <-w.JobChannel: callback(w.ID, msg) case <-w.quit: return } } }(w, callback) go w.closeWait() } func (w *ChanWorker) Stop() { go func(w *ChanWorker) { w.quit <- true }(w) } func (w *ChanWorker) closeWait() { var c chan os.Signal c = make(chan os.Signal, 1) signal.Notify(c, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT) for { select { case <-c: w.Stop() } } } // 调度对象 type ChanDispatcher struct { MsgQueue chan interface{} // 消息输入管道 WorkerPool chan chan interface{} // 工作管道池 maxWorkers int // 最大工作对象数 capacity int // 工作管道消息缓冲大小 } func NewChanDispatcher(msgQueue chan interface{}, maxWorkers int) *ChanDispatcher { return &ChanDispatcher{ MsgQueue: msgQueue, WorkerPool: make(chan chan interface{}, maxWorkers), maxWorkers: maxWorkers, capacity: -1, } } func NewChanDispatcherWithCapacity(msgQueue chan interface{}, maxWorkers, capacity int) *ChanDispatcher { return &ChanDispatcher{ MsgQueue: msgQueue, WorkerPool: make(chan chan interface{}, maxWorkers), maxWorkers: maxWorkers, capacity: capacity, } } func (d *ChanDispatcher) Run(callback func(workerId int, msg interface{})) { for i := 0; i < d.maxWorkers; i++ { worker := NewChanWorker(i, d.capacity, d.WorkerPool) worker.Start(callback) } go d.dispatch() } func (d *ChanDispatcher) dispatch() { for { select { case msg := <-d.MsgQueue: go func(d *ChanDispatcher, msg interface{}) { // 从工作管道池中尝试取出一个空闲(未阻塞)的工作管道,无空闲工作管道时阻塞 jobChannel := <-d.WorkerPool // 将一条消息发送给当前工作管道 jobChannel <- msg }(d, msg) } } }