【问题标题】:java stream chunk order-line into ordersjava 将大块订单行转换成订单
【发布时间】:2020-06-03 18:43:17
【问题描述】:

我有一个订单流

Stream<String> lines=Stream.of("100-1","100-2","100-3",
                               "120-1",
                               "333-1","333-2"); // .. millions of lines

现在我想将它们分块/映射/分组到一个流中

Stream<List<String>> orders=Stream.of(
               Arrays.asList("100-1","100-2","100-3"),
               Arrays.asList("120-1"),
               Arrays.asList("333-1","333-2")); // .. thousands of orders

现在我可以将流中的每个元素按顺序处理。 我想读取“订单”流中的每个元素作为订单的单位

我如何从行中流式传输 --> 订单?

  • 我不想在看到“订单”之前收集整个“行”流
  • 我不想看地图
  • 我不在乎订单“100”可能会再次出现 100.000 个元素 后来..因为那没有发生

--

  • 我已经阅读/尝试了很多关于“foo”的分组和超级有趣的文章。
  • 我需要把它做好。
  • 在示例中,为了简单起见,我将 OrderLine 显示为字符串“100-2”。它是带有订单号字段、订单行字段等带有 getter 的真实对象。

【问题讨论】:

标签: java java-stream grouping


【解决方案1】:

这样的事情怎么样。每个地图值都是一个列表。关键是破折号左边的值-

Map<String, List<String>> orders = Stream
       .of("100-1", "100-2", "100-3", "120-1", "333-1",
               "333-2")
       .collect(Collectors.groupingBy(order -> order
               .substring(0, order.indexOf("-"))));

orders.entrySet().forEach(System.out::println);

打印

100=[100-1, 100-2, 100-3]
333=[333-1, 333-2]
120=[120-1]

所以你可以这样做。

List<String> order100 = orders.get("100");
System.out.println(order100);

打印

[100-1, 100-2, 100-3]

如果您不想处理地图,而只想列出订单列表,则可以从地图中获取。

List<List<String>> lists = 
         orders.values().stream().collect(Collectors.toList());

【讨论】:

  • 关于生成 MAP 的问题是整个流“line”在流“order”流式传输之前被处理到末尾。这意味着该解决方案占用大量内存 - 而不是流式传输。
  • 如果你要将它们保存在数据结构中,它确实可能会占用大量内存。处理从文件中读取的数据或类似方法可能会有所帮助。同样使用更传统的循环方法并间歇性地将列表写入单独的文件,这些文件被命名以反映其内容可能会起作用,
【解决方案2】:

您可以根据输入创建地图,然后仅收集地图值

import scala.Tuple2;

import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class WordSpliter {

    public static void main(String[] args) {
        Stream<String> lines= Stream.of("100-1","100-2","100-3",
                "120-1",
                "333-1","333-2");

        Stream<List<String>> map = lines.map((String str) ->
                new Tuple2<>(str.substring(0, str.indexOf("-")), str)
        ).collect(Collectors.groupingBy(Tuple2::_1, Collectors.mapping(Tuple2::_2, Collectors.toList())))
                .values().stream();

        map.forEach(System.out::println);


    }
}

【讨论】:

  • 答案有效 - 但有一个问题 - 它会在提供订单之前遍历所有行
  • 这是真的,这是查找唯一前缀所必需的,但映射也将在执行程序上并行运行
【解决方案3】:

我通过使用两种不同的解决方案找到了可行的答案 标准是

  1. 我不想在看到“订单”之前收集整个“行”流
  2. 我不想看地图
  3. 我使用行流按顺序排序的事实(来自数据库)

第一个答案是这是一个FLATMAP问题。可以制作一个 flatmap 函数,将属于同一“订单”的所有“线”分组 - BINGO ..

但是,Flatmap function 有问题 - last 'line' 永远不会变成一个组,因为在 flatmap function 有机会发送最后一个元素之前,流将被关闭。这可以通过使用流 concat 添加 EOS 行来解决。

但是但是对于其他开发者来说很难解释。通过改变流读取下一个元素的方式并询问下一个元素是否存在,可以更好地解决缺少最后一个元素的解决方案。

解决方案必须更改 Stream,这样我们才能处理最后一个元素。

我的最终解决方案是制作自己的 Stream 并管理 Iterator

