【问题标题】:Adding a stop condition to fork-join recursion为 fork-join 递归添加停止条件
【发布时间】:2014-09-04 12:23:03
【问题描述】:

为了简化我的例子,假设我正在使用 Java 的 Fork-Join 框架实现二进制搜索。我的目标是在整数数组中找到一个特定的整数值(目标整数)。这可以通过将数组分成两半来完成,直到它小到可以执行串行搜索。算法的结果需要是一个布尔值,表示是否在数组中找到目标整数。

在幻灯片 28 之后的Klaus Kreft's presentation 中探讨了类似的问题。然而,Kreft 的目标是找到数组中的最大数,因此必须扫描所有条目。就我而言,没有必要扫描整个数组,因为一旦找到目标整数,就可以停止搜索。

我的问题是,一旦我遇到目标整数,许多任务已经被插入到线程池中,我需要取消它们,因为继续搜索没有意义。我尝试从 RecursiveTask 内部调用 getPool().terminate() 但这并没有太大帮助,因为许多任务已经排队,我什至注意到即使在调用 shutdown 之后新的一次也排队..

我当前的解决方案是使用一个静态的 volatile 布尔值,该布尔值初始化为“false”,并在任务开始时检查其值。如果仍然为“假”,则任务开始工作,如果为“真”,则任务立即返回。我实际上可以为此使用 RecursiveAction。

所以我认为这个解决方案应该可以工作,但我想知道框架是否提供了一些处理此类情况的标准方法 - 即为递归定义一个停止条件,从而取消所有排队的任务。

请注意,如果我想在找到目标整数时立即停止所有正在运行的任务(通过其中一个正在运行的任务),我必须检查这些任务中每一行之后的布尔值,这可能会影响性能,因为它的值boolean 不能被缓存(它被定义为 volatile)。

确实,我认为需要一些标准解决方案,并且可以以清除队列和中断正在运行的任务的形式提供。但是我还没有找到这样的解决方案,我想知道是否有其他人知道它或有更好的想法。

感谢您的宝贵时间, 阿萨夫

编辑:这是我的测试代码:

package xxx;

import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

public class ForkJoinTest {

    static final int ARRAY_SIZE = 1000;
    static final int THRESHOLD = 10;

    static final int MIN_VALUE = 0;
    static final int MAX_VALUE = 100;

    static Random rand = new Random();


    // a function for retrieving a random int in a specific range
    public static int randInt(int min, int max) {
        return rand.nextInt((max - min) + 1) + min;
    }

    static volatile boolean result = false;
    static int[] array = new int[ARRAY_SIZE];
    static int target;

    @SuppressWarnings("serial")
    static class MyAction extends RecursiveAction {

        int startIndex, endIndex;

        public MyAction(int startIndex, int endIndex) {
            this.startIndex = startIndex;
            this.endIndex = endIndex;
        }

        // if the target integer was not found yet: we first check whether 
        // the entries to search are too few. In that case, we perform a 
        // sequential search and update the result if the target was found. 
        // Otherwise, we break the search into two parts and invoke the 
        // search in these two tasks.
        @Override
        protected void compute() {
            if (!result) {
                if (endIndex-startIndex<THRESHOLD) { 
                    // 
                    for (int i=startIndex ; i<endIndex ; i++) {
                        if (array[i]==target) {
                            result = true;
                        }
                    }
                } else {
                    int middleIndex = (startIndex + endIndex) / 2;
                    RecursiveAction action1 = new MyAction(startIndex, middleIndex);
                    RecursiveAction action2 = new MyAction(middleIndex+1, endIndex);
                    invokeAll(Arrays.asList(action1,action2));
                }
            }
        }
    }

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        for (int i=0 ; i<ARRAY_SIZE ; i++) {
            array[i] = randInt(MIN_VALUE, MAX_VALUE);
        }
        target = randInt(MIN_VALUE, MAX_VALUE);
        ForkJoinPool pool = new ForkJoinPool();
        pool.invoke(new MyAction(0,ARRAY_SIZE));
        System.out.println(result);
    }

}

【问题讨论】:

  • 你能发布一些代码吗?您可以使用可以清除的特定队列,也可以中断正在运行的线程,但查看代码更容易为您提供正确的建议。
  • 我维护了一个开源的 fork/join 框架,它提供了一个并行顺序搜索来处理您对“查找优先”的需求。您可以按原样使用它,也可以使用代码作为示例来说明如何操作它自己。 sourceForge 链接为:sourceforge.net/projects/tymeacdse/?source=navbar
  • 谢谢@edharned,我去看看。它是否依赖于 Java 的 fork/join 框架?您是否还使用 volatile boolean / AtomicBoolean 来停止搜索?
  • @Assaf 不,它不使用 Java F/J 框架。它正确地执行 F/J。顺序并行搜索是 17 个内置函数之一。一旦找到“首先找到”,它就会使用 volatile 布尔值来停止其他线程。还有find-any、find-last、find-all。

标签: java recursion fork-join


【解决方案1】:

我认为您可能正在为正确的解决方案设置障碍。

