【问题标题】:How to use flink fold function in scala如何在scala中使用flink折叠功能
【发布时间】:2015-09-08 00:45:17
【问题描述】:

这是将 Flink fold 与 scala 匿名函数一起使用的无效尝试:

val myFoldFunction = (x: Double, t:(Double,String,String)) => x + t._1
env.readFileStream(...).
...
.groupBy(1)
.fold(0.0, myFoldFunction : Function2[Double, (Double,String,String), Double])

它编译得很好,但在执行时,我得到一个“类型擦除问题”(见下文)。在 Java 中这样做很好,但当然更冗长。我喜欢简洁明了的 lambda。我怎样才能在 scala 中做到这一点?

Caused by: org.apache.flink.api.common.functions.InvalidTypesException:
Type of TypeVariable 'R' in 'public org.apache.flink.streaming.api.scala.DataStream org.apache.flink.streaming.api.scala.DataStream.fold(java.lang.Object,scala.Function2,org.apache.flink.api.common.typeinfo.TypeInformation,scala.reflect.ClassTag)' could not be determined. 
This is most likely a type erasure problem. 
The type extraction currently supports types with generic variables only in cases where all variables in the return type can be deduced from the input type(s).

【问题讨论】:

    标签: scala apache-flink


    【解决方案1】:

    您遇到的问题是 Flink [1] 中的一个错误。问题源于 Flink 的TypeExtractor 以及 Scala DataStream API 在 Java 实现之上实现的方式。 TypeExtractor 无法为 Scala 类型生成 TypeInformation,因此返回 MissingTypeInformation。在创建 StreamFold 运算符后手动设置此缺少的类型信息。但是,StreamFold 运算符的实现方式不接受 MissingTypeInformation,因此在设置正确的类型信息之前失败。

    我已经打开了一个拉取请求 [2] 来解决这个问题。它应该在接下来的两天内合并。通过使用最新的 0.10 快照版本,您的问题应该得到解决。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2011-01-22
      • 2014-05-18
      • 2021-05-24
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2018-03-05
      相关资源
      最近更新 更多