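I found a workable answer after trying two different solutions.
The criteria were:
- I don't want to collect the whole stream of 'lines' before I can see an 'order'
- I don't want to use a Map
- I rely on the fact that the stream of lines arrives sorted by order (from the database)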
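My first answer was that this is a flatMap problem: you can write a flatMap function that groups all the 'lines' belonging to the same 'order'. BINGO..
However, the flatMap function has a problem: the last 'line' never becomes a group. flatMap calls the function once per incoming element and never tells it that the stream has ended, so the function gets no chance to emit the group it is still building. This can be worked around by using Stream.concat to append an EOS (end-of-stream) line.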
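A minimal sketch of that flatMap idea, assuming the 'lines' are plain strings of the form "order-line" (as in the test further down) and a hypothetical sentinel `EOS` that never occurs in real data. The class and method names are illustrative only. The mapper keeps the group under construction as state and emits it only when the order key changes, so without the appended sentinel the last group would be lost. A stateful function like this is only safe on a sequential stream.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class FlatMapGrouping {

    // Hypothetical end-of-stream marker; assumed never to appear in real data.
    static final String EOS = "EOS-0";

    static String key(String line) {
        return line.substring(0, line.indexOf('-'));
    }

    static Stream<List<String>> group(Stream<String> lines) {
        // Stateful mapper: only safe for sequential streams.
        Function<String, Stream<List<String>>> grouper = new Function<String, Stream<List<String>>>() {
            private List<String> current = new ArrayList<>();

            @Override
            public Stream<List<String>> apply(String line) {
                if (current.isEmpty() || key(current.get(0)).equals(key(line))) {
                    current.add(line); // same order: keep collecting, emit nothing yet
                    return Stream.empty();
                }
                // Key changed: ship the finished group and start a new one.
                List<String> finished = current;
                current = new ArrayList<>();
                current.add(line);
                return Stream.of(finished);
            }
        };
        // Without the appended EOS the last group would never be emitted.
        return Stream.concat(lines, Stream.of(EOS)).flatMap(grouper);
    }

    public static void main(String[] args) {
        List<List<String>> groups = group(Stream.of("100-1", "100-3", "120-1", "133-1"))
                .collect(Collectors.toList());
        System.out.println(groups); // [[100-1, 100-3], [120-1], [133-1]]
    }
}
```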
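But the EOS trick is hard to explain to other developers. The missing last element is better handled by changing how the stream reads the next element: asking whether a next element exists before reading it.
The solution therefore has to change the Stream itself, so that we can handle the last element.
My final solution was to create my own Stream and manage the Iterator myself.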
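Wrapping a hand-written Iterator in a Stream takes two JDK calls, `Spliterators.spliteratorUnknownSize` and `StreamSupport.stream`. A minimal sketch; `IteratorToStream` and `fromIterator` are hypothetical names, and the counting iterator only stands in for a real source.

```java
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class IteratorToStream {

    // Hypothetical helper: wraps any Iterator as a sequential, lazily pulled Stream.
    static <T> Stream<T> fromIterator(Iterator<T> iterator) {
        // The size is unknown up front, so do not claim SIZED.
        Spliterator<T> spliterator =
                Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED | Spliterator.NONNULL);
        return StreamSupport.stream(spliterator, false); // false = sequential
    }

    public static void main(String[] args) {
        // A hand-written iterator producing 1, 2, 3.
        Iterator<Integer> counter = new Iterator<Integer>() {
            private int current = 1;

            @Override
            public boolean hasNext() {
                return current <= 3;
            }

            @Override
            public Integer next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                return current++;
            }
        };
        List<Integer> values = fromIterator(counter).collect(Collectors.toList());
        System.out.println(values); // [1, 2, 3]
    }
}
```

Because the stream pulls elements through hasNext()/next(), the iterator itself sees the end of its source and can react to it, which a flatMap function cannot do.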
The test looks like this:
public void grouping() {
    Stream<String> cartesian = Stream.of("100-1", "100-3", "100-4",
            "120-1", "120-2",
            "133-1"); // ... millions of elements
    Comparator<String> comp = new Comparator<String>() {
        @Override
        public int compare(String o1, String o2) {
            return extractKey(o1).compareTo(extractKey(o2));
        }

        // The group key is the part before '-', e.g. "100" in "100-3"
        private String extractKey(String element) {
            return element.substring(0, element.indexOf('-'));
        }
    };
    final Stream<List<String>> grouped = GroupStream
            .of(cartesian)
            .chunkToStreamOfListByComparator(comp);
    grouped.forEach(row -> System.out.println("Grouped " + row));
}
Output:
Grouped [100-1, 100-3, 100-4]
Grouped [120-1, 120-2]
Grouped [133-1]
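I created a class called GroupStream. It reads the source stream and creates a new stream whose elements are the groups, each collected in a List.
The hard parts were:
- making the iterator's hasNext() work, i.e. return a consistent answer no matter how often it is called
- making the iterator's next() work, i.e. hand over the finished group and start the next one with the element that was already read ahead
- figuring out how to create a new stream.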
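Both methods revolve around one element of look-ahead: to know that a group has ended you must read the first element of the next group, and that element has to be kept until the next call. A minimal sketch of that pattern as a stand-alone peeking iterator; the class `PeekingIterator` and its `peek()` method are hypothetical here, not part of the JDK. Its hasNext() can be called any number of times without consuming input, and next() works even if hasNext() was never called.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class PeekingIterator<T> implements Iterator<T> {

    private final Iterator<T> source;
    private T buffered;          // element read ahead from the source
    private boolean hasBuffered; // true if 'buffered' holds a value (null-safe flag)

    public PeekingIterator(Iterator<T> source) {
        this.source = source;
    }

    // Idempotent: repeated calls never consume an element.
    @Override
    public boolean hasNext() {
        return hasBuffered || source.hasNext();
    }

    // Returns the next element without consuming it.
    public T peek() {
        if (!hasBuffered) {
            buffered = source.next(); // throws NoSuchElementException when exhausted
            hasBuffered = true;
        }
        return buffered;
    }

    @Override
    public T next() {
        T value = peek();
        hasBuffered = false;
        buffered = null;
        return value;
    }

    public static void main(String[] args) {
        PeekingIterator<String> it = new PeekingIterator<>(
                Arrays.asList("100-1", "100-3", "120-1").iterator());
        // Group consecutive elements that share the prefix before '-'.
        List<List<String>> groups = new ArrayList<>();
        while (it.hasNext()) {
            List<String> group = new ArrayList<>();
            String key = it.peek().substring(0, it.peek().indexOf('-'));
            while (it.hasNext() && it.peek().startsWith(key + "-")) {
                group.add(it.next());
            }
            groups.add(group);
        }
        System.out.println(groups); // [[100-1, 100-3], [120-1]]
    }
}
```

In GroupStream the same role is played by the `firstInNext` field.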
The Java class looks like this:
package dk.otc.demo;

import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

/**
 * Turns a sorted stream into a stream of lists, one list per group of
 * consecutive elements that compare as equal.
 *
 * @param <ELEMENT> The instance type in Stream.
 */
public class GroupStream<ELEMENT> {

    private final Iterator<ELEMENT> sourceIterator;

    /**
     * @param source a stream whose elements carry the information used to
     * group them into lists.
     *
     * A precondition is that the stream is sorted by that information, so
     * the elements of one group arrive consecutively.
     */
    public GroupStream(Stream<ELEMENT> source) {
        sourceIterator = source.iterator();
    }

    /**
     * @param comparator defines how to compare ELEMENTs, i.e. when to keep
     * adding a source ELEMENT to the current group and when to ship away the
     * current group and start building a new one.
     * @return a stream with the grouped elements
     */
    public Stream<List<ELEMENT>> chunkToStreamOfListByComparator(Comparator<? super ELEMENT> comparator) {
        final Iterator<List<ELEMENT>> chunkIterator = new Iterator<List<ELEMENT>>() {
            /**
             * Caches the answer so that repeated calls to {@link #hasNext()}
             * return a consistent value until {@link #next()} is called.
             */
            Boolean consistentHasNext = null;
            List<ELEMENT> chunkUnderConstruction = initChunk();
            /** First element of the following group, already read from the source. */
            ELEMENT firstInNext = null;

            @Override
            public boolean hasNext() {
                if (consistentHasNext != null)
                    return consistentHasNext;
                boolean more = sourceIterator.hasNext();
                if (!more && chunkUnderConstruction.isEmpty()) {
                    return false;
                }
                // Read until an element belongs to another group or the source
                // is exhausted; in the latter case the current chunk is the last one.
                boolean same = more;
                while (same && more) {
                    ELEMENT value = sourceIterator.next();
                    same = same(value);
                    if (same) {
                        add(value);
                    } else {
                        firstInNext = value;
                    }
                    more = sourceIterator.hasNext();
                }
                consistentHasNext = !chunkUnderConstruction.isEmpty() || firstInNext != null;
                return consistentHasNext;
            }

            @Override
            public List<ELEMENT> next() {
                // The Iterator contract allows next() without a preceding hasNext()
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                final List<ELEMENT> chunk = chunkUnderConstruction;
                consistentHasNext = null;
                // Start the next group with the element that was read ahead
                chunkUnderConstruction = initChunk();
                if (firstInNext != null) {
                    add(firstInNext);
                    firstInNext = null;
                }
                return chunk;
            }

            private List<ELEMENT> initChunk() {
                return new ArrayList<>();
            }

            private void add(ELEMENT value) {
                chunkUnderConstruction.add(value);
            }

            boolean same(ELEMENT element) {
                // An empty chunk accepts any element; otherwise compare with its first element
                return chunkUnderConstruction.isEmpty()
                        || comparator.compare(chunkUnderConstruction.get(0), element) == 0;
            }
        };
        // The number of groups is unknown in advance. Spliterators.spliterator(iterator, size, ...)
        // would report SIZED with the bogus size -1 (Stream.count() may then return -1),
        // so use spliteratorUnknownSize instead.
        final Spliterator<List<ELEMENT>> split = Spliterators.spliteratorUnknownSize(chunkIterator,
                Spliterator.ORDERED             // groups are emitted in source order
                        | Spliterator.NONNULL   // no null group will arrive
                        | Spliterator.IMMUTABLE); // the source is not mutated while streaming
        return StreamSupport.stream(split, false);
    }

    public static <ELEMENT> GroupStream<ELEMENT> of(Stream<ELEMENT> source) {
        return new GroupStream<>(source);
    }
}
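For comparison, a sketch of the same grouping built on a different JDK hook, `Spliterators.AbstractSpliterator`, instead of an Iterator. This is a swapped-in technique, not the code above: `tryAdvance` both decides whether another group exists and hands it over, so there is no hasNext()/next() pair to keep consistent. The names `ChunkSpliterator` and `chunk` are hypothetical; the sorted-input precondition is the same as for GroupStream.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class ChunkSpliterator {

    // Hypothetical helper: groups consecutive elements that compare as equal.
    // Precondition (as for GroupStream): the source is sorted by the group key.
    static <T> Stream<List<T>> chunk(Stream<T> source, Comparator<? super T> comparator) {
        Iterator<T> it = source.iterator();
        Spliterator<List<T>> split = new Spliterators.AbstractSpliterator<List<T>>(
                Long.MAX_VALUE, Spliterator.ORDERED | Spliterator.NONNULL) {
            private T pending;          // first element of the next group, if any
            private boolean hasPending;

            @Override
            public boolean tryAdvance(Consumer<? super List<T>> action) {
                if (!hasPending) {
                    if (!it.hasNext()) {
                        return false; // source exhausted and nothing buffered
                    }
                    pending = it.next();
                    hasPending = true;
                }
                List<T> chunk = new ArrayList<>();
                chunk.add(pending);
                hasPending = false;
                while (it.hasNext()) {
                    T value = it.next();
                    if (comparator.compare(chunk.get(0), value) != 0) {
                        pending = value; // belongs to the next group
                        hasPending = true;
                        break;
                    }
                    chunk.add(value);
                }
                action.accept(chunk); // the last group is emitted when the source runs dry
                return true;
            }
        };
        return StreamSupport.stream(split, false).onClose(source::close);
    }

    public static void main(String[] args) {
        Comparator<String> byOrder = Comparator.comparing((String s) -> s.substring(0, s.indexOf('-')));
        List<List<String>> groups = chunk(
                Stream.of("100-1", "100-3", "100-4", "120-1", "120-2", "133-1"), byOrder)
                .collect(Collectors.toList());
        System.out.println(groups); // [[100-1, 100-3, 100-4], [120-1, 120-2], [133-1]]
    }
}
```

Passing `Long.MAX_VALUE` as the size estimate is how `AbstractSpliterator` is told the size is unknown, and the `onClose` hook forwards close() to the source stream, which matters when that stream is backed by a database cursor.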