您说您的boolean stop 标志必须是volatile,因此会影响解决方案的速度——嗯,是的,也不是——访问volatile 确实会刷新缓存,但您是否考虑过@987654325 @?

我相信正确的解决方案是使用AtomicBoolean 标志来停止所有进程。您应该检查是否合理,以使您的系统快速停止。

尝试清除所有队列并中断所有线程是错误的 - 这会导致可怕的混乱。

    static AtomicBoolean finished = new AtomicBoolean();
    ....

        protected void compute() {
            if (!finished.get()) {
                if (endIndex - startIndex < THRESHOLD) {
                    //
                    for (int i = startIndex; i < endIndex && !finished.get(); i++) {
                        if (array[i] == target) {
                            finished.set(true);
                            System.out.print("Found at " + i);
                        }
                    }
                } else {
                    ...
                }
            }
        }

【讨论】:

  • 谢谢,所以您建议切换到 AtomicBoolean 并在循环中添加对其值的检查。我应该也使用 volatile 将这种类型的检查添加到我的原始代码中。但是你能解释一下为什么在这种情况下 AtomicBoolean 比 volatile 更可取吗?在性能方面,我认为它们几乎相同,因为两者都是无锁且没有缓存的。
  • @Assaf - 几乎没有区别 - 访问 volatile 会刷新所有缓存,而访问 AtomicBoolean 应该 更少干扰 - 它并不总是更好,但它不会更糟。在您的场景中,Volatile boolean vs AtomicBoolean 几乎没有什么区别 - 我的观点是您应该采取清除队列和中断线程的另一条路线。
【解决方案2】:

我通过查看在许多内置功能中执行此操作的开源产品,在上面留下了关于如何执行此操作的评论。让我在这里说一些细节。

如果您想取消正在开始或正在执行的任务,则每个任务都需要了解其他所有任务。当一个任务找到它想要的东西时,该任务需要通知其他所有任务停止。你不能用二元递归除法(RecursiveTask 等)来做到这一点,因为你递归地创建新任务,而旧任务永远不会知道新任务。我敢肯定,您可以为每个新任务传递对停止我的字段的引用,但它会变得非常混乱,并且调试会“很有趣”。

您可以使用 Java8 CountedCompleter() 来完成此操作。为了支持这个类,框架被砍掉了,所以应该由框架完成的事情需要手动完成,但它可以工作。

每个任务都需要一个可变布尔值和一个将其设置为 true 的方法。每个任务都需要一个对所有其他任务的引用数组。预先创建所有任务,每个任务都有一个空数组,其中包含对其他任务的引用。填写对所有其他任务的引用数组。现在提交每个任务(请参阅此类的文档,fork() addPendingCount() 等)

当一个任务找到它想要的东西时,它使用对其他任务的引用数组将它们的布尔值设置为 true。如果存在多个线程的竞争条件,则无关紧要,因为所有线程都设置为“真”。您还需要处理 tryComplete()、onCompletion() 等。这个类非常混乱。它用于 Java8 流处理,这本身就是一个故事。

你不能做的是在它们开始之前从双端队列中清除待处理的任务。您需要等到任务开始并检查布尔值是否为真。如果执行时间很长,那么您可能还需要定期检查布尔值是否为真。 volatile 读取的开销并没有那么糟糕,而且确实没有其他方法。

【讨论】:

  • 再次感谢您的参考和详细的解释。我注意到 Java 8 有一些新的发展,但很难理解什么时候更喜欢什么。需要做更多的阅读。在您的描述中我不明白的一件事是为什么使用一组标志(这表明所有任务必须提前创建)而不是单个全局标志(如上面@OldCurmudgeon 所建议的 volatile 或 AtomicBoolean )。数组和单标志解决方案都将使即将开始的任务无效,如果选中标志,两者都可以在中间“停止”任务。
  • @Assaf 执行此操作的确切方式取决于您。如果您在一个对象中有一个 volatile 布尔值,那么您需要一个指向该引用的指针:pointer.isTrue();这比只检查你自己的局部变量开销更大: if (stop-me) ... 当你只检查任务的开始时,谁在乎。但是当你定期检查时,开销就很重要了。
  • docs.oracle.com/javase/8/docs/api/java/util/concurrent/… 的搜索示例显示了创建 AtomicReference/Atomic,然后将此原子传递给子任务的示例。如果结果不为空,子任务必须调用 atomic.get() 并且不执行任何操作。我不明白的是为什么你需要 CountedCompleter,因为 RecursiveAction 会做同样的事情。
  • @snaran 这个问题是三年前回答的。此后,Doug 在 CC 类中添加了许多示例。您提到的示例非常复杂,通常不是人们想要的。我不知道 RecursiveAction 是如何轻松做到这一点的,因为所有任务都没有单一的焦点。关键工作是“轻松”,任何人都可以制作一个杂乱无章的程序(就像 Doug 对 CC 示例所做的那样。)但是,如果您有示例,请向我们展示。
猜你喜欢
  • 1970-01-01
  • 2013-10-28
  • 1970-01-01
  • 2016-07-24
  • 2015-08-30
  • 1970-01-01
  • 1970-01-01
  • 2017-02-14
  • 1970-01-01
相关资源
最近更新 更多