【问题标题】:Apache Flink Streaming window WordCountApache Flink 流式处理窗口 WordCount
【发布时间】:2015-10-30 23:21:36
【问题描述】:

我有以下代码来计算来自 socketTextStream 的单词。需要累积字数和时间窗口字数。该程序存在一个问题,即 cumulateCounts 始终与窗口计数相同。为什么会出现这个问题?根据窗口计数计算累积计数的正确方法是什么?

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
final HashMap<String, Integer> cumulateCounts = new HashMap<String, Integer>();

final DataStream<Tuple2<String, Integer>> counts = env
            .socketTextStream("localhost", 9999)
            .flatMap(new Splitter())
            .window(Time.of(5, TimeUnit.SECONDS))
            .groupBy(0).sum(1)
            .flatten();

counts.print();

counts.addSink(new SinkFunction<Tuple2<String, Integer>>() {
    @Override
    public void invoke(Tuple2<String, Integer> value) throws Exception {
        String word = value.f0;
        Integer delta_count = value.f1;
        Integer count = cumulateCounts.get(word);
        if (count == null)
            count = 0;
        count = count + delta_count;
        cumulateCounts.put(word, count);
        System.out.println("(" + word + "," + count.toString() + ")");
    }
});

【问题讨论】:

    标签: java apache-flink flink-streaming


    【解决方案1】:

    您应该首先分组,并将窗口应用于键控数据流(您的代码适用于 Flink 0.9.1,但 Flink 0.10.0 中的新 API 对此有严格要求):

    final DataStream<Tuple2<String, Integer>> counts = env
            .socketTextStream("localhost", 9999)
            .flatMap(new Splitter())
            .groupBy(0)
            .window(Time.of(5, TimeUnit.SECONDS)).sum(1)
            .flatten();
    

    如果在非键控数据流上应用窗口,那么在单台机器上将只有一个单线程窗口操作符(即没有并行性)来在整个流上构建窗口(在 Flink 0.9.1 中,这个全局窗口可以通过groupBy() 拆分为子窗口——但是,在 Flink 0.10.0 中这将不再起作用)。要计算单词,您需要为每个不同的键值构建一个窗口,即,您首先获取每个键值的子流(通过groupBy())并在每个子流上应用一个窗口运算符(因此,您可以有一个每个子流都有自己的窗口运算符实例,允许并行执行)。

    对于全局(累积)计数,您可以简单地应用 groupBy().sum() 构造。首先,流被分成子流(每个键值一个)。其次,您计算流的总和。因为流是加窗的,计算(累积)和更新每个传入元组的总和(更详细地说,总和的初始结果值为零,并且每个元组的结果更新为result += tuple.value)。每次调用 sum 后,都会发出新的当前结果。

    在您的代码中,您不应该使用您的特殊接收器函数,而是执行以下操作:

    counts.groupBy(0).sum(1).print();
    

    【讨论】:

    • 感谢您的回复。您能否提供更多解释为什么我的解决方案失败了?这些解决方案在flink集群中的数据流有什么区别?
    • 一个问题:如何在数据流中只得到求和运算的最终结果而不“滚动”求和?
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2017-02-01
    • 1970-01-01
    • 2018-03-08
    • 1970-01-01
    • 2018-12-13
    相关资源
    最近更新 更多