测试看起来像这样

  public void grouping() {

    Stream<String> cartesian = Stream.of("100-1", "100-3", "100-4",
        "120-1", "120-2",
        "133-1"); // ... millions of elements

    Comparator<String> comp = new Comparator<String>() {
      @Override
      public int compare(String o1, String o2) {
        return extractKey(o1).compareTo(extractKey(o2));
      }

      private String extractKey(String element) {
        String str = element;
        return str.substring(0, str.indexOf('-'));
      }

    };
    final Stream<List<String>> grouped= GroupStream
        .of(cartesian)
        .chunkToStreamOfListByComparator(comp);


    grouped.forEach(row -> System.out.println("Grouped " + row));

  }

输出:

  • 分组 [100-1, 100-3, 100-4]
  • 分组 [120-1, 120-2]
  • 分组 [133-1]

我创建了一个名为 GroupStream 的类。此类创建一个新流(其中元素在列表中分组)。在类中读取源流。

重的部分是:

  • 让迭代器的 hasNext() 工作
  • 使迭代器的 next() 工作
  • 了解如何创建新流。

Java 类如下所示

package dk.otc.demo;

import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

/**
 * @param <ELEMENT> The instance type in Stream.
 */
public class GroupStream<ELEMENT> {

  private final Iterator<ELEMENT> sourceIterator;

  /**
   * @param source a stream of elements that contains identical information to
   *               group the elements into a list with this identical information.
   *
   *               A precondition is that the stream is sorted
   */
  public GroupStream(Stream<ELEMENT> source) {
    sourceIterator = source.iterator();
  }

  /**
   * @param comparator function that defines how to compare ELEMENT in
   *                   order to get knowledge of when to continue adding a source ELEMENT to
   *                   the current group and when to ship away the current group and start
   *                   building an new group.
   * @return a stream with the grouped elements
   */
  public Stream<List<ELEMENT>> chunkToStreamOfListByComparator(Comparator<? super ELEMENT> comparator) {

    final Iterator<List<ELEMENT>> chunkIterator = new Iterator<List<ELEMENT>>() {

      /**
       * Makes the iterator {@link #hasNext()} return a consistent value
       */
      Boolean consistentHasNext = null;
      List<ELEMENT> chunkUnderConstruction = initChunk();
      ELEMENT firstInNext = null;

      @Override
      public boolean hasNext() {
        if (consistentHasNext != null)
          return consistentHasNext;
        boolean more = sourceIterator.hasNext();
        if (!more && chunkUnderConstruction.isEmpty()) {
          return false;
        }
        boolean same = more;
        while (same && more) {
          ELEMENT value = sourceIterator.next();
          same = same(value);
          if (same) {
            add(value);
          } else {
            firstInNext = value;
          }
          more = sourceIterator.hasNext();
        }
        consistentHasNext = (!chunkUnderConstruction.isEmpty()) || firstInNext != null;
        return consistentHasNext;
      }

      @Override
      public List<ELEMENT> next() {
        try {
          consistentHasNext = null;
          return chunkUnderConstruction;
        } finally {
          chunkUnderConstruction = initChunk();
          if (firstInNext != null) {
            add(firstInNext);
            firstInNext = null;
          }
        }
      }

      private List<ELEMENT> initChunk() {
        return new ArrayList<>();
      }

      private void add(ELEMENT value) {
        chunkUnderConstruction.add(value);
      }

      boolean same(ELEMENT element) {
        final boolean res;
        if (chunkUnderConstruction.isEmpty()) {
          res = true;
        } else {
          res = comparator.compare(chunkUnderConstruction.get(0), element) == 0;
        }
        return res;
      }
    };
    final Spliterator<List<ELEMENT>> split = Spliterators.spliterator(chunkIterator, -1,
        (Spliterator.ORDERED      // A collection like List (Not Map). FIFO is ordered
            | Spliterator.NONNULL // No null will arrive
            | Spliterator.IMMUTABLE // Stream is not mutated during execution of stream
        ));
    return StreamSupport.stream(split, false);
  }

  public static <ELEMENT> GroupStream<ELEMENT> of(Stream<ELEMENT> source) {
    return new GroupStream<>(source);
  }
}

【讨论】:

    猜你喜欢
    • 2019-07-13
    • 1970-01-01
    • 1970-01-01
    • 2022-01-04
    • 1970-01-01
    • 1970-01-01
    • 2016-04-17
    • 1970-01-01
    • 2019-05-04
    相关资源
    最近更新 更多