proxy_dao.go 2.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101
  1. package dao
  2. import (
  3. "fmt"
  4. "git.aionnect.com/aionnect/go-common/utils"
  5. "git.aionnect.com/aionnect/go-common/utils/date"
  6. "git.aionnect.com/hello-go/spider/common"
  7. "math/rand"
  8. "strings"
  9. "sync"
  10. "time"
  11. "xorm.io/xorm"
  12. )
  13. var proxyOnce sync.Once
  14. var proxyDao *ProxyDao
  15. // 代理信息数据访问对象
  16. type ProxyDao struct {
  17. db *xorm.Engine // 数据库访问对象
  18. cache *common.ConcurrentMap // 本地缓存
  19. }
  20. // 返回代理信息数据库访问对象
  21. func NewProxyDao() *ProxyDao {
  22. proxyOnce.Do(func() {
  23. proxyDao = &ProxyDao{
  24. db: DB("spider"),
  25. cache: common.NewConcurrentMap(),
  26. }
  27. })
  28. return proxyDao
  29. }
  30. // 保存代理信息到数据库
  31. func (d *ProxyDao) Save(proxies []*common.ProxyInfo) {
  32. if nil == proxies || len(proxies) == 0 {
  33. return
  34. }
  35. for i := 0; i < len(proxies); i++ {
  36. if proxies[i].ID == 0 {
  37. proxies[i].ID = utils.NextId()
  38. }
  39. proxies[i].CreatedAt = date.Now()
  40. }
  41. err := utils.Insert(d.db, &proxies)
  42. if nil != err {
  43. fmt.Printf("save proxies to db failed %s", err.Error())
  44. }
  45. }
  // ReadClause selects up to 100 distinct "ip:port" addresses of HTTP/HTTPS
// proxies whose anonymity is '高匿名' (high anonymity) from proxy_info. The
// inner query orders by update_time (newest first), then speed, before the
// limit is applied.
const (
	ReadClause = `select distinct addr from (
select concat(ip, ':', port) as 'addr'
from proxy_info
where (type = 'HTTP' or type = 'HTTPS')
and anonymity = '高匿名'
order by update_time desc, speed
limit 100) T`
)
  53. // 查询代理信息
  54. func (d *ProxyDao) Get() string {
  55. // 当缓存中数据量过少时,从数据库中读取最新的100条
  56. if d.cache.Len() < 10 {
  57. var addrs []string
  58. err := d.db.SQL(ReadClause).Find(&addrs)
  59. if nil != err {
  60. fmt.Printf("read proxies to db failed %s", err.Error())
  61. } else if nil != addrs && len(addrs) > 0 {
  62. m := make(map[string]interface{})
  63. for i := 0; i < len(addrs); i++ {
  64. m[addrs[i]] = true
  65. }
  66. d.cache.Append(m)
  67. }
  68. }
  69. // 从缓存中返回随机一个
  70. keys := d.cache.Keys()
  71. rand.Seed(time.Now().UnixNano())
  72. idx := rand.Intn(len(keys))
  73. return keys[idx]
  74. }
  75. // 删除代理信息
  76. func (d *ProxyDao) Remove(key string) {
  77. key = strings.TrimSpace(key)
  78. if key == "" {
  79. return
  80. }
  81. // 缓存中移除
  82. d.cache.Remove(key)
  83. // 数据库中移除
  84. idx := strings.LastIndex(key, ":")
  85. ip := key[:idx]
  86. port := strings.TrimLeft(key[idx:], ":")
  87. _, err := d.db.Where("ip=? and port=?", ip, port).Delete(&common.ProxyInfo{})
  88. if nil != err {
  89. fmt.Printf("remove proxy to db failed %s", err.Error())
  90. }
  91. }