【问题标题】:Best way to support lazy/late parallel in customised stream method in Java 8在 Java 8 的自定义流方法中支持延迟/延迟并行的最佳方法
【发布时间】:2017-10-13 13:28:59
【问题描述】:

Java 实现使用 ReferencePipeline 类,它通过设计支持惰性/延迟 .parallel()。这意味着这些每组代码行完全相同:

// sequential identical lines:
stream.map(mapper).distict().filter(filter).sequential()...
stream.sequential().map(mapper).distict().filter(filter)...

// parallel identical lines:
stream.map(mapper).distict().filter(filter).parallel()...
stream.parallel().map(mapper).distict().filter(filter)...

假设我想使用以下签名构建一个新的自定义流方法:
public <T> static Stream<T> myMethod(Stream<T> stream)

并且作为要求,该方法应该存在相同的 .parallel() 惰性/延迟行为。这意味着每组行应该具有完全相同的行为:

// sequential identical lines:
myMethod(stream).sequential()...
myMethod(stream.sequential())...

// parallel identical lines:
myMethod(stream).parallel()...
myMethod(stream.parallel())...

我该怎么做?一个简单的例子会很有用。

public <T> static Stream<T> myMethod(Stream<T> stream) {
    // Any implementation that changes the stream
    // For simplification lets assume that this extensions
    // switches the odd with even positions, or anything else
    // that it's simpler and easy to demonstrate

    // This prevents parallel to be lazy/late!
    Spliterator<T> spliterator = stream.spliterator();
    return StreamSupport.stream(new Spliterator<T>() {
        // My easy implementation
    }, stream.isParallel());
}

请注意,.spliterator()StreamSupport.stream() 的使用会影响此处讨论的并行处理:Understanding sequential vs parallel stream spliterators in Java 8 and Java 9

更新:使用 assertj 的 junit5 测试:

@ParameterizedTest(name="[{index}] {0}/{1}/{2} = {3} --> {4}")
@CsvSource({
        "-,-,-,1!,Sequential: default behaviour",
        "P,-,-,2+,Parallel: set in stage1",
        "-,P,-,2+,Parallel: set in stage2",
        "-,-,P,2+,Parallel: set in stage3",
        "P,S,-,1!,Sequential: set in stage2",
        "P,-,S,1!,Sequential: set in stage3",
        "P,S,P,2+,Parallel: set last in stage3",
        "S,P,S,1!,Sequential: set last in stage3",
})
void myMethodTest(String stage1f, String stage2f, String stage3f, String expThreads, String name) throws Exception {
    Set<String> set1 = new ConcurrentSkipListSet<>();
    Set<String> set2 = new ConcurrentSkipListSet<>();
    Set<String> set3 = new ConcurrentSkipListSet<>();
    int parallelism = 4;
    int minExpected = expThreads.equals("1!") ? 1 : 2;
    int maxExpected = expThreads.equals("1!") ? 1 : parallelism;

    BiFunction<String, Stream<Long>, Stream<Long>> mode = (flag, stream) -> {
        switch (flag) {
            case "P": return stream.parallel();
            case "S": return stream.sequential();
            default: return stream;
        }
    };

    Stream<Long> stage1 = mode.apply(stage1f, LongStream.range(0, 1000_000).boxed())
            .peek(x -> set1.add(Thread.currentThread().getName()));

    Stream<Long> stage2 = mode.apply(stage2f, myMethod(stage1).map(x -> 2*x))
            .peek(x -> set2.add(Thread.currentThread().getName()));

    Stream<Long> stage3 = mode.apply(stage3f, myMethod(stage2).map(x -> 2*x))
            .peek(x -> set3.add(Thread.currentThread().getName()));

    new ForkJoinPool(parallelism).submit(() -> {
        List<Long> list = stage3.collect(Collectors.toList());
        System.out.print("list:" + list.size() + "  threads:" + parallelism + "  flags:" + stage1f + "/" + stage2f + "/" + stage3f + "  ");
    }).get();

    System.out.print("stage1:" + set1.size() + "/" + maxExpected + "  ");
    System.out.print("stage2:" + set2.size() + "/" + maxExpected + "  ");
    System.out.println("stage3:" + set3.size() + "/" + maxExpected + "  ");
    assertThat(set1.size()).isBetween(minExpected, maxExpected);
    assertThat(set2.size()).isBetween(minExpected, maxExpected);
    assertThat(set3.size()).isBetween(minExpected, maxExpected);
}

【问题讨论】:

  • 你想懒惰地并行评估(自定义)流吗?
  • 不知道lazily这个表达式是否是最好用的,但我想要的只是具有与原生流相同的行为,您可以随时添加.parallel(),一切都会并行完成,而不仅仅是一个部分。 StreamSupport.stream() 的用法似乎打破了它。
  • 您想要在接口 java.util.stream.BaseStream 中声明的方法 .parallel() 的功能 - 用于您的自定义流?
  • 也许您可以将源流包装在自定义抽象拆分器中,如果 trySplit 被调用或在 forEachRemaining 被调用时将源转换为并行流。
  • 谢谢@the8472。有趣的建议。不过.characteristics()总是先调用,根据特性,.estimatedSize()也可以第二调用。这意味着源拆分器需要在到达trySplit()forEachRemaining() 甚至.tryAdvance() 之前从包装源流中实现。

