【问题标题】:Java Spliterator : How to process large Stream splits equally?Java Spliterator:如何平等地处理大型 Stream 拆分?
【发布时间】:2019-11-17 09:46:13
【问题描述】:

我正在使用的代码

package com.skimmer;

import java.util.ArrayList;
import java.util.List;
import java.util.Spliterator;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.LongStream;
import java.util.stream.Stream;

public class App {

  public static void main(String[] args) throws InterruptedException, ExecutionException {

    // Simply creating some 'test' data
    Stream<String> test = LongStream.range(0, 10000000L).mapToObj(i -> i + "-test");

    Spliterator<String> spliterator = test.parallel().spliterator();
    List<Callable<Long>> callableList = new ArrayList<Callable<Long>>();

    // Creating a future for each split to process concurrently
    int totalSplits = 0;
    while ((spliterator = spliterator.trySplit()) != null) {

      totalSplits++;
      callableList.add(new Worker(spliterator, "future-" + totalSplits));
    }

    ExecutorService executor = Executors.newFixedThreadPool(totalSplits);
    List<Future<Long>> futures = executor.invokeAll(callableList);
    AtomicLong counter = new AtomicLong(0);

    for (Future<Long> future : futures)
      counter.getAndAdd(future.get());

    System.out.println("Total processed " + counter.get());
    System.out.println("Total splits " + totalSplits);

    executor.shutdown();
  }

  public static class Worker implements Callable<Long> {

    private Spliterator<String> spliterator;
    private String name;

    public Worker(Spliterator<String> spliterator, String name) {
      this.spliterator = spliterator;
      this.name = name;
    }

    @Override
    public Long call() {

      AtomicLong counter = new AtomicLong(0);
      spliterator.forEachRemaining(s -> {

        // We'll assume busy processing code here
        counter.getAndIncrement();

      });

      System.out.println(name + " Total processed : " + counter.get());

      return counter.get();
    }
  }
}

输出

furture-11 Total processed : 244
furture-10 Total processed : 488
furture-9 Total processed : 977
furture-12 Total processed : 122
furture-7 Total processed : 3906
furture-13 Total processed : 61
furture-8 Total processed : 1953
furture-6 Total processed : 7813
furture-14 Total processed : 31
furture-5 Total processed : 15625
furture-15 Total processed : 15
furture-4 Total processed : 31250
furture-17 Total processed : 4
furture-18 Total processed : 2
furture-19 Total processed : 1
furture-16 Total processed : 8
furture-3 Total processed : 62500
furture-2 Total processed : 125000
furture-1 Total processed : 250000
future-0 Total processed : 500000
Total processed 1000000
Total splits 20

我的问题/疑问: 第一个 trySplit(和未来的任务 'future-0')正好得到 n/2 个元素来开始处理。前几次拆分需要很长时间才能完成——随着 n 的增长,情况会变得更糟。有没有其他方法来处理一个流,其中每个未来/可调用对象都获得相等的元素分布来处理,例如 (N/splits) 即。 1000000/20 = 50000

期望的结果

furture-11 Total processed : 50000
furture-10 Total processed : 50000
furture-9 Total processed : 50000
furture-12 Total processed : 50000
furture-7 Total processed : 50000
furture-13 Total processed : 50000
furture-8 Total processed : 50000
furture-6 Total processed : 50000
furture-14 Total processed : 50000
furture-5 Total processed : 50000
furture-15 Total processed : 50000
furture-4 Total processed : 50000
furture-17 Total processed : 50000
furture-18 Total processed : 50000
furture-19 Total processed : 50000
furture-16 Total processed : 50000
furture-3 Total processed : 50000
furture-2 Total processed : 50000
furture-1 Total processed : 50000
future-0 Total processed : 50000
Total processed 1000000
Total splits 20

后续问题:如果 Spliterator 无法做到这一点,那么最好使用其他方法/解决方案来同时处理大型流。

实际案例场景:处理一个大的 (6GB) CSV 文件,该文件太大而无法保存在内存中

【问题讨论】:

  • 为什么要在 I/O 密集型(在这种情况下浪费了比您可以提供的线程更多的线程)或 CPU 密集型(在这种情况下比您拥有的内核更多的线程)上使用 20 个线程被浪费了)?
  • @chrylis 仅用于演示目的。在上述问题中(与现实世界不同),我知道元素的总数(1000000)。在我上面描述的实际情况下,我不知道这一点(比如来自套接字连接的 CSV 文件)。我可以让代码变得更复杂,以适当地结束线程,但不想让读者对这种复杂性感到困惑,因为那不是我的问题
  • 检查这个问题,它应该有助于管理并行性stackoverflow.com/questions/21163108/…
  • 但所涉及的资源与您的线程配置之间的不匹配可能是您的“问题”。我不认为有任何公平政策适用于执行者的工作。

标签: java java-8 java-stream future spliterator


【解决方案1】:

正在在这里得到完美平衡的分裂。问题是,每次将元素序列分成两半时,由两个 Spliterator 实例表示,您为其中一半创建一个作业,甚至不尝试进一步拆分它,而只是细分另一半。

因此,在第一次拆分之后,您创建了一个包含 500,000 个元素的作业。然后,您在其他 500,000 个元素上调用 trySplit,将其完美地拆分为两个 250,000 个元素的块,创建另一个作业覆盖 250,000 个元素的一个块,然后只尝试细分另一个。等等。是你的代码创造了不平衡的工作。

当您将第一部分更改为

// Simply creating some 'test' data
Stream<String> test = LongStream.range(0, 10000000L).mapToObj(i -> i + "-test");
// Creating a future for each split to process concurrently
List<Callable<Long>> callableList = new ArrayList<>();
int workChunkTarget = 5000;
Deque<Spliterator<String>> spliterators = new ArrayDeque<>();
spliterators.add(test.parallel().spliterator());
int totalSplits = 0;
while(!spliterators.isEmpty()) {
    Spliterator<String> spliterator = spliterators.pop();
    Spliterator<String> prefix;
    while(spliterator.estimateSize() > workChunkTarget
              && (prefix = spliterator.trySplit()) != null) {
        spliterators.push(spliterator);
        spliterator = prefix;
    }
    totalSplits++;
    callableList.add(new Worker(spliterator, "future-" + totalSplits));
}

您可以安静地接近所需的目标工作负载大小(尽可能接近,因为这些数字不是 2 的幂)。

Spliterator 设计与ForkJoinTask 之类的工具配合使用更加顺畅,在每个成功的trySplit 之后都可以提交一个新作业,并且当工作线程不工作时,作业本身将决定同时拆分和生成新作业饱和(就像在参考实现中完成的并行流操作)。

【讨论】:

    猜你喜欢
    • 2013-10-14
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-05-21
    • 2017-09-09
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多