Ver código fonte

default goroutine recover

marion 5 anos atrás
pai
commit
96859f755b

+ 11 - 0
utils/goroutine_recover.go

@@ -0,0 +1,11 @@
+package utils
+
+import "git.haoqitour.com/haoqi/go-common/utils/logger"
+
+func DefaultGoroutineRecover(log *logger.Logger, action string) {
+	if err := recover(); err != nil {
+		if e, ok := err.(error); ok {
+			log.WithField("err", e.Error()).Error(action, " goroutine 异常")
+		}
+	}
+}

+ 3 - 0
utils/mq/kafka/consumer.go

@@ -2,6 +2,7 @@ package kafka
 
 import (
 	"fmt"
+	"git.haoqitour.com/haoqi/go-common/utils"
 	"git.haoqitour.com/haoqi/go-common/utils/logger"
 	"github.com/Shopify/sarama"
 	"github.com/bsm/sarama-cluster"
@@ -53,6 +54,7 @@ func (c *Consumer) GetLogger() *logger.Logger {
 func (c *Consumer) BytesMessages() <-chan []byte {
 	ch := make(chan []byte, 0)
 	go func(c *Consumer, ch chan []byte, oc <-chan *sarama.ConsumerMessage) {
+		defer utils.DefaultGoroutineRecover(c.log, `KAFKA消息读取管道`)
 		for msg := range oc {
 			ch <- msg.Value
 			c.MarkOffset(msg, "") // mark message as processed
@@ -64,6 +66,7 @@ func (c *Consumer) BytesMessages() <-chan []byte {
 // 将消息输出绑定到指定管道上,此方法内会进行反序列化,输出的消息类型可以是指定对象类型
 func (c *Consumer) BindJSONChan(channel interface{}) {
 	go func(c *Consumer, channel interface{}) {
+		defer utils.DefaultGoroutineRecover(c.log, `KAFKA消息输出绑定`)
 		chVal := reflect.ValueOf(channel)
 		if chVal.Kind() != reflect.Chan {
 			return

+ 8 - 2
utils/queue/buffer_postman.go

@@ -1,6 +1,8 @@
 package queue
 
 import (
+	"git.haoqitour.com/haoqi/go-common/utils"
+	"git.haoqitour.com/haoqi/go-common/utils/logger"
 	"sync"
 	"time"
 )
@@ -140,6 +142,7 @@ type BufferPostman struct {
 	timer       *time.Timer
 	isTimerStop bool
 	target      chan interface{}
+	log         *logger.Logger
 }
 
 // 新建缓冲投递员对象
@@ -149,10 +152,12 @@ func NewBufferPostman(limit int, duration time.Duration, target chan interface{}
 		duration: duration,
 		Buffer:   NewBufferMap(),
 		target:   target,
+		log:      logger.New(),
 	}
 	if duration > 0 {
 		p.timer = time.NewTimer(duration)
-		go func() {
+		go func(p *BufferPostman) {
+			defer utils.DefaultGoroutineRecover(p.log, `缓冲投递超时消息`)
 			for {
 				select {
 				case <-p.timer.C: // 超时
@@ -160,7 +165,7 @@ func NewBufferPostman(limit int, duration time.Duration, target chan interface{}
 					p.deliver()
 				}
 			}
-		}()
+		}(p)
 	}
 	return p
 }
@@ -189,6 +194,7 @@ func (p *BufferPostman) deliver() {
 
 		// 将消息推进中转chan
 		go func(p *BufferPostman, data []IBufferItem) {
+			defer utils.DefaultGoroutineRecover(p.log, `缓冲投递超限消息`)
 			for i := 0; i < len(data); i++ {
 				p.target <- data[i]
 			}

+ 28 - 22
utils/queue/chan_pool.go

@@ -1,6 +1,8 @@
 package queue
 
 import (
+	"git.haoqitour.com/haoqi/go-common/utils"
+	"git.haoqitour.com/haoqi/go-common/utils/logger"
 	"os"
 	"os/signal"
 	"syscall"
@@ -12,6 +14,7 @@ type ChanWorker struct {
 	WorkerPool chan chan interface{} // 工作管道池,实例化时由调度器传入
 	JobChannel chan interface{}      // 工作管道
 	quit       chan bool             // 退出消息
+	log        *logger.Logger
 }
 
 func NewChanWorker(workerId, capacity int, workerPool chan chan interface{}) *ChanWorker {
@@ -27,11 +30,13 @@ func NewChanWorker(workerId, capacity int, workerPool chan chan interface{}) *Ch
 		WorkerPool: workerPool,
 		JobChannel: jobChannel,
 		quit:       make(chan bool),
+		log:        logger.New(),
 	}
 }
 
 func (w *ChanWorker) Start(callback func(workerId int, msg interface{})) {
 	go func(w *ChanWorker, callback func(workerId int, msg interface{})) {
+		defer utils.DefaultGoroutineRecover(w.log, `chan池工作对象消息处理`)
 		for {
 			// 新工作管道加入工作管道池
 			w.WorkerPool <- w.JobChannel
@@ -45,25 +50,22 @@ func (w *ChanWorker) Start(callback func(workerId int, msg interface{})) {
 		}
 	}(w, callback)
 
-	go w.closeWait()
-}
-
-func (w *ChanWorker) Stop() {
-	go func(w *ChanWorker) {
-		w.quit <- true
-	}(w)
+	w.closeWait()
 }
 
 func (w *ChanWorker) closeWait() {
-	var c chan os.Signal
-	c = make(chan os.Signal, 1)
-	signal.Notify(c, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT)
-	for {
-		select {
-		case <-c:
-			w.Stop()
+	go func(w *ChanWorker) {
+		defer utils.DefaultGoroutineRecover(w.log, `chan池关闭`)
+		var c chan os.Signal
+		c = make(chan os.Signal, 1)
+		signal.Notify(c, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT)
+		for {
+			select {
+			case <-c:
+				w.quit <- true
+			}
 		}
-	}
+	}(w)
 }
 
 // 调度对象
@@ -72,6 +74,7 @@ type ChanDispatcher struct {
 	WorkerPool chan chan interface{} // 工作管道池
 	maxWorkers int                   // 最大工作对象数
 	capacity   int                   // 工作管道消息缓冲大小
+	log        *logger.Logger
 }
 
 func NewChanDispatcher(msgQueue chan interface{}, maxWorkers int) *ChanDispatcher {
@@ -80,6 +83,7 @@ func NewChanDispatcher(msgQueue chan interface{}, maxWorkers int) *ChanDispatche
 		WorkerPool: make(chan chan interface{}, maxWorkers),
 		maxWorkers: maxWorkers,
 		capacity:   -1,
+		log:        logger.New(),
 	}
 }
 
@@ -89,6 +93,7 @@ func NewChanDispatcherWithCapacity(msgQueue chan interface{}, maxWorkers, capaci
 		WorkerPool: make(chan chan interface{}, maxWorkers),
 		maxWorkers: maxWorkers,
 		capacity:   capacity,
+		log:        logger.New(),
 	}
 }
 
@@ -98,20 +103,21 @@ func (d *ChanDispatcher) Run(callback func(workerId int, msg interface{})) {
 		worker.Start(callback)
 	}
 
-	go d.dispatch()
+	d.dispatch()
 }
 
 func (d *ChanDispatcher) dispatch() {
-	for {
-		select {
-		case msg := <-d.MsgQueue:
-			go func(d *ChanDispatcher, msg interface{}) {
+	go func(d *ChanDispatcher) {
+		defer utils.DefaultGoroutineRecover(d.log, `chan池调度`)
+		for {
+			select {
+			case msg := <-d.MsgQueue:
 				// 从工作管道池中尝试取出一个空闲(未阻塞)的工作管道,无空闲工作管道时阻塞
 				jobChannel := <-d.WorkerPool
 
 				// 将一条消息发送给当前工作管道
 				jobChannel <- msg
-			}(d, msg)
+			}
 		}
-	}
+	}(d)
 }