chan_pool.go 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127
  1. package queue
  2. import (
  3. "git.haoqitour.com/haoqi/go-common/utils"
  4. "os"
  5. "os/signal"
  6. "syscall"
  7. )
  // ChanWorker is a single worker of the channel pool. It owns one job channel
  // and a handle to the pool on which idle job channels are published.
  type ChanWorker struct {
  	// ID is the worker's number.
  	ID int
  	// WorkerPool is the pool of job channels; it is handed in by the
  	// dispatcher when the worker is created.
  	WorkerPool chan chan interface{}
  	// JobChannel is the channel this worker receives messages on.
  	JobChannel chan interface{}
  	// quit carries the shutdown notification for the worker.
  	quit chan bool
  }
  15. func NewChanWorker(workerId, capacity int, workerPool chan chan interface{}) *ChanWorker {
  16. var jobChannel chan interface{}
  17. if capacity < 0 {
  18. jobChannel = make(chan interface{})
  19. } else {
  20. jobChannel = make(chan interface{}, capacity)
  21. }
  22. return &ChanWorker{
  23. ID: workerId,
  24. WorkerPool: workerPool,
  25. JobChannel: jobChannel,
  26. quit: make(chan bool),
  27. }
  28. }
  29. func (w *ChanWorker) Start(callback func(workerId int, msg interface{})) {
  30. go func(w *ChanWorker, callback func(workerId int, msg interface{})) {
  31. defer utils.DefaultGoroutineRecover(nil, `chan池工作对象消息处理`)
  32. for {
  33. // 新工作管道或每次取用工作管道后,加入工作管道池
  34. w.WorkerPool <- w.JobChannel
  35. select {
  36. case msg, ok := <-w.JobChannel:
  37. if ok {
  38. callback(w.ID, msg)
  39. }
  40. case <-w.quit:
  41. return
  42. }
  43. }
  44. }(w, callback)
  45. w.closeWait()
  46. }
  47. func (w *ChanWorker) closeWait() {
  48. go func(w *ChanWorker) {
  49. defer utils.DefaultGoroutineRecover(nil, `chan池关闭`)
  50. var c chan os.Signal
  51. var s os.Signal
  52. c = make(chan os.Signal, 1)
  53. signal.Notify(c, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
  54. for {
  55. s = <-c
  56. switch s {
  57. case syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT:
  58. w.quit <- true
  59. return
  60. default:
  61. return
  62. }
  63. }
  64. }(w)
  65. }
  // ChanDispatcher hands messages from an input queue to idle workers.
  type ChanDispatcher struct {
  	// MsgQueue is the channel messages arrive on.
  	MsgQueue chan interface{}
  	// WorkerPool is the pool of job channels.
  	WorkerPool chan chan interface{}
  	// maxWorkers is the maximum number of workers.
  	maxWorkers int
  	// capacity is the message buffer size of each worker's job channel.
  	capacity int
  }
  73. func NewChanDispatcher(msgQueue chan interface{}, maxWorkers int) *ChanDispatcher {
  74. return &ChanDispatcher{
  75. MsgQueue: msgQueue,
  76. WorkerPool: make(chan chan interface{}, maxWorkers),
  77. maxWorkers: maxWorkers,
  78. capacity: -1,
  79. }
  80. }
  81. func NewChanDispatcherWithCapacity(msgQueue chan interface{}, maxWorkers, capacity int) *ChanDispatcher {
  82. return &ChanDispatcher{
  83. MsgQueue: msgQueue,
  84. WorkerPool: make(chan chan interface{}, maxWorkers),
  85. maxWorkers: maxWorkers,
  86. capacity: capacity,
  87. }
  88. }
  89. func (d *ChanDispatcher) Run(callback func(workerId int, msg interface{})) {
  90. for i := 0; i < d.maxWorkers; i++ {
  91. worker := NewChanWorker(i, d.capacity, d.WorkerPool)
  92. worker.Start(callback)
  93. }
  94. d.dispatch()
  95. }
  96. func (d *ChanDispatcher) dispatch() {
  97. go func(d *ChanDispatcher) {
  98. defer utils.DefaultGoroutineRecover(nil, `chan池调度`)
  99. for {
  100. select {
  101. case msg, ok := <-d.MsgQueue:
  102. if ok {
  103. // 从工作管道池中尝试取出一个空闲的工作管道(每次取用工作管道会从池中取出去,消息处理完再放回池子,所以池子中的都是空闲的),无空闲工作管道(池子中无消息)时阻塞
  104. jobChannel, isOpen := <-d.WorkerPool
  105. if isOpen {
  106. // 将一条消息发送给成功取出的工作管道
  107. jobChannel <- msg
  108. }
  109. }
  110. }
  111. }
  112. }(d)
  113. }