标签: java java-8 java-stream spliterator


【解决方案1】:

这很棘手,以下解决方案可能不适用于所有情况:

public static <T> Stream<T> myMethod(Stream<T> stream) {
    // for simplification, this does not alter anything from the source stream
    class CustomSpliterator implements Spliterator<T> {
        Spliterator<T> source;
        CustomSpliterator(Spliterator<T> src) {
            source = src;
        }
        @Override public boolean tryAdvance(Consumer<? super T> action) {
            return source.tryAdvance(action);
        }
        @Override public void forEachRemaining(Consumer<? super T> action) {
            source.forEachRemaining(action);
        }
        @Override public Spliterator<T> trySplit() {
            Spliterator<T> sSp = source.trySplit();
            return sSp == null? null: new CustomSpliterator(sSp);
        }
        @Override public long estimateSize() {
            return source.estimateSize();
        }
        @Override public int characteristics() {
            return source.characteristics();
        }
    }
    class MySpSupplier implements Supplier<Spliterator<T>> {
        Stream<T> downStream;
        @Override public Spliterator<T> get() {
            System.out.println("MySpSupplier.get(): nailing down behavior");
            Stream<T> src=downStream.isParallel()? stream.parallel(): stream.sequential();
            return new CustomSpliterator(src.spliterator());
        }
    }
    MySpSupplier sup = new MySpSupplier();
    Stream<T> s = StreamSupport.stream(sup, Spliterator.ORDERED, stream.isParallel());
    sup.downStream = s;
    return s;
}

但它实现了目标,例如

Stream<String> s = Stream.of("foo", "bar", "baz");
System.out.println("started with "+(s.isParallel()? "parallel": "sequential")+" stream");
s = s.peek(x -> System.out.println("upstream "+x+": "+Thread.currentThread()));
s = myMethod(s.map(String::toUpperCase));
System.out.println("chaining another ops");
s = s.peek(x -> System.out.println("downstream "+x+": "+Thread.currentThread()));
s = s.filter(str -> str.startsWith("B"));
System.out.println("turning to parallel");
s = s.parallel();
System.out.println("commencing terminal operation");
List<String> l = s.collect(Collectors.toList());
System.out.println("result: "+l);
started with sequential stream
chaining another ops
turning to parallel
commencing terminal operation
MySpSupplier.get(): nailing down behavior
upstream bar: Thread[main,5,main]
upstream foo: Thread[ForkJoinPool.commonPool-worker-1,5,main]
downstream BAR: Thread[main,5,main]
upstream baz: Thread[ForkJoinPool.commonPool-worker-2,5,main]
downstream FOO: Thread[ForkJoinPool.commonPool-worker-1,5,main]
downstream BAZ: Thread[ForkJoinPool.commonPool-worker-2,5,main]
result: [BAR, BAZ]

Stream<String> s = Stream.of("foo", "bar", "baz").parallel();
System.out.println("started with "+(s.isParallel()? "parallel": "sequential")+" stream");
s = s.peek(x -> System.out.println("upstream "+x+": "+Thread.currentThread()));
s = myMethod(s.map(String::toUpperCase));
System.out.println("chaining another ops");
s = s.peek(x -> System.out.println("downstream "+x+": "+Thread.currentThread()));
s = s.filter(str -> str.startsWith("B"));
System.out.println("turning to sequential");
s = s.sequential();
System.out.println("commencing terminal operation");
List<String> l = s.collect(Collectors.toList());
System.out.println("result: "+l);
started with parallel stream
chaining another ops
turning to sequential
commencing terminal operation
MySpSupplier.get(): nailing down behavior
upstream foo: Thread[main,5,main]
downstream FOO: Thread[main,5,main]
upstream bar: Thread[main,5,main]
downstream BAR: Thread[main,5,main]
upstream baz: Thread[main,5,main]
downstream BAZ: Thread[main,5,main]
result: [BAR, BAZ]

【讨论】:

  • 非常有趣的解决方案@Holger。 CustomSpliterator 是什么意思?我认为是无用的,它可以被删除。天才的部分是当您将downStream 设置为StreamSupport(supplier, characteristics, isParallel) 的结果时。让我做一些额外的测试。
  • 好吧,你在代码注释“anything else that it's simpler and easy to demonstrate”中说,我使用了CustomSpliterator,它什么都不做,但可以用作任何更改的起点。请注意,我在它之前放置了评论“for simplification, this does not alter anything from the source stream”,这应该是为了澄清这一点。
  • 当然,虚拟分离器让我感到困惑(我的错)。我添加了一个单元测试来证明您的解决方案有效。我还注意到最后一个拆分器总是第一个被加载,因此传播做得很好。干得好@Holger!
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2012-04-16
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多