package queue import ( "git.aionnect.com/aionnect/go-common/utils" "os" "os/signal" "syscall" ) // 工作对象封装 type ChanWorker struct { ID int // 工作对象编号 WorkerPool chan chan interface{} // 工作管道池,实例化时由调度器传入 JobChannel chan interface{} // 工作管道 quit chan bool // 退出消息 } func NewChanWorker(workerId int, workerPool chan chan interface{}) *ChanWorker { jobChannel := make(chan interface{}) return &ChanWorker{ ID: workerId, WorkerPool: workerPool, JobChannel: jobChannel, quit: make(chan bool), } } func (w *ChanWorker) Start(callback func(workerId int, msg interface{})) { go func(w *ChanWorker, callback func(workerId int, msg interface{})) { defer utils.DefaultGoroutineRecover(nil, `chan池工作对象消息处理`) for { // 新工作管道或每次取用工作管道后,加入工作管道池 w.WorkerPool <- w.JobChannel select { case msg, ok := <-w.JobChannel: // 无消息时阻塞 if ok { callback(w.ID, msg) } case <-w.quit: return } } }(w, callback) w.closeWait() } func (w *ChanWorker) closeWait() { go func(w *ChanWorker) { defer utils.DefaultGoroutineRecover(nil, `chan池关闭`) var c chan os.Signal var s os.Signal c = make(chan os.Signal, 1) signal.Notify(c, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT, syscall.SIGKILL) for { s = <-c switch s { case syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT, syscall.SIGKILL: w.quit <- true return default: return } } }(w) } // 调度对象 type ChanDispatcher struct { MsgQueue chan interface{} // 消息输入管道 WorkerPool chan chan interface{} // 工作管道池 maxWorkers int // 最大工作对象数 } func NewChanDispatcher(msgQueue chan interface{}, maxWorkers int) *ChanDispatcher { return &ChanDispatcher{ MsgQueue: msgQueue, WorkerPool: make(chan chan interface{}, maxWorkers), maxWorkers: maxWorkers, } } func (d *ChanDispatcher) Run(callback func(workerId int, msg interface{})) { for i := 0; i < d.maxWorkers; i++ { worker := NewChanWorker(i, d.WorkerPool) worker.Start(callback) } d.dispatch() } func (d *ChanDispatcher) dispatch() { go func(d *ChanDispatcher) { defer utils.DefaultGoroutineRecover(nil, `chan池调度`) for { select { case msg, ok := <-d.MsgQueue: if ok { // 从工作管道池中尝试取出一个空闲的工作管道(每次取用工作管道会从池中取出去,消息处理完再放回池子,所以池子中的都是空闲的) // 无空闲工作管道(池子中无消息)时阻塞 jobChannel, isOpen := <-d.WorkerPool if isOpen { // 将一条消息发送给成功取出的工作管道 jobChannel <- msg } } } } }(d) }