【问题标题】:In Flink, stream windowing does not seem to work?在 Flink 中,流窗口化似乎不起作用?
【发布时间】:2015-05-26 14:44:42
【问题描述】:

我尝试增强显示流使用的 Flink 示例。 我的目标是使用窗口功能(请参阅window 函数调用)。 我假设下面的代码输出流的最后 3 个数字的总和。 (由于 ubuntu 上的nc -lk 9999,该流已打开) 实际上,输出总结了所有输入的数字。切换到时间窗口产生相同的结果,即不产生窗口。

这是一个错误吗? (使用的版本:github上最新的master)

object SocketTextStreamWordCount {
  def main(args: Array[String]) {
    val hostName = args(0)
    val port = args(1).toInt
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Create streams for names and ages by mapping the inputs to the corresponding objects
    val text = env.socketTextStream(hostName, port)    
    val currentMap = text.flatMap { (x:String) => x.toLowerCase.split("\\W+") }
    .filter { (x:String) => x.nonEmpty }      
    .window(Count.of(3)).every(Time.of(1, TimeUnit.SECONDS))
    //  .window(Time.of(5, TimeUnit.SECONDS)).every(Time.of(1, TimeUnit.SECONDS))
      .map { (x:String) => ("not used; just to have a tuple for the sum", x.toInt) }

    val numberOfItems = currentMap.count
    numberOfItems print
    val counts = currentMap.sum( 1 )
    counts print

    env.execute("Scala SocketTextStreamWordCount Example")
  }
}

【问题讨论】:

    标签: apache-flink


    【解决方案1】:

    问题似乎是从WindowedDataStreamDataStream 的隐式转换。这种隐式转换在WindowedDataStream 上调用flatten()

    在您的情况下发生的情况是代码扩展为:

    val currentMap = text.flatMap { (x:String) => x.toLowerCase.split("\\W+") }
        .filter { (x:String) => x.nonEmpty }      
        .window(Count.of(3)).every(Time.of(1, TimeUnit.SECONDS))
        .flatten()   
        .map { (x:String) => ("not used; just to have a tuple for the sum",x.toInt) }    
    

    flatten() 的作用类似于集合上的flatMap()。它采用可以被视为集合集合 ([[a,b,c], [d,e,f]]) 的窗口流并将其转换为元素流:[a,b,c,d,e,f]

    这意味着您的计数实际上仅在已窗口化和“去窗口化”的原始流上运行。这看起来根本就没有被窗口化过。

    这是一个问题,我会马上解决这个问题。 (我是 Flink 提交者之一。)您可以在这里跟踪进度:https://issues.apache.org/jira/browse/FLINK-2096

    使用当前 API 的方法是这样的:

    val currentMap = text.flatMap { (x:String) => x.toLowerCase.split("\\W+") }
        .filter { (x:String) => x.nonEmpty }   
        .map { (x:String) => ("not used; just to have a tuple for the sum",x.toInt) }    
        .window(Count.of(3)).every(Time.of(1, TimeUnit.SECONDS))
    

    WindowedDataStream 有一个 sum() 方法,因此不会隐式插入 flatten() 调用。不幸的是,count()WindowedDataStream 上不可用,因此您必须手动将 1 字段添加到元组并计算这些。

    【讨论】:

    • 那么,对于当前的 API,对示例代码应用的更正是什么?
    猜你喜欢
    • 2018-01-10
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2016-05-30
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多