chan_pool.go 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123
  1. package queue
  2. import (
  3. "git.aionnect.com/aionnect/go-common/utils"
  4. "os"
  5. "os/signal"
  6. "syscall"
  7. )
  8. // chan worker object
  9. type ChanWorker struct {
  10. ID int // worker ID
  11. WorkerPool chan chan interface{} // refer to chan worker pool
  12. JobChannel chan interface{} // job message chan
  13. quit chan bool // quit chan
  14. }
  15. // NewChanWorker() get new chan worker object instance
  16. func NewChanWorker(workerId int, workerPool chan chan interface{}) *ChanWorker {
  17. jobChannel := make(chan interface{})
  18. return &ChanWorker{
  19. ID: workerId,
  20. WorkerPool: workerPool,
  21. JobChannel: jobChannel,
  22. quit: make(chan bool),
  23. }
  24. }
  25. // Start() worker handling
  26. func (w *ChanWorker) Start(callback func(workerId int, msg interface{})) {
  27. go func(w *ChanWorker, callback func(workerId int, msg interface{})) {
  28. defer utils.DefaultGoroutineRecover(nil, `chan pool worker handling`)
  29. for {
  30. // job chan add to pool,when create or used
  31. w.WorkerPool <- w.JobChannel
  32. select {
  33. case msg, ok := <-w.JobChannel: // block when empty
  34. if ok {
  35. callback(w.ID, msg)
  36. }
  37. case <-w.quit:
  38. return
  39. }
  40. }
  41. }(w, callback)
  42. }
  43. // chan worker pool dispatcher object
  44. type ChanDispatcher struct {
  45. MsgQueue chan interface{} // message input chan
  46. WorkerPool chan chan interface{} // worker chan pool
  47. maxWorkers int // max concurrent chan count
  48. workers []*ChanWorker // chan worker object list
  49. }
  50. // NewChanDispatcher() get new chan worker pool dispatcher object instance
  51. func NewChanDispatcher(msgQueue chan interface{}, maxWorkers int) *ChanDispatcher {
  52. return &ChanDispatcher{
  53. MsgQueue: msgQueue,
  54. WorkerPool: make(chan chan interface{}, maxWorkers),
  55. maxWorkers: maxWorkers,
  56. }
  57. }
  58. // Run() start concurrent message customers base on chan worker pool dispatcher
  59. func (d *ChanDispatcher) Run(callback func(workerId int, msg interface{})) {
  60. for i := 0; i < d.maxWorkers; i++ {
  61. worker := NewChanWorker(i, d.WorkerPool)
  62. d.workers = append(d.workers, worker)
  63. worker.Start(callback)
  64. }
  65. d.closeWait()
  66. d.dispatch()
  67. }
  68. // dispatch() dispatch handling
  69. func (d *ChanDispatcher) dispatch() {
  70. go func(d *ChanDispatcher) {
  71. defer utils.DefaultGoroutineRecover(nil, `chan pool dispatch handling`)
  72. for {
  73. select {
  74. case msg, ok := <-d.MsgQueue:
  75. if ok {
  76. // try to get a free worker chan from pool
  77. // (all worker chan in pool is free,because take it out when use,and put it back when used)
  78. // block when all worker channels are busy(pool is empty)
  79. jobChannel, isOpen := <-d.WorkerPool
  80. if isOpen {
  81. // send a message to job chan
  82. jobChannel <- msg
  83. }
  84. }
  85. }
  86. }
  87. }(d)
  88. }
  89. // closeWait() release dispatcher resource when signal close
  90. func (d *ChanDispatcher) closeWait() {
  91. go func(d *ChanDispatcher) {
  92. defer utils.DefaultGoroutineRecover(nil, `chan池关闭`)
  93. var c chan os.Signal
  94. var s os.Signal
  95. c = make(chan os.Signal, 1)
  96. signal.Notify(c, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT, syscall.SIGKILL)
  97. for {
  98. s = <-c
  99. switch s {
  100. case syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT, syscall.SIGKILL:
  101. if nil != d.workers && len(d.workers) > 0 {
  102. for i := 0; i < len(d.workers); i++ {
  103. d.workers[i].quit <- true
  104. }
  105. }
  106. return
  107. default:
  108. return
  109. }
  110. }
  111. }(d)
  112. }