【问题标题】:Using streams to find an object in a list of lists [duplicate]使用流在列表列表中查找对象[重复]
【发布时间】:2015-11-21 02:26:33
【问题描述】:

我正在尝试编写一种方法,该方法在列表列表中查找对象的索引并利用并行性。这是我的代码。

// returns [i, j] where lists.get(i).get(j) equals o, or null if o is not present.
public static int[] indices(List<? extends List<?>> lists, Object o) {
    return IntStream.range(0, lists.size())
                    .boxed()
                    .flatMap(i -> IntStream.range(0, lists.get(i).size()).mapToObj(j -> new int[]{i, j}))
                    .parallel()
                    .filter(a -> {
                        System.out.println(Arrays.toString(a));     // For testing only
                        return Objects.equals(o, lists.get(a[0]).get(a[1]));
                    })
                    .findAny()
                    .orElse(null);
}

当我运行以下代码时

List<List<String>> lists = Arrays.asList(
        Arrays.asList("A", "B", "C"),
        Arrays.asList("D", "E", "F", "G"),
        Arrays.asList("H", "I"),
        Collections.nCopies(5, "J")
);
System.out.println("Indices are " + Arrays.toString(indices(lists, "J")));

输出类似于

[0, 0]
[0, 1]
[0, 2]
[3, 0]
[3, 1]
[3, 2]
[3, 3]
[2, 0]
[3, 4]
[1, 0]
[1, 1]
[2, 1]
[1, 2]
[1, 3]
Indices are [3, 0]

换句话说,即使在找到对象之后,搜索也会继续。 findAny 不应该是短路操作吗?我错过了什么?另外,在迭代列表列表或锯齿状数组时,利用并行性的最佳方法是什么?

编辑

按照@Sotirios 的回答中的想法,我得到了输出

Thread[ForkJoinPool.commonPool-worker-3,5,main] [3, 0]
Thread[main,5,main] [2, 0]
Thread[main,5,main] [2, 1]
Thread[ForkJoinPool.commonPool-worker-1,5,main] [1, 0]
Thread[ForkJoinPool.commonPool-worker-1,5,main] [1, 1]
Thread[ForkJoinPool.commonPool-worker-1,5,main] [1, 2]
Thread[ForkJoinPool.commonPool-worker-1,5,main] [1, 3]
Thread[main,5,main] [0, 0]
Thread[main,5,main] [0, 1]
Thread[ForkJoinPool.commonPool-worker-3,5,main] [3, 1]
Thread[main,5,main] [0, 2]
Thread[ForkJoinPool.commonPool-worker-3,5,main] [3, 2]
Thread[ForkJoinPool.commonPool-worker-3,5,main] [3, 3]
Thread[ForkJoinPool.commonPool-worker-3,5,main] [3, 4]
Indices are [3, 0]

注意

Thread[ForkJoinPool.commonPool-worker-3,5,main]

即使在找到答案后仍继续搜索。

【问题讨论】:

  • 使用 findFirst() 代替。
  • @TaharBakir 它仍在继续搜索。
  • 另外,并行性可能需要一些时间才能让一个线程通知其他线程他们不需要继续运行。
  • @LouisWasserman 当我按照 Sotirios 的回答打印线程时,我发现即使在同一个线程中,在找到对象后搜索也会继续。

标签: java java-8 java-stream


【解决方案1】:

短路操作并不能保证只提取尽可能少的元素来产生结果。他们可以这样做,但这不是必需的。

flatMap 的当前实现是这样的,它总是将子流的全部内容推送到下游。因此,即使您的流不是并行的,您也可以看到流过流的元素比满足findAny 所需的要多。

【讨论】:

  • 看来这个答案是对的,flatMap().filter().findAny() 基本没有短路。不知道为什么会这样实现。
  • “short-cuircuiting”仅仅意味着它可能在检查整个流之前终止。除此之外,它不做任何保证。
【解决方案2】:

至于“为什么以这种方式实施”。问题的根源在于 Stream API 实现。 flatMap 主体通常会创建带有一些中间操作的流(如 .flatMap(list -&gt; list.stream().map(...).filter(...)))。可以在flatMap 实现中使用stream.spliterator() 并多次调用tryAdvance,直到请求取消。但是,当流包含中间操作时,spliterator() 调用会返回一些人为的拆分器(如果没有,它只会返回原始的流拆分器)。这个人工分离器没有非常有效的tryAdvance() 实现,因此与使用整个 flatMapped 流相比,使用这种实现可能被认为是更糟糕的性能缺陷。在许多情况下,您将平面映射到一些短流,因此由于当前的实现,您可能会在此处获得性能提升。

【讨论】:

    【解决方案3】:

    不是继续,而是已经派出各种线程去尝试寻找结果,等这些线程完成后才返回结果。

    换句话说,findAny 终端操作会将“搜索”任务提交给多个线程。这些任务只是应用filterPredicate 并在返回true 时返回。 findAny 大概会等待其中一个返回值。它没有办法真正取消它已经提交的任何东西,而且似乎这个实现会阻塞,直到整个批次返回。它只能停止提交任何未来的批次。

    您可以通过记录当前线程来验证这一点:

    System.out.println(Thread.currentThread() + " " + Arrays.toString(a)); // For testing only
    

    【讨论】:

    • 我半睡半醒,所以这可能是一个愚蠢的问题,但是如果预先为大量工作线程分配了任务,并且整个方法在它们全部完成之前无法返回,那么短-电路甚至是什么意思?
    • @PaulBoddington 我不认为这是全部,我认为这是一个子集。
    • @PaulBoddington 例如,我启动了 5 个线程进行搜索。所有 5 个都可能返回一个结果。但我必须等到所有 5 个才能决定。 (嗯,你真的只需要等待一个,但你不能取消其他的。这个实现似乎想要加入所有这 5 个任务。)
    • @PaulBoddington 我认为这是 flatMap 的结果,正如 Misha 所说。我得跑上我的公共汽车,brb。