123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123 |
- package queue
- import (
- "git.aionnect.com/aionnect/go-common/utils"
- "os"
- "os/signal"
- "syscall"
- )
- // chan worker object
- type ChanWorker struct {
- ID int // worker ID
- WorkerPool chan chan interface{} // refer to chan worker pool
- JobChannel chan interface{} // job message chan
- quit chan bool // quit chan
- }
- // NewChanWorker() get new chan worker object instance
- func NewChanWorker(workerId int, workerPool chan chan interface{}) *ChanWorker {
- jobChannel := make(chan interface{})
- return &ChanWorker{
- ID: workerId,
- WorkerPool: workerPool,
- JobChannel: jobChannel,
- quit: make(chan bool),
- }
- }
- // Start() worker handling
- func (w *ChanWorker) Start(callback func(workerId int, msg interface{})) {
- go func(w *ChanWorker, callback func(workerId int, msg interface{})) {
- defer utils.DefaultGoroutineRecover(nil, `chan pool worker handling`)
- for {
- // job chan add to pool,when create or used
- w.WorkerPool <- w.JobChannel
- select {
- case msg, ok := <-w.JobChannel: // block when empty
- if ok {
- callback(w.ID, msg)
- }
- case <-w.quit:
- return
- }
- }
- }(w, callback)
- }
- // chan worker pool dispatcher object
- type ChanDispatcher struct {
- MsgQueue chan interface{} // message input chan
- WorkerPool chan chan interface{} // worker chan pool
- maxWorkers int // max concurrent chan count
- workers []*ChanWorker // chan worker object list
- }
- // NewChanDispatcher() get new chan worker pool dispatcher object instance
- func NewChanDispatcher(msgQueue chan interface{}, maxWorkers int) *ChanDispatcher {
- return &ChanDispatcher{
- MsgQueue: msgQueue,
- WorkerPool: make(chan chan interface{}, maxWorkers),
- maxWorkers: maxWorkers,
- }
- }
- // Run() start concurrent message customers base on chan worker pool dispatcher
- func (d *ChanDispatcher) Run(callback func(workerId int, msg interface{})) {
- for i := 0; i < d.maxWorkers; i++ {
- worker := NewChanWorker(i, d.WorkerPool)
- d.workers = append(d.workers, worker)
- worker.Start(callback)
- }
- d.closeWait()
- d.dispatch()
- }
- // dispatch() dispatch handling
- func (d *ChanDispatcher) dispatch() {
- go func(d *ChanDispatcher) {
- defer utils.DefaultGoroutineRecover(nil, `chan pool dispatch handling`)
- for {
- select {
- case msg, ok := <-d.MsgQueue:
- if ok {
- // try to get a free worker chan from pool
- // (all worker chan in pool is free,because take it out when use,and put it back when used)
- // block when all worker channels are busy(pool is empty)
- jobChannel, isOpen := <-d.WorkerPool
- if isOpen {
- // send a message to job chan
- jobChannel <- msg
- }
- }
- }
- }
- }(d)
- }
- // closeWait() release dispatcher resource when signal close
- func (d *ChanDispatcher) closeWait() {
- go func(d *ChanDispatcher) {
- defer utils.DefaultGoroutineRecover(nil, `chan池关闭`)
- var c chan os.Signal
- var s os.Signal
- c = make(chan os.Signal, 1)
- signal.Notify(c, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT, syscall.SIGKILL)
- for {
- s = <-c
- switch s {
- case syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT, syscall.SIGKILL:
- if nil != d.workers && len(d.workers) > 0 {
- for i := 0; i < len(d.workers); i++ {
- d.workers[i].quit <- true
- }
- }
- return
- default:
- return
- }
- }
- }(d)
- }
|