在开始说正事之前我先给大家介绍一下这份代码的背景,以免大家有一种雾里看花的感觉。在本系列的前几篇博客中有一篇是用多线程进行百度图片的抓取,但是当时使用的多线程是非常粗略的,只是开了几个线程让抓取的速度提升了一些(其实提升了很多),初步的使用了一下线程,这篇博客将线程的使用进行了一些深入。
项目背景
博主这次的需求是抓取一些淘宝的数据,在此之前我们需要掌握基本的并行爬虫的相关知识。在这里我要先吐槽一下《自己动手写网络爬虫》这本书,不得不说,这本书让我认识到了什么叫做:有一本好书,真的会提升很多学习的效率。反正这本书不适合入门,而且非常的老,代码都不能用!!虽然其中有一些值得学习的思想,但。。。
本次代码仍有很多不完善的地方,不得不说Java网络爬虫的学习曲线还是很陡峭的,但我认为开发一个好爬虫是需要扎实的语言功底,所以学习Java爬虫还是很值得的。
代码思想与实现
对于并行爬虫而言,处理空队列要比处理序列爬虫更加复杂,空的队列并不意味着爬虫已经完成工作,因为此刻其他的进程或线程可能依然在解析网页,并且马上会产生新的URL。进程或者线程管理者需要给报告队列为空的线程发送临时的休眠信号,线程管理员需要不断追踪休眠线程的数目,只有当所有的线程都休眠的时候,爬虫才可以终止。
接下来就看一下具体的代码:
我们假设从Redis数据库中的爬虫队列里取待解析的URL。
主线程:
package multithreading;
import java.util.ArrayList;
import java.util.List;
/**
* Created by hg_yi on 17-6-13.
*
* 线程计数器是一个共享变量
*/
public class MultithreadCrawler{
public static void main(String[] args) throws InterruptedException {
//创建一个收集线程的列表
List<Thread> threadList = new ArrayList<Thread>();
//创建线程的个数
int threadNum = 5;
RunThread run = new RunThread();
run.setThreads(threadNum);
//创建5个线程,并对其进行收集
for (int i = 0; i < threadNum; i++) {
Thread thread = new Thread(run);
thread.start();
threadList.add(thread);
}
//main线程需要等待所有子线程退出
Thread.currentThread().join();
}
}
代码分析:
可以看到,我创建了5个线程。我使用了一个容器将所有创建的线程进行了收集,然后为了防止主线程提前退出而让所有子线程结束,我告知主线程需要等待每一个子线程执行完毕之后,你主线程才可以结束。
然后在for循环中我让每个子线程都执行RunThread类中的run方法,这样操作的目的主要是考虑到了线程之间数据共享的问题。
执行线程:
package multithreading;
import redisqueue.RedisQueue;
import java.util.ArrayList;
import java.util.List;
/**
* Created by hg_yi on 17-6-13.
*
* 我们只保证在给数据库中写入URL,还有改变线程线程计数器的值的时候,是需要同步的。
*
* 线程计数器threads是所有线程共享的。
*/
public class RunThread extends Thread {
//线程计数器需要对所有线程可见,是共享变量
int threads = 0;
//Redis队列的对象,也是所有对象共享的变量
RedisQueue redisQueue = new RedisQueue();
//创建线程锁
private static Object lock = new Object();
public void setThreads(int threads) {
this.threads = threads;
}
public void parseToVisitUrltoRedis() throws Exception {
//用来保存新提取出来的url列表
List<String> urlList = new ArrayList<String>();
while (true) {
//从爬虫队列中取出待抓取的url
if (!redisQueue.toVisitIsEmpty()) {
String url = redisQueue.getToVisit();
/**
* 对此url进行解析,提取出新的url列表解析出来的url顺便就写进urlList中了
* 在这个过程中不要求保证同步,每个线程都负责解析自己所属的url,解析完成
* 之后将url写入自己的urlList之中,当在解析过程中发生阻塞,则切换到其他
* 线程,保证程序的高并发性。
*/
/**
* 在此同步块中主要进行提取出来的URL的写操作,必须是同步操作,保证一个同一时间只有一个线程在对Redis数据库进行写操作。
*/
} else {
//在改变线程计数器的值的时候必须保证线程的同步性
synchronized (lock) {
threads--;
//如果仍然有其他线程在活动,则通知此线程进行等待
if (threads > 0) {
/*调用线程的wait方法会将此线程挂起,直到有其他线程调用notify\notifyAll将此线程进行唤醒*/
wait();
threads++;
} else {
//如果其他的线程都在等待,说明待抓取队列已空,则通知所有线程进行退出
notifyAll();
return;
}
}
}
}
}
public void run() {
try {
parseToVisitUrltoRedis();
} catch (Exception e) {
e.printStackTrace();
}
}
}
在run方法中我们主要实现的就是在从Redis数据库中的URL队列中提取到当前需要抓取的URL并对其进行解析,将新URL再添加到队列中。可以看到,通过引入一个线程计数器,我们解决了上述问题。