Jelajahi Sumber

default goroutine recover

marion 5 tahun lalu
induk
melakukan
3c0321e5d9
3 mengubah file dengan 15 tambahan dan 17 penghapusan
  1. 10 3
      utils/goroutine_recover.go
  2. 2 5
      utils/queue/buffer_postman.go
  3. 3 9
      utils/queue/chan_pool.go

+ 10 - 3
utils/goroutine_recover.go

@@ -1,11 +1,18 @@
 package utils
 
-import "git.haoqitour.com/haoqi/go-common/utils/logger"
+import (
+	"git.haoqitour.com/haoqi/go-common/utils/logger"
+	"github.com/prometheus/common/log"
+)
 
-func DefaultGoroutineRecover(log *logger.Logger, action string) {
+func DefaultGoroutineRecover(l *logger.Logger, action string) {
 	if err := recover(); err != nil {
 		if e, ok := err.(error); ok {
-			log.WithField("err", e.Error()).Error(action, " goroutine 异常")
+			if nil != l {
+				l.WithField("err", e.Error()).Error(action, " goroutine 异常")
+			} else {
+				log.Warn(action, " goroutine 异常 ", e.Error())
+			}
 		}
 	}
 }

+ 2 - 5
utils/queue/buffer_postman.go

@@ -2,7 +2,6 @@ package queue
 
 import (
 	"git.haoqitour.com/haoqi/go-common/utils"
-	"git.haoqitour.com/haoqi/go-common/utils/logger"
 	"sync"
 	"time"
 )
@@ -142,7 +141,6 @@ type BufferPostman struct {
 	timer       *time.Timer
 	isTimerStop bool
 	target      chan interface{}
-	log         *logger.Logger
 }
 
 // 新建缓冲投递员对象
@@ -152,12 +150,11 @@ func NewBufferPostman(limit int, duration time.Duration, target chan interface{}
 		duration: duration,
 		Buffer:   NewBufferMap(),
 		target:   target,
-		log:      logger.New(),
 	}
 	if duration > 0 {
 		p.timer = time.NewTimer(duration)
 		go func(p *BufferPostman) {
-			defer utils.DefaultGoroutineRecover(p.log, `缓冲投递超时消息`)
+			defer utils.DefaultGoroutineRecover(nil, `缓冲投递超时消息`)
 			for {
 				select {
 				case <-p.timer.C: // 超时
@@ -194,7 +191,7 @@ func (p *BufferPostman) deliver() {
 
 		// 将消息推进中转chan
 		go func(p *BufferPostman, data []IBufferItem) {
-			defer utils.DefaultGoroutineRecover(p.log, `缓冲投递超限消息`)
+			defer utils.DefaultGoroutineRecover(nil, `缓冲投递超限消息`)
 			for i := 0; i < len(data); i++ {
 				p.target <- data[i]
 			}

+ 3 - 9
utils/queue/chan_pool.go

@@ -2,7 +2,6 @@ package queue
 
 import (
 	"git.haoqitour.com/haoqi/go-common/utils"
-	"git.haoqitour.com/haoqi/go-common/utils/logger"
 	"os"
 	"os/signal"
 	"syscall"
@@ -14,7 +13,6 @@ type ChanWorker struct {
 	WorkerPool chan chan interface{} // 工作管道池,实例化时由调度器传入
 	JobChannel chan interface{}      // 工作管道
 	quit       chan bool             // 退出消息
-	log        *logger.Logger
 }
 
 func NewChanWorker(workerId, capacity int, workerPool chan chan interface{}) *ChanWorker {
@@ -30,13 +28,12 @@ func NewChanWorker(workerId, capacity int, workerPool chan chan interface{}) *Ch
 		WorkerPool: workerPool,
 		JobChannel: jobChannel,
 		quit:       make(chan bool),
-		log:        logger.New(),
 	}
 }
 
 func (w *ChanWorker) Start(callback func(workerId int, msg interface{})) {
 	go func(w *ChanWorker, callback func(workerId int, msg interface{})) {
-		defer utils.DefaultGoroutineRecover(w.log, `chan池工作对象消息处理`)
+		defer utils.DefaultGoroutineRecover(nil, `chan池工作对象消息处理`)
 		for {
 			// 新工作管道加入工作管道池
 			w.WorkerPool <- w.JobChannel
@@ -55,7 +52,7 @@ func (w *ChanWorker) Start(callback func(workerId int, msg interface{})) {
 
 func (w *ChanWorker) closeWait() {
 	go func(w *ChanWorker) {
-		defer utils.DefaultGoroutineRecover(w.log, `chan池关闭`)
+		defer utils.DefaultGoroutineRecover(nil, `chan池关闭`)
 		var c chan os.Signal
 		c = make(chan os.Signal, 1)
 		signal.Notify(c, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT)
@@ -74,7 +71,6 @@ type ChanDispatcher struct {
 	WorkerPool chan chan interface{} // 工作管道池
 	maxWorkers int                   // 最大工作对象数
 	capacity   int                   // 工作管道消息缓冲大小
-	log        *logger.Logger
 }
 
 func NewChanDispatcher(msgQueue chan interface{}, maxWorkers int) *ChanDispatcher {
@@ -83,7 +79,6 @@ func NewChanDispatcher(msgQueue chan interface{}, maxWorkers int) *ChanDispatche
 		WorkerPool: make(chan chan interface{}, maxWorkers),
 		maxWorkers: maxWorkers,
 		capacity:   -1,
-		log:        logger.New(),
 	}
 }
 
@@ -93,7 +88,6 @@ func NewChanDispatcherWithCapacity(msgQueue chan interface{}, maxWorkers, capaci
 		WorkerPool: make(chan chan interface{}, maxWorkers),
 		maxWorkers: maxWorkers,
 		capacity:   capacity,
-		log:        logger.New(),
 	}
 }
 
@@ -108,7 +102,7 @@ func (d *ChanDispatcher) Run(callback func(workerId int, msg interface{})) {
 
 func (d *ChanDispatcher) dispatch() {
 	go func(d *ChanDispatcher) {
-		defer utils.DefaultGoroutineRecover(d.log, `chan池调度`)
+		defer utils.DefaultGoroutineRecover(nil, `chan池调度`)
 		for {
 			select {
 			case msg := <-d.MsgQueue: