【问题标题】:Each Java Future Controls a Core for Its Lifetime每个 Java 未来都在其生命周期内控制一个核心
【发布时间】:2026-01-06 15:40:01
【问题描述】:

我正在使用 web3j 库为以太坊区块链开发 Java 负载测试应用程序。此库中使用的特定函数异步发送事务,并返回 Future 对象。该库提供了许多功能,这些功能最初满足了我对应用程序开发的所有需求。但是,在设计和重新设计负载测试应用程序之后,我注意到了一个问题。我一次只能部署 N 个 Future,其中 N 是我机器上可用内核的数量。

这是非常低效的,因为每笔交易至少需要 10 秒才能集成到区块链中,并且 Futures 会在后台持续运行。如果将它们转换为 RxJava 可观察对象(使用 RxJava 的 from() 函数),它们是否仍会在后台持续运行以检查其数据是否可用,或者我是否能够一次覆盖许多检查。例如,如果检查耗时 0.1 秒,并且我需要每秒检查一次未来,那么我可以在一个核心上运行 10 次检查而不是 1 次。如果将 Future 转换为 Observable 仍会表现出持续核心使用行为,有没有不涉及重构大量 web3j 内部代码的不同方法?

罪魁祸首可能是这个函数,它包含在Async.java 文件中。此函数由内部 web3j 函数调用以发送事务。

public static <T> CompletableFuture<T> run(Callable<T> callable) {
    CompletableFuture<T> result = new CompletableFuture<>();
    CompletableFuture.runAsync(() -> {
        // we need to explicityly catch any exceptions,
        // otherwise they will be silently discarded
        try {
            result.complete(callable.call());
        } catch (Throwable e) {
            result.completeExceptionally(e);
        }
    });
    return result;
}

【问题讨论】:

  • "我一次只能部署 N 个 Future,其中 N 是我机器上可用内核的数量。" - 你到底是什么意思?当您尝试“部署”第 N+1 个未来时,究竟会发生什么?我认为有一个误解需要澄清。
  • 当我部署第 N+1 个 Future 时,它​​不会执行,直到当前正在执行的 Future 之一完成。
  • 我们需要查看Minimal, Complete, and Verifiable example 的代码来证明这一点,然后才能提供更多帮助。我的直觉说这与您的 ExecutorService 中的线程数有关,但没有代码我们无法给您正确的建议。
  • 这是web3j函数在发送交易时调用的,可能是罪魁祸首: public static CompletableFuture run(Callable callable) { CompletableFuture result = new CompletableFuture (); CompletableFuture.runAsync(() -> { // 我们需要明确地捕获任何异常, // 否则它们将被静默丢弃 try { result.complete(callable.call()); } catch (Throwable e) { result.completeExceptionally (e);}});返回结果; }
  • 请不要将代码放在评论中,因为我看不懂。请改为编辑您的问题以包含它。

标签: java concurrency rx-java future


【解决方案1】:

首先,如果你希望你的任务返回一个结果,你应该使用supplyAsync而不是runAsync

但是对于您问题的更实质性部分,当您调用 runAsyncsupplyAsync 并且只运行要运行的任务时,它将在 JVM 范围内运行 ForkJoinPool,正如您所猜测的那样许多线程可供它使用,就像您机器上的内核一样。但是,您也可以提供您选择的 ExecutorService,它的线程数可能多于内核数。

ExecutorService executor = Executors.newFixedThreadPool(NUM_DESIRED_THREADS);

CompletableFuture<T> result = CompletableFuture.supplyAsync(callable, executor);

以上代码将允许多达NUM_DESIRED_THREADS 个任务同时运行。

【讨论】:

    最近更新 更多