想要更有效更快速的抓取网页内容,则必须采用多线程。Heritrix中提供了一个标准的线程池ToePool,它用于管理所有的抓取线程。 ToePool和ToeThread都位于org.archive.crawler.framework包中。前面已经说过ToePool的初始化,是在CrawlController的inITialize()方法中完成的。来看一下ToePool以及ToeThread是如何被初始化的。以下代码是在CrawlController中用于对ToePool进行初始化的。 构造函数 toePool = new ToePool(this); // 按order.xml中的配置,实例化并启动线程 toePool.setSize(order.getMaxToes()); ToePool的构造函数很简单,如下所示 public ToePool(CrawlController c) { super("ToeThreads"); this.controller = c; } 它仅仅是调用了父类java.lang.ThreadGroup的构造函数,同时,将注入的CrawlController赋给类变量。这样,便建立起了一个线程池的实例了。但是,那些真正的工作线程又是如何建立的呢? 下面来看一下线程池中的setSize(int)方法。从名称上看,这个方法很像是一个普通的赋值方法,但实际上,它并不是那么简单。 public void setSize(int newsize) { targetSize = newsize; int difference = newsize - getToeCount();
// 如果发现线程池中的实际线程数量小于应有的数量 // 则启动新的线程 if (difference > 0) { for(int i = 1; i <= difference; i++) { // 启动新线程 startNewThread(); } } // 如果线程池中的线程数量已经达到需要 else {
int retainedToes = targetSize; // 将线程池中的线程管理起来放入数组中 Thread[] toes = this.getToes();
// 循环去除多余的线程 for (int i = 0; i < toes.length ; i++) { if(!(toes[i] instanceof ToeThread)) { continue; } retainedToes--; if (retainedToes>=0) { continue; } ToeThread tt = (ToeThread)toes[i]; tt.retire(); } } }
// 用于取得所有属于当前线程池的线程 private Thread[] getToes() { Thread[] toes = new Thread[activeCount()+10]; // 由于ToePool继承自java.lang.ThreadGroup类 // 因此当调用enumerate(Thread[] toes)方法时, // 实际上是将所有该ThreadGroup中开辟的线程放入 // toes这个数组中,以备后面的管理 this.enumerate(toes); return toes; }
// 开启一个新线程 private synchronized void startNewThread() { ToeThread newThread = new ToeThread(this, nextSerialNumber++); newThread.setPriority(DEFAULT_TOE_PRIORITY); newThread.start(); } 通过上面的代码可以得出这样的结论:线程池本身在创建的时候,并没有任何活动的线程实例,只有当它的setSize方法被调用时,才有可能创建新线程;如果当setSize方法被调用多次而传入不同的参数时,线程池会根据参数里所设定的值的大小,来决定池中所管理线程数量的增减。
当线程被启动后,所执行的是其run()方法中的片段。接下来,看一个ToeThread到底是如何处理从Frontier中获得的链接的。 public void run() { String name = controller.getOrder().getCrawlOrderName(); logger.fine(getName()+" started for order '"+name+"'");
try { while ( true ) { // 检查是否应该继续处理 continueCheck(); setStep(STEP_ABOUT_TO_GET_URI); // 使用Frontier的next方法从Frontier中 // 取出下一个要处理的链接 CrawlURI curi = controller.getFrontier().next(); // 同步当前线程 synchronized(this) { continueCheck(); setCurrentCuri(curi); }
/* * 处理取出的链接 */ processCrawlUri(); setStep(STEP_ABOUT_TO_RETURN_URI); // 检查是否应该继续处理 continueCheck(); // 使用Frontier的finished()方法 // 来对刚才处理的链接做收尾工作 // 比如将分析得到的新的链接加入 // 到等待队列中去 synchronized(this) { controller.getFrontier().finished(currentCuri); setCurrentCuri(null); }
// 后续的处理 setStep(STEP_FINISHING_PROCESS); lastFinishTime = System.currentTimeMillis(); // 释放链接 controller.releaseContinuePermission(); if(shouldRetire) { break; // from while(true) } } } catch (EndedException e) { } catch (Exception e) { logger.log(Level.SEVERE,"Fatal exception in "+getName(),e); } catch (OutOfMemoryError err) { seriousError(err); } finally { controller.releaseContinuePermission(); } setCurrentCuri(null);
// 清理缓存数据 this.httpRecorder.closeRecorders(); this.httpRecorder = null; localProcessors = null;
logger.fine(getName()+" finished for order '"+name+"'"); setStep(STEP_FINISHED); controller.toeEnded(); controller = null; } 在上面的方法中,很清楚的显示了工作线程是如何从Frontier中取得下一个待处理的链接,然后对链接进行处理,并调用Frontier的finished方法来收尾、释放链接,最后清理缓存、终止单步工作等。另外,其中还有一些日志操作,主要是为了记录每次抓取的各种状态。 很显然,以上代码中,最重要的一行语句processCrawlUri(),它是真正调用处理链来对链接进行处理的代码。 |