package spiders import ( "fmt" "git.aionnect.com/aionnect/go-common/utils" "git.aionnect.com/hello-go/spider/common" "git.aionnect.com/hello-go/spider/dao" "net/http" "strings" "sync" ) // 爬虫调度上下文 type spidersContext struct { counter sync.WaitGroup // 同步器 spiderMap map[string]*Spider // 多级爬虫价值对 } // 开始爬虫调度 func Run(startingTarget *common.Target, spiders ...ISpider) { if nil == startingTarget || nil == spiders || len(spiders) == 0 { return } // 爬虫调度上下文初始化 ctx := &spidersContext{ spiderMap: make(map[string]*Spider), } for i := 0; i < len(spiders); i++ { spider := newSpider(spiders[i], ctx) // 包装上爬虫执行类 name := strings.TrimSpace(spider.Name()) if _, ok := ctx.spiderMap[name]; !ok { ctx.spiderMap[name] = spider } } // 从起始页开始处理 ctx.router(startingTarget) // 等待爬虫执行完毕 ctx.counter.Wait() // 爬虫执行完毕后处理 for _, spider := range ctx.spiderMap { if after, ok := spider.ISpider.(IAfterSpiderExit); ok { after.AfterExit() } close(spider.queue) } } // 爬虫路由处理 func (ctx *spidersContext) router(target *common.Target) { if nil == target || nil == ctx.spiderMap || len(ctx.spiderMap) == 0 { fmt.Println("spidersContext.router() invalid params") return } target.Key = strings.TrimSpace(target.Key) if target.Key == "" { fmt.Println("spidersContext.router() target key can not be empty") return } if spider, ok := ctx.spiderMap[target.Key]; ok { ctx.counter.Add(1) spider.Send(target) } } // 爬虫主接口抽象 type ISpider interface { Name() string // 爬虫命名 Parse(item interface{}, body []byte, response *http.Response) ([]*common.Target, error) // 响应解析 } // 自定义爬虫请求接口抽象 type IPromiseDefiner interface { GetPromise() *utils.RequestPromise } // 爬虫执行结束后需进行的处理接口抽象 type IAfterSpiderExit interface { AfterExit() } // 爬虫执行类(模拟抽象类及模版方法模式) type Spider struct { ISpider queue chan *common.Target // 待爬队列 promise *utils.RequestPromise // 请求对象 context *spidersContext // 爬虫调度上下文的引用 proxyDao *dao.ProxyDao // 代理信息数据访问对象 } // 返回新的爬虫调度类对象实例(模版方法) func newSpider(instance ISpider, ctx *spidersContext) *Spider { spider := &Spider{ ISpider: instance, queue: make(chan *common.Target, 5), context: ctx, proxyDao: dao.NewProxyDao(), } if definer, ok := instance.(IPromiseDefiner); ok { spider.promise = definer.GetPromise() } else { spider.promise = utils.NewRequest() } go spider.handle() // 启动待爬队列异步处理协程 return spider } // 新待爬请求入队 func (s *Spider) Send(target *common.Target) { if nil == target { return } // 入队不启动新协程,起到bufferSize满时阻塞等待的效果 s.queue <- target } // 待爬队列处理 func (s *Spider) handle() { for { select { case target, ok := <-s.queue: if ok { s.invoke(target) s.context.counter.Done() // 同步器扣减 } } } } // 请求并处理(模版方法) func (s *Spider) invoke(target *common.Target) { target.Referer = strings.TrimSpace(target.Referer) if target.Referer != "" { s.promise = s.promise.SetHeader("Referer", target.Referer) } s.promise.SetHeader("User-Agent", common.RandUserAgent(target.IsCell)) // 随机User-Agent s.promise.SetHttpProxy(s.proxyDao.Get()) // 随机代理 body, resp, err := s.promise.CallResponse(target.Method, target.URL, target.Data) if nil != err { fmt.Printf("http request [%s] %s failed. %s\n", target.Method, target.URL, err.Error()) return } if nil == body || len(body) <= 0 { fmt.Printf("empty http response [%s] %s", target.Method, target.URL) return } var nextTargets []*common.Target nextTargets, err = s.Parse(target.Item, body, resp) if nil != err { fmt.Println(err.Error()) return } if nil == nextTargets || len(nextTargets) == 0 { return } for i := 0; i < len(nextTargets); i++ { next := nextTargets[i] s.context.router(next) } }