【问题标题】:How do I create a parallel spliterator from multiple iterators?如何从多个迭代器创建并行拆分器?
【发布时间】:2020-05-28 18:10:36
【问题描述】:

我有一个游标支持的迭代器列表,这些迭代器由执行多个数据库查询的方法返回。

由于每个由游标支持的迭代器在数据库上都是这样分区的,所以我也想在结果流上利用并行流操作。为此,我想创建一个拆分器。

List<Iterator<Article>> articleIterators = executeQuery();

我将如何创建拆分器?假设每个迭代器中的项目都是有序的。我的想法可能存在根本性的缺陷。


更多背景知识:我运行多个 Cassandra 查询,因此每个查询都会访问不同的分区。每个查询都返回自己的惰性分页迭代器。

【问题讨论】:

  • 如果我可以尝试(过度)简化问题,考虑到并行处理这些数据集的好处已经确立,我会做一些简单而可靠的事情来轻松实现这一点(就像一个大小正确的执行器服务为每个迭代器执行一个流任务)。 IMO,试图通过单个并行流管道运行它需要一些工程才能提出一个拆分迭代器,该迭代器将完全按照需要以一种确定的方式对集合进行分区。
  • @ernest_k 实际上,组合流(concatflatMap)倾向于在其组件处精确拆分,这些组件就是这些迭代器。更大的问题是并行流会根据本地CPU核数来配置,而不是数据库分区数。
  • @Holger 太好了,这为 OP 的问题提供了一个简单的答案(除非这取决于实施 - tl;dr)
  • @ernest_k 我更喜欢你讲述适合这项工作的工具的方法。

标签: java java-8 iterator java-stream


【解决方案1】:

你可以试试abacus-util中提供的Stream

List<Stream<Integer>> iters = IntStream.range(1, 32).mapToObj(it -> Stream.repeat(1, it).peek(Fn.sleep(3))).toList();

long startTime = System.currentTimeMillis();
Stream.concat(iters).sumInt(Integer::intValue);
System.out.println("Took: " + (System.currentTimeMillis() - startTime)); // print out: Took: 1535

iters = IntStream.range(1, 32).mapToObj(it -> Stream.repeat(1, it).peek(Fn.sleep(3))).toList();

startTime = System.currentTimeMillis();
Stream.parallelConcat(iters).sumInt(Integer::intValue);
System.out.println("Took: " + (System.currentTimeMillis() - startTime)); // print out: Took: 251

免责声明:我是abacus-util的开发者。

【讨论】:

    猜你喜欢
    • 2020-03-27
    • 1970-01-01
    • 2016-07-24
    • 2016-07-13
    • 2018-05-20
    • 1970-01-01
    • 1970-01-01
    • 2014-04-20
    • 2015-09-13
    相关资源
    最近更新 更多