chan_pool.go 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116
  1. package queue
  2. import (
  3. "git.aionnect.com/aionnect/go-common/utils"
  4. "os"
  5. "os/signal"
  6. "syscall"
  7. )
  8. // 工作对象封装
  9. type ChanWorker struct {
  10. ID int // 工作对象编号
  11. WorkerPool chan chan interface{} // 工作管道池,实例化时由调度器传入
  12. JobChannel chan interface{} // 工作管道
  13. quit chan bool // 退出消息
  14. }
  15. func NewChanWorker(workerId int, workerPool chan chan interface{}) *ChanWorker {
  16. jobChannel := make(chan interface{})
  17. return &ChanWorker{
  18. ID: workerId,
  19. WorkerPool: workerPool,
  20. JobChannel: jobChannel,
  21. quit: make(chan bool),
  22. }
  23. }
  24. func (w *ChanWorker) Start(callback func(workerId int, msg interface{})) {
  25. go func(w *ChanWorker, callback func(workerId int, msg interface{})) {
  26. defer utils.DefaultGoroutineRecover(nil, `chan池工作对象消息处理`)
  27. for {
  28. // 新工作管道或每次取用工作管道后,加入工作管道池
  29. w.WorkerPool <- w.JobChannel
  30. select {
  31. case msg, ok := <-w.JobChannel: // 无消息时阻塞
  32. if ok {
  33. callback(w.ID, msg)
  34. }
  35. case <-w.quit:
  36. return
  37. }
  38. }
  39. }(w, callback)
  40. }
  41. // 调度对象
  42. type ChanDispatcher struct {
  43. MsgQueue chan interface{} // 消息输入管道
  44. WorkerPool chan chan interface{} // 工作管道池
  45. maxWorkers int // 最大工作对象数
  46. workers []*ChanWorker // 工作对象列表
  47. }
  48. func NewChanDispatcher(msgQueue chan interface{}, maxWorkers int) *ChanDispatcher {
  49. return &ChanDispatcher{
  50. MsgQueue: msgQueue,
  51. WorkerPool: make(chan chan interface{}, maxWorkers),
  52. maxWorkers: maxWorkers,
  53. }
  54. }
  55. func (d *ChanDispatcher) Run(callback func(workerId int, msg interface{})) {
  56. for i := 0; i < d.maxWorkers; i++ {
  57. worker := NewChanWorker(i, d.WorkerPool)
  58. d.workers = append(d.workers, worker)
  59. worker.Start(callback)
  60. }
  61. d.closeWait()
  62. d.dispatch()
  63. }
  64. func (d *ChanDispatcher) dispatch() {
  65. go func(d *ChanDispatcher) {
  66. defer utils.DefaultGoroutineRecover(nil, `chan池调度`)
  67. for {
  68. select {
  69. case msg, ok := <-d.MsgQueue:
  70. if ok {
  71. // 从工作管道池中尝试取出一个空闲的工作管道(每次取用工作管道会从池中取出去,消息处理完再放回池子,所以池子中的都是空闲的)
  72. // 无空闲工作管道(池子中无消息)时阻塞
  73. jobChannel, isOpen := <-d.WorkerPool
  74. if isOpen {
  75. // 将一条消息发送给成功取出的工作管道
  76. jobChannel <- msg
  77. }
  78. }
  79. }
  80. }
  81. }(d)
  82. }
  83. func (d *ChanDispatcher) closeWait() {
  84. go func(d *ChanDispatcher) {
  85. defer utils.DefaultGoroutineRecover(nil, `chan池关闭`)
  86. var c chan os.Signal
  87. var s os.Signal
  88. c = make(chan os.Signal, 1)
  89. signal.Notify(c, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT, syscall.SIGKILL)
  90. for {
  91. s = <-c
  92. switch s {
  93. case syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT, syscall.SIGKILL:
  94. if nil != d.workers && len(d.workers) > 0 {
  95. for i := 0; i < len(d.workers); i++ {
  96. d.workers[i].quit <- true
  97. }
  98. }
  99. return
  100. default:
  101. return
  102. }
  103. }
  104. }(d)
  105. }