设为首页 - 加入收藏 哈尔滨站长网 (http://www.0451zz.com)- 国内知名站长资讯网站,提供最新最全的站长资讯,创业经验,网站建设等!
热搜: vivo 2018 face javascript
当前位置: 首页 > 运营中心 > 建站资源 > 优化 > 正文

Java 多线程爬虫及分布式爬虫架构探索

发布时间:2019-10-21 01:14 所属栏目:[优化] 来源:平头哥
导读:这是 Java 爬虫系列博文的第五篇,在上一篇 Java 爬虫服务器被屏蔽,不要慌,咱们换一台服务器 中,我们简单的聊反爬虫策略和反反爬虫方法,主要针对的是 IP 被封及其对应办法。前面几篇文章我们把爬虫相关的基本知识都讲的差不多啦。这一篇我们来聊一聊爬

这是 Java 爬虫系列博文的第五篇,在上一篇 Java 爬虫服务器被屏蔽,不要慌,咱们换一台服务器 中,我们简单的聊反爬虫策略和反反爬虫方法,主要针对的是 IP 被封及其对应办法。前面几篇文章我们把爬虫相关的基本知识都讲的差不多啦。这一篇我们来聊一聊爬虫架构相关的内容。

Java 多线程爬虫及分布式爬虫架构探索

前面几章内容我们的爬虫程序都是单线程,在我们调试爬虫程序的时候,单线程爬虫没什么问题,但是当我们在线上环境使用单线程爬虫程序去采集网页时,单线程就暴露出了两个致命的问题:

  • 采集效率特别慢,单线程之间都是串行的,下一个执行动作需要等上一个执行完才能执行
  • 对服务器的CUP等利用率不高,想想我们的服务器都是 8核16G,32G 的只跑一个线程会不会太浪费啦

线上环境不可能像我们本地测试一样,不在乎采集效率,只要能正确提取结果就行。在这个时间就是金钱的年代,不可能给你时间去慢慢的采集,所以单线程爬虫程序是行不通的,我们需要将单线程改成多线程的模式,来提升采集效率和提高计算机利用率。

多线程的爬虫程序设计比单线程就要复杂很多,但是与其他业务在高并发下要保证数据安全又不同,多线程爬虫在数据安全上到要求不是那么的高,因为每个页面都可以被看作是一个独立体。要做好多线程爬虫就必须做好两点:第一点就是统一的待采集 URL 维护,第二点就是 URL 的去重, 下面我们简单的来聊一聊这两点。

维护待采集的 URL

多线程爬虫程序就不能像单线程那样,每个线程独自维护这自己的待采集 URL,如果这样的话,那么每个线程采集的网页将是一样的,你这就不是多线程采集啦,你这是将一个页面采集的多次。基于这个原因我们就需要将待采集的 URL 统一维护,每个线程从统一 URL 维护处领取采集 URL ,完成采集任务,如果在页面上发现新的 URL 链接则添加到 统一 URL 维护的容器中。下面是几种适合用作统一 URL 维护的容器:

  • JDK 的安全队列,例如 LinkedBlockingQueue
  • 高性能的 NoSQL,比如 Redis、Mongodb
  • MQ 消息中间件

URL 的去重

URL 的去重也是多线程采集的关键一步,因为如果不去重的话,那么我们将采集到大量重复的 URL,这样并没有提升我们的采集效率,比如一个分页的新闻列表,我们在采集第一页的时候可以得到 2、3、4、5 页的链接,在采集第二页的时候又会得到 1、3、4、5 页的链接,待采集的 URL 队列中将存在大量的列表页链接,这样就会重复采集甚至进入到一个死循环当中,所以就需要 URL 去重。URL 去重的方法就非常多啦,下面是几种常用的 URL 去重方式:

  • 将 URL 保存到数据库进行去重,比如 redis、MongoDB
  • 将 URL 放到哈希表中去重,例如 hashset
  • 将 URL 经过 MD5 之后保存到哈希表中去重,相比于上面一种,能够节约空间
  • 使用 布隆过滤器(Bloom Filter)去重,这种方式能够节约大量的空间,就是不那么准确。

关于多线程爬虫的两个核心知识点我们都知道啦,下面我画了一个简单的多线程爬虫架构图,如下图所示:?

Java 多线程爬虫及分布式爬虫架构探索

多线程爬虫架构图

上面我们主要了解了多线程爬虫的架构设计,接下来我们不妨来试试 Java 多线程爬虫,我们以采集虎扑新闻为例来实战一下 Java 多线程爬虫,Java 多线程爬虫中设计到了 待采集 URL 的维护和 URL 去重,由于我们这里只是演示,所以我们就使用 JDK 内置的容器来完成,我们使用 LinkedBlockingQueue 作为待采集 URL 维护容器,HashSet 作为 URL 去重容器。下面是 Java 多线程爬虫核心代码,详细代码以上传 GitHub,地址在文末:

  1. /**?
  2. ?*?多线程爬虫?
  3. ?*/?
  4. public?class?ThreadCrawler?implements?Runnable?{?
  5. ????//?采集的文章数?
  6. ????private?final?AtomicLong?pageCount?=?new?AtomicLong(0);?
  7. ????//?列表页链接正则表达式?
  8. ????public?static?final?String?URL_LIST?=?"https://voice.hupu.com/nba";?
  9. ????protected?Logger?logger?=?LoggerFactory.getLogger(getClass());?
  10. ????//?待采集的队列?
  11. ????LinkedBlockingQueue?taskQueue;?
  12. ????//?采集过的链接列表?
  13. ????HashSet?visited;?
  14. ????//?线程池?
  15. ????CountableThreadPool?threadPool;?
  16. ????/**?
  17. ?????*?
  18. ?????*?@param?url?起始页?
  19. ?????*?@param?threadNum?线程数?
  20. ?????*?@throws?InterruptedException?
  21. ?????*/?
  22. ????public?ThreadCrawler(String?url,?int?threadNum)?throws?InterruptedException?{?
  23. ????????this.taskQueue?=?new?LinkedBlockingQueue<>();?
  24. ????????this.threadPool?=?new?CountableThreadPool(threadNum);?
  25. ????????this.visited?=?new?HashSet<>();?
  26. ????????//?将起始页添加到待采集队列中?
  27. ????????this.taskQueue.put(url);?
  28. ????}?
  29. ?
  30. ????@Override?
  31. ????public?void?run()?{?
  32. ????????logger.info("Spider?started!");?
  33. ????????while?(!Thread.currentThread().isInterrupted())?{?
  34. ????????????//?从队列中获取待采集?URL?
  35. ????????????final?String?request?=?taskQueue.poll();?
  36. ????????????//?如果获取?request?为空,并且当前的线程采已经没有线程在运行?
  37. ????????????if?(request?==?null)?{?
  38. ????????????????if?(threadPool.getThreadAlive()?==?0)?{?
  39. ????????????????????break;?
  40. ????????????????}?
  41. ????????????}?else?{?
  42. ????????????????//?执行采集任务?
  43. ????????????????threadPool.execute(new?Runnable()?{?
  44. ????????????????????@Override?
  45. ????????????????????public?void?run()?{?
  46. ????????????????????????try?{?
  47. ????????????????????????????processRequest(request);?
  48. ????????????????????????}?catch?(Exception?e)?{?
  49. ????????????????????????????logger.error("process?request?"?+?request?+?"?error",?e);?
  50. ????????????????????????}?finally?{?
  51. ????????????????????????????//?采集页面?+1?
  52. ????????????????????????????pageCount.incrementAndGet();?
  53. ????????????????????????}?
  54. ????????????????????}?
  55. ????????????????});?
  56. ????????????}?
  57. ????????}?
  58. ????????threadPool.shutdown();?
  59. ????????logger.info("Spider?closed!?{}?pages?downloaded.",?pageCount.get());?
  60. ????}?
  61. ?
  62. ????/**?
  63. ?????*?处理采集请求?
  64. ?????*?@param?url?
  65. ?????*/?
  66. ????protected?void?processRequest(String?url)?{?
  67. ????????//?判断是否为列表页?
  68. ????????if?(url.matches(URL_LIST))?{?
  69. ????????????//?列表页解析出详情页链接添加到待采集URL队列中?
  70. ????????????processTaskQueue(url);?
  71. ????????}?else?{?
  72. ????????????//?解析网页?
  73. ????????????processPage(url);?
  74. ????????}?
  75. ????}?
  76. ????/**?
  77. ?????*?处理链接采集?
  78. ?????*?处理列表页,将?url?添加到队列中?
  79. ?????*?
  80. ?????*?@param?url?
  81. ?????*/?
  82. ????protected?void?processTaskQueue(String?url)?{?
  83. ????????try?{?
  84. ????????????Document?doc?=?Jsoup.connect(url).get();?
  85. ????????????//?详情页链接?
  86. ????????????Elements?elements?=?doc.select("?div.news-list?>?ul?>?li?>?div.list-hd?>?h4?>?a");?
  87. ????????????elements.stream().forEach((element?->?{?
  88. ????????????????String?request?=?element.attr("href");?
  89. ????????????????//?判断该链接是否存在队列或者已采集的?set?中,不存在则添加到队列中?
  90. ????????????????if?(!visited.contains(request)?&&?!taskQueue.contains(request))?{?
  91. ????????????????????try?{?
  92. ????????????????????????taskQueue.put(request);?
  93. ????????????????????}?catch?(InterruptedException?e)?{?
  94. ????????????????????????e.printStackTrace();?
  95. ????????????????????}?
  96. ????????????????}?
  97. ????????????}));?
  98. ????????????//?列表页链接?
  99. ????????????Elements?list_urls?=?doc.select("div.voice-paging?>?a");?
  100. ????????????list_urls.stream().forEach((element?->?{?
  101. ????????????????String?request?=?element.absUrl("href");?
  102. ????????????????//?判断是否符合要提取的列表链接要求?
  103. ????????????????if?(request.matches(URL_LIST))?{?
  104. ????????????????????//?判断该链接是否存在队列或者已采集的?set?中,不存在则添加到队列中?
  105. ????????????????????if?(!visited.contains(request)?&&?!taskQueue.contains(request))?{?
  106. ????????????????????????try?{?
  107. ????????????????????????????taskQueue.put(request);?
  108. ????????????????????????}?catch?(InterruptedException?e)?{?
  109. ????????????????????????????e.printStackTrace();?
  110. ????????????????????????}?
  111. ????????????????????}?
  112. ????????????????}?
  113. ????????????}));?
  114. ?
  115. ????????}?catch?(Exception?e)?{?
  116. ????????????e.printStackTrace();?
  117. ????????}?
  118. ????}?
  119. ????/**?
  120. ?????*?解析页面?
  121. ?????*?
  122. ?????*?@param?url?
  123. ?????*/?
  124. ????protected?void?processPage(String?url)?{?
  125. ????????try?{?
  126. ????????????Document?doc?=?Jsoup.connect(url).get();?
  127. ????????????String?title?=?doc.select("body?>?div.hp-wrap?>?div.voice-main?>?div.artical-title?>?h1").first().ownText();?
  128. ?
  129. ????????????System.out.println(Thread.currentThread().getName()?+?"?在?"?+?new?Date()?+?"?采集了虎扑新闻?"?+?title);?
  130. ????????????//?将采集完的?url?存入到已经采集的?set?中?
  131. ????????????visited.add(url);?
  132. ?
  133. ????????}?catch?(IOException?e)?{?
  134. ????????????e.printStackTrace();?
  135. ????????}?
  136. ????}?
  137. ?
  138. ????public?static?void?main(String[]?args)?{?
  139. ?
  140. ????????try?{?
  141. ????????????new?ThreadCrawler("https://voice.hupu.com/nba",?5).run();?
  142. ????????}?catch?(InterruptedException?e)?{?
  143. ????????????e.printStackTrace();?
  144. ????????}?
  145. ????}?
  146. }?

【免责声明】本站内容转载自互联网,其相关言论仅代表作者个人观点绝非权威,不代表本站立场。如您发现内容存在版权问题,请提交相关链接至邮箱:bqsm@foxmail.com,我们将及时予以处理。

网友评论
推荐文章