chan_pool.go 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132
  1. package queue
  2. import (
  3. "git.haoqitour.com/haoqi/go-common/utils"
  4. "git.haoqitour.com/haoqi/go-common/utils/logger"
  5. "os"
  6. "os/signal"
  7. "syscall"
  8. )
  9. // 工作对象封装
  10. type ChanWorker struct {
  11. ID int // 工作对象编号
  12. WorkerPool chan chan interface{} // 工作管道池,实例化时由调度器传入
  13. JobChannel chan interface{} // 工作管道
  14. quit chan bool // 退出消息
  15. log *logger.Logger
  16. }
  17. func NewChanWorker(workerId, capacity int, workerPool chan chan interface{}) *ChanWorker {
  18. var jobChannel chan interface{}
  19. if capacity < 0 {
  20. jobChannel = make(chan interface{})
  21. } else {
  22. jobChannel = make(chan interface{}, capacity)
  23. }
  24. return &ChanWorker{
  25. ID: workerId,
  26. WorkerPool: workerPool,
  27. JobChannel: jobChannel,
  28. quit: make(chan bool),
  29. log: logger.New(),
  30. }
  31. }
  32. func (w *ChanWorker) Start(callback func(workerId int, msg interface{})) {
  33. go func(w *ChanWorker, callback func(workerId int, msg interface{})) {
  34. defer utils.DefaultGoroutineRecover(w.log, `chan池工作对象消息处理`)
  35. for {
  36. // 新工作管道加入工作管道池
  37. w.WorkerPool <- w.JobChannel
  38. select {
  39. case msg := <-w.JobChannel:
  40. callback(w.ID, msg)
  41. case <-w.quit:
  42. return
  43. }
  44. }
  45. }(w, callback)
  46. w.closeWait()
  47. }
  48. func (w *ChanWorker) Stop() {
  49. go func(w *ChanWorker) {
  50. defer utils.DefaultGoroutineRecover(w.log, `chan池工作对象关闭`)
  51. w.quit <- true
  52. }(w)
  53. }
  54. func (w *ChanWorker) closeWait() {
  55. go func(w *ChanWorker) {
  56. defer utils.DefaultGoroutineRecover(w.log, `chan池关闭`)
  57. var c chan os.Signal
  58. c = make(chan os.Signal, 1)
  59. signal.Notify(c, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT)
  60. for {
  61. select {
  62. case <-c:
  63. w.Stop()
  64. }
  65. }
  66. }(w)
  67. }
  68. // 调度对象
  69. type ChanDispatcher struct {
  70. MsgQueue chan interface{} // 消息输入管道
  71. WorkerPool chan chan interface{} // 工作管道池
  72. maxWorkers int // 最大工作对象数
  73. capacity int // 工作管道消息缓冲大小
  74. log *logger.Logger
  75. }
  76. func NewChanDispatcher(msgQueue chan interface{}, maxWorkers int) *ChanDispatcher {
  77. return &ChanDispatcher{
  78. MsgQueue: msgQueue,
  79. WorkerPool: make(chan chan interface{}, maxWorkers),
  80. maxWorkers: maxWorkers,
  81. capacity: -1,
  82. log: logger.New(),
  83. }
  84. }
  85. func NewChanDispatcherWithCapacity(msgQueue chan interface{}, maxWorkers, capacity int) *ChanDispatcher {
  86. return &ChanDispatcher{
  87. MsgQueue: msgQueue,
  88. WorkerPool: make(chan chan interface{}, maxWorkers),
  89. maxWorkers: maxWorkers,
  90. capacity: capacity,
  91. log: logger.New(),
  92. }
  93. }
  94. func (d *ChanDispatcher) Run(callback func(workerId int, msg interface{})) {
  95. for i := 0; i < d.maxWorkers; i++ {
  96. worker := NewChanWorker(i, d.capacity, d.WorkerPool)
  97. worker.Start(callback)
  98. }
  99. d.dispatch()
  100. }
  101. func (d *ChanDispatcher) dispatch() {
  102. go func(d *ChanDispatcher) {
  103. defer utils.DefaultGoroutineRecover(d.log, `chan池调度`)
  104. for {
  105. select {
  106. case msg := <-d.MsgQueue:
  107. go func(d *ChanDispatcher, msg interface{}) {
  108. // 从工作管道池中尝试取出一个空闲(未阻塞)的工作管道,无空闲工作管道时阻塞
  109. jobChannel := <-d.WorkerPool
  110. // 将一条消息发送给当前工作管道
  111. jobChannel <- msg
  112. }(d, msg)
  113. }
  114. }
  115. }(d)
  116. }