package queue import ( "git.aionnect.com/aionnect/go-common/utils" "os" "os/signal" "syscall" ) // chan worker object type ChanWorker struct { ID int // worker ID WorkerPool chan chan interface{} // refer to chan worker pool JobChannel chan interface{} // job message chan quit chan bool // quit chan } // NewChanWorker() get new chan worker object instance func NewChanWorker(workerId int, workerPool chan chan interface{}) *ChanWorker { jobChannel := make(chan interface{}) return &ChanWorker{ ID: workerId, WorkerPool: workerPool, JobChannel: jobChannel, quit: make(chan bool), } } // Start() worker handling func (w *ChanWorker) Start(callback func(workerId int, msg interface{})) { go func(w *ChanWorker, callback func(workerId int, msg interface{})) { defer utils.DefaultGoroutineRecover(nil, `chan pool worker handling`) for { // job chan add to pool,when create or used w.WorkerPool <- w.JobChannel select { case msg, ok := <-w.JobChannel: // block when empty if ok { callback(w.ID, msg) } case <-w.quit: return } } }(w, callback) } // chan worker pool dispatcher object type ChanDispatcher struct { MsgQueue chan interface{} // message input chan WorkerPool chan chan interface{} // worker chan pool maxWorkers int // max concurrent chan count workers []*ChanWorker // chan worker object list } // NewChanDispatcher() get new chan worker pool dispatcher object instance func NewChanDispatcher(msgQueue chan interface{}, maxWorkers int) *ChanDispatcher { return &ChanDispatcher{ MsgQueue: msgQueue, WorkerPool: make(chan chan interface{}, maxWorkers), maxWorkers: maxWorkers, } } // Run() start concurrent message customers base on chan worker pool dispatcher func (d *ChanDispatcher) Run(callback func(workerId int, msg interface{})) { for i := 0; i < d.maxWorkers; i++ { worker := NewChanWorker(i, d.WorkerPool) d.workers = append(d.workers, worker) worker.Start(callback) } d.closeWait() d.dispatch() } // dispatch() dispatch handling func (d *ChanDispatcher) dispatch() { go func(d *ChanDispatcher) { defer utils.DefaultGoroutineRecover(nil, `chan pool dispatch handling`) for { select { case msg, ok := <-d.MsgQueue: if ok { // try to get a free worker chan from pool // (all worker chan in pool is free,because take it out when use,and put it back when used) // block when all worker channels are busy(pool is empty) jobChannel, isOpen := <-d.WorkerPool if isOpen { // send a message to job chan jobChannel <- msg } } } } }(d) } // closeWait() release dispatcher resource when signal close func (d *ChanDispatcher) closeWait() { go func(d *ChanDispatcher) { defer utils.DefaultGoroutineRecover(nil, `chan池关闭`) var c chan os.Signal var s os.Signal c = make(chan os.Signal, 1) signal.Notify(c, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT, syscall.SIGKILL) for { s = <-c switch s { case syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT, syscall.SIGKILL: if nil != d.workers && len(d.workers) > 0 { for i := 0; i < len(d.workers); i++ { d.workers[i].quit <- true } } return default: return } } }(d) }