chan_pool.go 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117
  1. package queue
  2. import (
  3. "os"
  4. "os/signal"
  5. "syscall"
  6. )
  7. // 工作对象封装
  8. type ChanWorker struct {
  9. ID int // 工作对象编号
  10. WorkerPool chan chan interface{} // 工作管道池,实例化时由调度器传入
  11. JobChannel chan interface{} // 工作管道
  12. quit chan bool // 退出消息
  13. }
  14. func NewChanWorker(workerId, capacity int, workerPool chan chan interface{}) *ChanWorker {
  15. var jobChannel chan interface{}
  16. if capacity < 0 {
  17. jobChannel = make(chan interface{})
  18. } else {
  19. jobChannel = make(chan interface{}, capacity)
  20. }
  21. return &ChanWorker{
  22. ID: workerId,
  23. WorkerPool: workerPool,
  24. JobChannel: jobChannel,
  25. quit: make(chan bool),
  26. }
  27. }
  28. func (w *ChanWorker) Start(callback func(workerId int, msg interface{})) {
  29. go func(w *ChanWorker, callback func(workerId int, msg interface{})) {
  30. for {
  31. // 新工作管道加入工作管道池
  32. w.WorkerPool <- w.JobChannel
  33. select {
  34. case msg := <-w.JobChannel:
  35. callback(w.ID, msg)
  36. case <-w.quit:
  37. return
  38. }
  39. }
  40. }(w, callback)
  41. go w.closeWait()
  42. }
  43. func (w *ChanWorker) Stop() {
  44. go func(w *ChanWorker) {
  45. w.quit <- true
  46. }(w)
  47. }
  48. func (w *ChanWorker) closeWait() {
  49. var c chan os.Signal
  50. c = make(chan os.Signal, 1)
  51. signal.Notify(c, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT)
  52. for {
  53. select {
  54. case <-c:
  55. w.Stop()
  56. }
  57. }
  58. }
  59. // 调度对象
  60. type ChanDispatcher struct {
  61. MsgQueue chan interface{} // 消息输入管道
  62. WorkerPool chan chan interface{} // 工作管道池
  63. maxWorkers int // 最大工作对象数
  64. capacity int // 工作管道消息缓冲大小
  65. }
  66. func NewChanDispatcher(msgQueue chan interface{}, maxWorkers int) *ChanDispatcher {
  67. return &ChanDispatcher{
  68. MsgQueue: msgQueue,
  69. WorkerPool: make(chan chan interface{}, maxWorkers),
  70. maxWorkers: maxWorkers,
  71. capacity: -1,
  72. }
  73. }
  74. func NewChanDispatcherWithCapacity(msgQueue chan interface{}, maxWorkers, capacity int) *ChanDispatcher {
  75. return &ChanDispatcher{
  76. MsgQueue: msgQueue,
  77. WorkerPool: make(chan chan interface{}, maxWorkers),
  78. maxWorkers: maxWorkers,
  79. capacity: capacity,
  80. }
  81. }
  82. func (d *ChanDispatcher) Run(callback func(workerId int, msg interface{})) {
  83. for i := 0; i < d.maxWorkers; i++ {
  84. worker := NewChanWorker(i, d.capacity, d.WorkerPool)
  85. worker.Start(callback)
  86. }
  87. go d.dispatch()
  88. }
  89. func (d *ChanDispatcher) dispatch() {
  90. for {
  91. select {
  92. case msg := <-d.MsgQueue:
  93. go func(d *ChanDispatcher, msg interface{}) {
  94. // 从工作管道池中尝试取出一个空闲(未阻塞)的工作管道,无空闲工作管道时阻塞
  95. jobChannel := <-d.WorkerPool
  96. // 将一条消息发送给当前工作管道
  97. jobChannel <- msg
  98. }(d, msg)
  99. }
  100. }
  101. }