【问题标题】:What is the scope of the state for Spark Streaming's mapWithState?Spark Streaming 的 mapWithState 的状态范围是什么?
【发布时间】:2018-07-24 21:37:24
【问题描述】:

使用 Spark Streaming,我可以创建一个 DStream[(K, V)],我可以在其上使用 mapWithState 在流处理期间保持某些状态。 map 函数是这样设置的:

val mapFun =
  (key: K, maybeValue: Option[V], state: State[S]) => {
  // Do stuff
}

然后我可以使用:

val mappedStreamWithState = stream.mapWithState(StateSpec.function(mapFun))

我现在的问题是:state 的范围是什么?是key还是partition?

假设流来自具有 3 个分区但可以有 300 个键的 Kafka 主题。据我了解,流中的每个 RDD 都有 3 个分区,每个分区大约有 100 个键。那么是 3 种状态(每个分区一个)还是 300 个状态(每个键一个)?

【问题讨论】:

    标签: scala apache-spark spark-streaming


    【解决方案1】:

    我正在回答我自己的问题,因此如果其他人有相同的问题,我可以将其标记为已接受。

    tl;dr:状态的范围是分区,当你依赖key时有更多的注意事项。

    我对@9​​87654321@ 进行了相当多的调查,结果如下:

    • mapWithState 仅适用于 DStream[(A, B)]。到目前为止一切顺利。
    • 使用mapWithState 生成的传入流由键隐式分区。从我目前发现的情况来看,分区数取决于可用的工作任务数。例如,在具有 8 个线程的 CPU 上以 local[*] 模式运行它会产生 8 个分区。
    • 状态的范围是分区,所以如果你依赖于键,你应该添加一些映射到状态,例如,使用Map[TKey, TStateForTheKey]
    • stream.repartition(1).mapWithState(...) 这样的操作是徒劳的
    • 如果您需要确保两个键在一个分区中,即,如果您想从测量数据通道中计算一些值,则必须为它们分配一个唯一的键,这样可以确保它们正常运行进入同一个分区

    所以,假设您在 DStream[(String, Int)] 中有这样的数据:

    foo,1
    bar,8
    foo,23
    quux,423
    bletch,42
    bar,5
    

    并且您需要确保 foobar 一起处理并且 quuxbletch 是,您需要执行以下操作:

    stream.
      flatMap {
        case (k, v) if Seq("foo", "bar").contains(k)     => Some("foobar" -> (k, v))
        case (k, v) if Seq("quux", "bletch").contains(k) => Some("quuxbletch" -> (k, v))
        case _                                           => None
      }.
      mapWithState(StateSpec.function(myFunc))
    

    而且你的映射函数必须是这样的:

    val myFunc = (key: String, maybeRecord: Option[(String, Int)], state: State[Something]) => {
      // Do something with the record
    }
    

    【讨论】:

      猜你喜欢
      • 2016-12-24
      • 2016-07-02
      • 1970-01-01
      • 2011-05-12
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2018-10-12
      • 2017-01-04
      相关资源
      最近更新 更多