【问题标题】:ExecutorService never stops. When execute new task inside another executing taskExecutorService 永远不会停止。在另一个正在执行的任务中执行新任务时
【发布时间】:2016-08-12 10:13:11
【问题描述】:

美好的一天。

我的网络爬虫项目有拦截器问题。 逻辑很简单。首先创建一个Runnable,它下载html文档,扫描所有链接,然后在所有资助的链接上创建新的Runnable对象。每个新创建的Runnable 依次为每个链接创建新的Runnable 对象并执行它们。

问题是ExecutorService 永远不会停止。

CrawlerTest.java

public class CrawlerTest {

    public static void main(String[] args) throws InterruptedException {
        new CrawlerService().crawlInternetResource("https://jsoup.org/");
    }
}

CrawlerService.java

import java.io.IOException;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.jsoup.Jsoup;
import org.jsoup.nodes.Document;
import org.jsoup.nodes.Element;
import org.jsoup.select.Elements;

public class CrawlerService {

    private Set<String> uniqueUrls = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>(10000));
    private ExecutorService executorService = Executors.newFixedThreadPool(8);
    private String baseDomainUrl;

    public void crawlInternetResource(String baseDomainUrl) throws InterruptedException {
        this.baseDomainUrl = baseDomainUrl;
        System.out.println("Start");
        executorService.execute(new Crawler(baseDomainUrl)); //Run first thread and scan main domain page. This thread produce new threads.
        executorService.awaitTermination(10, TimeUnit.MINUTES);
        System.out.println("End");
    }

    private class Crawler implements Runnable { // Inner class that encapsulates thread and scan for links

        private String urlToCrawl;

        public Crawler(String urlToCrawl) {
            this.urlToCrawl = urlToCrawl;
        }

        public void run() {
            try {
                findAllLinks();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        private void findAllLinks() throws InterruptedException {
            /*Try to add new url in collection, if url is unique adds it to collection, 
             * scan document and start new thread for finded links*/
            if (uniqueUrls.add(urlToCrawl)) { 
                System.out.println(urlToCrawl);

                Document htmlDocument = loadHtmlDocument(urlToCrawl);
                Elements findedLinks = htmlDocument.select("a[href]");

                for (Element link : findedLinks) {
                    String absLink = link.attr("abs:href");
                    if (absLink.contains(baseDomainUrl) && !absLink.contains("#")) { //Check that we are don't go out of domain
                        executorService.execute(new Crawler(absLink)); //Start new thread for each funded link
                    }
                }
            }
        }

        private Document loadHtmlDocument(String internetResourceUrl) {
            Document document = null;
            try {
                document = Jsoup.connect(internetResourceUrl).ignoreHttpErrors(true).ignoreContentType(true)
                        .userAgent("Mozilla/5.0 (Windows NT 6.1; WOW64; rv:48.0) Gecko/20100101 Firefox/48.0")
                        .timeout(10000).get();
            } catch (IOException e) {
                System.out.println("Page load error");
                e.printStackTrace();
            }
            return document;
        }
    }
}

此应用需要大约 20 秒来扫描 jsoup.org 以查找所有唯一链接。但它只需等待 10 分钟executorService.awaitTermination(10, TimeUnit.MINUTES); 然后我看到死的主线程并且仍在工作的执行程序。

Threads

如何强制ExecutorService 正常工作?

我认为问题在于它在另一个任务中而不是在主线程中调用 executorService.execute。

【问题讨论】:

  • 句柄 executorService 在 try catch 并在 finally 块中写入 executorService.shutdown();Reference
  • @Imran 不起作用。它仍然等待 10 分钟,直到主线程死亡。我认为问题在于它在另一个任务而不是主线程中调用 executorService.execute。

标签: java multithreading jsoup executorservice executors


【解决方案1】:

您误用了awaitTermination。根据javadoc,您应该先致电shutdown

在关闭请求后阻塞,直到所有任务完成执行,或发生超时,或当前线程被中断,以先发生者为准。

为了实现您的目标,我建议使用CountDownLatch(或支持this one 之类的增量的闩锁)来确定没有任务剩余的确切时刻,以便您可以安全地执行shutdown

【讨论】:

  • 我无法使用 CountDownLatch,因为我事先不知道我将从资源中收集多少个唯一链接。
  • 如果调用 executorService.shutdown();在 executorService.awaitTermination(10, TimeUnit.MINUTES) 之前;它只等待第一个线程,爬虫只收集第一个链接jsoup.org。我认为问题在于它在另一个任务而不是主线程中调用 executorService.execute。
  • @Redeemer 您可以制作一个支持向上计数的闩锁(如我答案中的链接)。然后您将从计数 1 开始。对于每个找到的链接(尚未处理),您将在爬虫完成处理后将其递增 1 并递减。在crawlInternetResource 中,您将等到锁存计数为零。
  • 闩锁无济于事。如果任务可以创建新任务,那么这可能不会很好。 @Redeemer您需要设置“最大深度”。这是互联网。不确定它的瞬时宽度,但它可能很深。
  • @Fildor well...限制深度你仍然可以使用闩锁:D
【解决方案2】:

我看到了你之前的评论:

我不能使用 CountDownLatch,因为我事先不知道我将从资源中收集多少个唯一链接。

首先,vsminkov 回答了为什么awaitTermniation 会坐等 10 分钟。我将提供一个替代解决方案。

不要使用CountDownLatch,而是使用Phaser。对于每个新任务,您都可以注册并等待完成。

每次调用execute.submit 时创建一个单独的移相器和register,并在每次Runnable 完成时创建arrive

public void crawlInternetResource(String baseDomainUrl) {
    this.baseDomainUrl = baseDomainUrl;

    Phaser phaser = new Phaser();
    executorService.execute(new Crawler(phaser, baseDomainUrl)); 
    int phase = phaser.getPhase();
    phase.awaitAdvance(phase);
}

private class Crawler implements Runnable { 

    private final Phaser phaser;
    private String urlToCrawl;

    public Crawler(Phaser phaser, String urlToCrawl) {
        this.urlToCrawl = urlToCrawl;
        this.phaser = phaser;
        phaser.register(); // register new task
    }

    public void run(){
       ...
       phaser.arrive(); //may want to surround this in try/finally
    }

【讨论】:

    【解决方案3】:

    你不是在叫关机。

    这可能有效 - CrawlerService 中的 AtomicLong 变量。在每个新的子任务提交给执行器服务之前递增。

    修改你的run()方法来减少这个计数器,如果为0,关闭执行器服务

    public void run() {
        try {
            findAllLinks();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            //decrements counter
            //If 0, shutdown executor from here or just notify CrawlerService who would be doing wait().
        }
    }
    

    在“finally”中,减少计数器,当计数器为零时,关闭 executor 或只通知 CrawlerService。 0 表示,这是最后一个,没有其他正在运行,队列中没有待处理。没有任务会提交任何新的子任务。

    【讨论】:

      【解决方案4】:

      如何强制ExecutorService正常工作?

      我认为问题在于它在另一个任务中而不是在主线程中调用 executorService.execute。

      没有。问题不在于 ExecutorService。您以不正确的方式使用 API,因此没有得到正确的结果。

      您必须按一定顺序使用三个 API 才能获得正确的结果。

      1. shutdown
      2. awaitTermination
      3. shutdownNow
      

      来自ExecutorService的oracle文档页面的推荐方式:

       void shutdownAndAwaitTermination(ExecutorService pool) {
         pool.shutdown(); // Disable new tasks from being submitted
         try {
           // Wait a while for existing tasks to terminate
           if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
             pool.shutdownNow(); // Cancel currently executing tasks
             // Wait a while for tasks to respond to being cancelled
             if (!pool.awaitTermination(60, TimeUnit.SECONDS))
                 System.err.println("Pool did not terminate");
           }
         } catch (InterruptedException ie) {
           // (Re-)Cancel if current thread also interrupted
           pool.shutdownNow();
           // Preserve interrupt status
           Thread.currentThread().interrupt();
         }
      

      shutdown(): 启动有序关闭,执行之前提交的任务,但不会接受新任务。

      shutdownNow():尝试停止所有正在执行的任务,停止等待任务的处理,并返回等待执行的任务列表。

      awaitTermination():在关闭请求后阻塞,直到所有任务都完成执行,或者发生超时,或者当前线程被中断,以先发生者为准。

      另外说明:如果您想等待所有任务完成,请参考这个相关的 SE 问题:

      wait until all threads finish their work in java

      我更喜欢使用invokeAll()ForkJoinPool(),它们最适合您的用例。

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多