【问题标题】:Is there a way to put a CompletableFuture in a loop?有没有办法将 CompletableFuture 放入循环中?
【发布时间】:2021-02-04 18:05:59
【问题描述】:

下面代码的问题是我必须等待所有三个任务都完成。

如果第一个和第二个任务在 200 毫秒内完成,而第三个任务在 2 秒内完成,那么我将不得不等待 2 秒才能加载接下来的三个 URL。

理想情况下,我会在每个任务完成后立即发送一个新请求,并以某种方式延迟主线程,直到 ArrayList 为空。

简单来说,我希望每个可完成的未来都在一种由旧任务完成触发的循环中运行。

(我经常在 JavaScript 中使用事件这样做)

谁能想到我如何做到这一点?

    private static void httpClientExample(){

    ArrayList<String> urls = new ArrayList<>(
            Arrays.asList(
                    "https://www.bing.com/",
                    "https://openjdk.java.net/",
                    "https://openjdk.java.net/",
                    "https://google.com/",
                    "https://github.com/",
                    "https://stackoverflow.com/"
            ));

    HttpClient httpClient = HttpClient.newHttpClient();

    var task1 = httpClient.sendAsync(HttpRequest.newBuilder()
            .uri(URI.create(urls.get(0)))
            .build(), HttpResponse.BodyHandlers.ofString())
            .thenApply(HttpResponse::uri).thenAccept(System.out::println);

    var task2 = httpClient.sendAsync(HttpRequest.newBuilder()
            .uri(URI.create(urls.get(1)))
            .build(), HttpResponse.BodyHandlers.ofString())
            .thenApply(HttpResponse::uri).thenAccept(System.out::println);

    var task3 = httpClient.sendAsync(HttpRequest.newBuilder()
            .uri(URI.create(urls.get(2)))
            .build(), HttpResponse.BodyHandlers.ofString())
            .thenApply(HttpResponse::uri).thenAccept(System.out::println);

    // All tasks have to complete
    var all = CompletableFuture.allOf(task1, task2, task3).join();
    
    // Get the next 3 URLs

    System.out.println("Main Thread Completed");
}

【问题讨论】:

  • 您是否有任何理由要等待所有 3 个请求完成后再继续?如果唯一的要求是最多发出 3 个并行请求,您可以使用最多 3 个线程的 ExecutorService
  • @dpr 完全没有理由。我只是想确保在主线程退出之前完成所有三个任务。
  • 您是指所有 6 个(URL 数量)任务?
  • 我更新了我的答案。没有“最多 3 个并行调用”的要求,它变得更简单了。

标签: java java-11 java-http-client


【解决方案1】:

让作业本身删除另一个待处理的 URL 并提交它,需要一个线程安全队列。

让主线程来做可能更容易,例如喜欢

var httpClient = HttpClient.newHttpClient();
var pending = new ArrayDeque<CompletableFuture<?>>(3);
for(String url: urls) {
    while(pending.size() >= 3 && !pending.removeIf(CompletableFuture::isDone))
        CompletableFuture.anyOf(pending.toArray(CompletableFuture<?>[]::new)).join();

    pending.addLast(httpClient.sendAsync(HttpRequest.newBuilder()
            .uri(URI.create(url))
            .build(), HttpResponse.BodyHandlers.ofString())
            .thenApply(HttpResponse::uri).thenAccept(System.out::println));
}
CompletableFuture.allOf(pending.toArray(CompletableFuture<?>[]::new)).join();

这将等到三个提交的作业中的至少一个完成(使用anyOf/join),然后再提交下一个。当循环结束时,可能有多达三个仍在运行的作业。循环之后的后续allOf/join会等待那些作业的完成,所以之后所有的作业都已经完成了。当您希望启动器线程在已知所有作业已提交时继续执行,而无需等待它们完成,只需删除最后一条语句即可。

【讨论】:

  • 非常感谢您提供的出色解决方案。我已经对其进行了测试,并且效果很好。我喜欢您的解决方案如何删减所有已完成的任务。尽管 ArrayDeque 可能会在任务完成(http 请求返回)之前填满并溢出,但这似乎是可能的。我正在尝试构建一个可以处理超过 1,000,000 个网址的爬虫。您能想出一个简单的方法来修改您的解决方案,以便将数据分块,以便我可以在下一个块开始之前将其存储在某个地方。一次说 100 个网址?
  • 队列不会溢出,因为循环会等待至少一个作业完成,然后再放入另一个作业。这就是anyOf/joinwhile(pending.size() &gt;= 3 … 循环中所做的事情。如果您想一次允许更多,您只需要调整数字。 new ArrayDeque&lt;CompletableFuture&lt;?&gt;&gt;(3) 中的另一个3 只是一个优化,将初始容量与使用情况相匹配。不过,对于生产代码,值得将两个数字融合到一个命名常量(或参数)中。
  • 对不起,我错过了。一个非常优雅的解决方案。非常感谢您的帮助。亲切的问候。
【解决方案2】:

如果您对最大并行调用数量没有要求,事情会变得容易得多:

private static void httpClientExample() throws Exception {

  final ArrayList<String> urls = ...; //list of urls 

  final HttpClient httpClient = HttpClient.newBuilder().executor(
                                    Executors.newFixedThreadPool(10)).build();

  final List<CompletableFuture<Void>> allFutures = new ArrayList<>();
  for (String url : urls) {
    final CompletableFuture<Void> completableFuture = httpClient
        .sendAsync(HttpRequest.newBuilder().uri(URI.create(url)).build(),
            HttpResponse.BodyHandlers.ofString())
        .thenApply(HttpResponse::uri).thenAccept(System.out::println);
    allFutures.add(completableFuture);
  }

  CompletableFuture.allOf(allFutures.toArray(CompletableFuture[]::new)).get();
}

【讨论】:

  • 当您使用executorService = Executors.newFixedThreadPool(3); 时,使用executorService.invokeAll(…) 可能更容易,因为Callable 接口允许检查异常。
  • 是的,在 Java 中 lambda 的日常使用中,检查异常的处理是一件可悲的事情。使用ExecutorService.submitinvokeAll 我只会得到List&lt;Future&gt;,这些组合起来非常笨重......
  • 是的,但是在这种特定情况下,您不需要将它们组合起来,因为没有使用结果。 OP 只使用期货等待完成,但这已经由invokeAll 自己完成。
  • 确实,我认为invokeAll 只会提交可调用列表,而不是等待它们完成。
  • @AdrianSmith 哇,从 6 到 1,000,000!不,如果您认为 url 的数量可能会变得非常大(> 1k),我认为将所有期货放入一个数据结构中并不是一个好主意。我什至不会把所有的网址放在一个数组中......
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2023-01-26
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多