【问题标题】:Flink datastream keyby using composite keyFlink 数据流 keyby 使用复合键
【发布时间】:2019-05-26 19:28:53
【问题描述】:

我的问题与How to support multiple KeyBy in Flink 非常相似,只是那个问题是针对 Java 的,我需要 Scala 中的答案。我在 IntelliJ 中复制粘贴了提供的解决方案,它自动将复制粘贴的 sn-p 转换为 Scala,然后我对其进行编辑以适合我的代码。我仍然遇到编译错误(甚至在编译 IntelliJ 能够检测到代码问题之前)。基本上,提供给 keyBy 的参数(keySelector 的 getKey 函数的返回值)与任何重载版本的 keyBy 函数所期望的参数都不匹配。

查找了许多返回复合键的 KeySelector 的 scala 代码示例,但没有找到。

import org.apache.flink.api.java.functions.KeySelector
import org.apache.flink.api.java.tuple.Tuple2
import org.myorg.aarna.AAPerMinData
val aa_stats_keyed_stream = aa_stats_stream_w_timestamps.keyBy(new 
    KeySelector[AAPerMinData, Tuple2[String, String]]() {
    @throws[Exception]
    override def getKey(value: AAPerMinData): Tuple2[String, String] = 
    Tuple2.of(value.field1, value.field2)  
})

我在编译代码时收到以下错误:

