【问题标题】:Apache Flink - Access internal buffer of WindowedStream from another Stream's MapFunctionApache Flink - 从另一个 Stream 的 MapFunction 访问 WindowedStream 的内部缓冲区
【发布时间】:2018-04-21 17:44:26
【问题描述】:

我有一个基于 Apache Flink 的流应用程序,设置如下:

  • 数据源:每分钟生成一次数据。
  • 使用大小=100、滑动=1 的 CountWindow 的窗口化流(滑动计数窗口)。
  • ProcessWindowFunction 对 Window 中的数据应用一些计算(比如 F(x) )。
  • 使用输出流的数据接收器

这很好用。现在,我想让用户提供一个函数 G(x) 并将其应用于 Window 中的当前数据,并将输出实时发送给用户

我不是在询问如何应用任意函数 G(x) - 我正在使用动态脚本来做到这一点。我在问如何从另一个流的映射函数访问窗口中的缓冲数据。

一些代码需要澄清

DataStream<Foo> in  = .... // source data produced every minute
    in
       .keyBy(new MyKeySelector())
       .countWindow(100, 1)
       .process(new MyProcessFunction())
       .addSink(new MySinkFunction())

// The part above is working fine. Note that windowed stream created by countWindow() function above has to maintain internal buffer. Now the new requirement

DataStream<Function> userRequest  = .... // request function from user

userRequest.map(new MapFunction<Function, FunctionResult>(){
   public FunctionResult map(Function Gx) throws Exception {
         Iterable<Foo> windowedDataFromAbove = // HOW TO GET THIS???
         FunctionResult result = Gx.apply(windowedDataFromAbove);
         return result;

   }

})

【问题讨论】:

    标签: apache-flink flink-streaming


    【解决方案1】:

    连接两个流,然后使用CoProcessFunction。获取函数流的方法调用可以将它们应用到另一个方法调用的窗口中。

    如果你想广播 Functions,那么你要么需要使用 Flink 1.5(它支持连接键控和广播流),要么使用一些直升机特技来创建一个可以同时包含 Foo 和 Function 类型的流,适当复制函数(和密钥生成)以模拟广播。

    【讨论】:

    • 这是我的第一个想法。但 API 不支持与 WindowedStream ci.apache.org/projects/flink/flink-docs-release-1.4/api/java/… 连接
    • 在 (Co)ProcessFunction 中,您可以通过控制计时器何时触发来进行自己的窗口化。
    • 无论如何,实现自己的窗口化会更有效率——通过这些滑动计数窗口,每个事件都被复制到 100 个窗口对象中。
    • 这是真的吗?为什么 Flink 会做 100 个副本?滑动计数窗口可以用固定大小的缓冲区来实现。
    【解决方案2】:

    假设 Fx 实时聚合传入的 foo 并且 Gx 处理一个窗口的 foo 值,您应该能够实现您想要的如下:

    DataStream<Function> userRequest  = .... // request function from user
    Iterator<Function> iter = DataStreamUtils.collect(userRequest);
    Function Gx = iter.next();
    
    DataStream<Foo> in  = .... // source data
     .keyBy(new MyKeySelector())
     .countWindow(100, 1)
     .fold(new ArrayList<>(), new MyFoldFunc(), new MyProcessorFunc(Gx))
     .addSink(new MySinkFunction())
    

    Fold 函数(传入数据一到达就对其进行操作)可以这样定义:

    private static class MyFoldFunc implements FoldFunction<foo, Tuple2<Integer, List<foo>>> {
        @Override
        public Tuple2<Integer, List<foo>> fold(Tuple2<Integer, List<foo>> acc, foo f) {
            acc.f0 = acc.f0 + 1; // if Fx is a simple aggregation (count)
            acc.f1.add(foo);
            return acc;
        }
    }
    

    处理器函数可以是这样的:

    public class MyProcessorFunc
        extends ProcessWindowFunction<Tuple2<Integer, List<foo>>, Tuple2<Integer, FunctionResult>, String, TimeWindow> {
    
        public MyProcessorFunc(Function Gx) {
            super();
            this.Gx = Gx;
        }
    
        @Override
        public void process(String key, Context context,
                            Iterable<Tuple2<Integer, List<foo>> accIt,
                            Collector<Tuple2<Integer, FunctionResult>> out) {
            Tuple2<Integer, List<foo> acc = accIt.iterator().next();
            out.collect(new Tuple2<Integer, FunctionResult>(
                acc.f0, // your Fx aggregation
                Gx.apply(acc.f1), // your Gx results
            ));
        }
    }
    

    请注意,默认情况下 fold\reduce 函数不会在内部缓冲元素。我们在这里使用 fold 来计算动态指标并创建一个窗口项列表。

    如果您有兴趣在滚动窗口(非滑动窗口)上应用 Gx,您可以在管道中使用滚动窗口。要计算滑动计数,您可以有另一个仅计算滑动计数的管道分支(不应用 Gx)。这样,您不必为每个窗口保留 100 个列表。

    注意:您可能需要添加以下依赖项才能使用 DataStreamUtils:

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-contrib</artifactId>
        <version>0.10.2</version>
    </dependency>
    

    【讨论】:

    • Gx 来自其自己的 DataStream 中的最终用户。在上面的代码示例中,Gx 被声明为 MyProcessorFunc 构造函数的输入参数,但调用者代码没有显示 Gx 是如何传递给构造函数的。
    • @Sebastian 假设您在程序开始时始终可以使用来自用户的函数,您可以将其转换为迭代器并将其作为输入提供给 MyProcessorFunc,如更新的答案所示。您还可以在内存中收集第一个流输出(列表)并等到用户请求到来,但这与 Flink 的工作方式不符,因为您将阻塞第一个流计算(并将其输出保留在内存中)直到用户请求到来!
    • 用户 (Gx) 的功能在程序开始时不可用。它由最终用户通过 REST API 发送 - 转换为流并发送到 Flink 进行处理。但是,有一些内置函数 (Fx) 正在对窗口数据进行处理。这里的问题是如何利用已经在内存中的窗口数据(因为它被用来计算 Fx)来计算 Gx
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多