Error:(213, 64) overloaded method value keyBy with alternatives:
[K](fun: org.myorg.aarna.AAPerMinData => K)(implicit evidence $2:org.apache.flink.api.common.typeinfo.TypeInformation[K])org.apache.flink.streaming.api.scala.KeyedStream[org.myorg.aarna.AAPerMinData,K] <and>
(firstField: String,otherFields: 
String*)org.apache.flink.streaming.api.scala.KeyedStream[org.myorg.aarna.AAPerMinData,org.apache.flink.api.java.tuple.Tuple] <and>
(fields: Int*)org.apache.flink.streaming.api.scala.KeyedStream[org.myorg.aarna.AAPerMinData,org.apache.flink.api.java.tuple.Tuple]
cannot be applied to (org.apache.flink.api.java.functions.KeySelector[org.myorg.aarna.AAPerMinData,org.apache.flink.api.java.tuple.Tuple2[String,String]])
val aa_stats_keyed_stream = aa_stats_stream_w_timestamps.keyBy(new KeySelector[AAPerMinData, Tuple2[String, String]]() {

我不确定导致此错误的语法中缺少什么。任何帮助是极大的赞赏。解决此问题后的下一步是基于复合键进行基于 TumblingWindow 的汇总。

更新 1(2018 年 12 月 29 日): 更改代码以使用 KeySelector 格式使用简单的 String 类型字段作为键(我知道这可以以更简单的方式完成,我这样做只是为了让基本的 KeySelector 工作)。

  import org.apache.flink.api.java.functions.KeySelector
  import org.myorg.aarna.AAPerMinData
  val aa_stats_keyed_stream = aa_stats_stream_w_timestamps.keyBy(new KeySelector[AAPerMinData, String]() {
    @throws[Exception]
    override def getKey(value: AAPerMinData): String = value.set1.sEntId
  })

这是我得到的错误的屏幕截图(即 IntelliJ 在鼠标悬停时显示此错误)。

更新 2(2018 年 12 月 29 日)

这有效(对于单键情况)

val aa_stats_keyed_stream = aa_stats_stream_w_timestamps.keyBy[String] 
(_.set1.sEntId)

这不起作用(对于复合键情况)

val aa_stats_keyed_stream = aa_stats_stream_w_timestamps.keyBy([String, String)](_.set1.sEntId, _.set1.field2)

更新 3(2018 年 12 月 29 日) 尝试了以下,无法让它工作。查看错误截图。

val aa_stats_keyed_stream = aa_stats_stream_w_timestamps.keyBy[(String, String)]((_.set1.sEntId, _.set1.field2))

更新 4(2018 年 12 月 30 日) 现已解决,请参阅已接受的答案。对于任何可能感兴趣的人,这是最终的工作代码,包括使用复合键进行聚合:

// Composite key
val aa_stats_keyed_stream = aa_stats_stream_w_timestamps.keyBy[(String, String)](x => (x.set1.sEntId, x.set1.field2))

// Tumbling window
val aggr_keyed_stream = aa_stats_keyed_stream.window(TumblingEventTimeWindows.of(Time.seconds(60)))

// all set for window based aggregation of a "composite keyed" stream
val aggr_stream = aggr_keyed_stream.apply { (key: (String, String), window: TimeWindow, events: Iterable[AAPerMinData],
                                                 out: Collector[AAPerMinDataAggr]) =>
      out.collect(AAPerMinDataAggrWrapper(key._1 + key._2, // composite
        key._1, key._2, // also needed individual pieces
        window,
        events,
        stream_deferred_live_duration_in_seconds*1000).getAAPerMinDataAggr)}
// print the "mapped" stream for debugging purposes
aggr_stream.print()

【问题讨论】:

    标签: scala apache-flink flink-streaming


    【解决方案1】:

    首先,虽然没有必要,但请继续使用 Scala 元组。总体而言,它会让事情变得更容易,除非您出于某种原因必须与 Java 元组进行互操作。

    然后,不要使用 org.apache.flink.api.java.functions.KeySelector。你想从 org.apache.flink.streaming.api.scala.DataStream 中使用这个 keyBy:

    /**
     * Groups the elements of a DataStream by the given K key to
     * be used with grouped operators like grouped reduce or grouped aggregations.
     */
    def keyBy[K: TypeInformation](fun: T => K): KeyedStream[T, K] = {
    
      val cleanFun = clean(fun)
      val keyType: TypeInformation[K] = implicitly[TypeInformation[K]]
    
      val keyExtractor = new KeySelector[T, K] with ResultTypeQueryable[K] {
        def getKey(in: T) = cleanFun(in)
        override def getProducedType: TypeInformation[K] = keyType
      }
      asScalaStream(new JavaKeyedStream(stream, keyExtractor, keyType))
    }
    

    换句话说,只需传递一个将您的流元素转换为键值的函数(通常,Flink 的 scala API 试图是惯用的)。所以像这样的东西应该可以完成这项工作:

    aa_stats_stream_w_timestamps.keyBy[String](value => value.set1.sEntId)
    

    更新:

    对于复合键情况,使用

    aa_stats_stream_w_timestamps.keyBy[(String, String)](x => (x.set1.sEntId, x.set1.field2))
    

    【讨论】:

    • 那个特定的导入已经出现在文件的开头(对不起,我没有在文件的开头发布 t 导入声明)。为了使代码更简单,我现在尝试将 KeySelector 用于非复合键(单个字符串类型字段),仍然得到相同的错误。
    • 请看上面的更新1,我已经发布了简化代码和错误截图。
    • 顺便说一句,您可以在 github 上的 Flink 训练练习 repo 中找到更多 scala 示例:github.com/dataArtisans/flink-training-exercises/tree/master/…
    • 谢谢,我假设您的意思是:val aa_stats_keyed_stream = aa_stats_stream_w_timestamps.keyBy[String](.set1.sEntId),我如何使它适用于两个字段(复合密钥情况)。尝试这样做,但它没有编译: val aa_stats_keyed_stream = aa_stats_stream_w_timestamps.keyBy[(String, String)](.set1.sEntId, _.set1.field2)
    • 仍然缺少一些东西,无法让它工作。请参阅更新 3 中的屏幕截图。
    猜你喜欢
    • 2022-12-02
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-05-19
    • 1970-01-01
    • 2020-12-24
    • 2019-12-13
    相关资源
    最近更新 更多