【Title】: Generic state management
【Posted】: 2017-11-23 23:27:20
【Question】:

This question is a follow-up to State management not serializable.


I want to encapsulate the state-management logic.

The following represents where I am now:

class StateManager(
  stream: DStream[(String, String)],
  updateStateFunction: (String, Option[String], State[String]) => Option[(String, String)]
) {
  lazy val myState = stream.mapWithState(stateSpec).map(_.get)
  lazy val stateSpec = StateSpec.function(updateStateFunction)
}

object StateManager {
  def apply(
    _dStream: DStream[(String, String)],
    _updateState: (String, Option[String], State[String]) => Option[(String, String)]
  ) =
    new StateManager(_dStream, _updateState)
}

This works fine, but it only handles DStream[(String, String)]. It is a first step toward generic state management, which should welcome any DStream: from DStream[(Int, String)] to DStream[(String, myCustomClass)].

myState has to be a function value for this to work (serialization).

But I am facing a problem, because type parameters do not work on function objects in Scala.
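The limitation can be illustrated with a minimal sketch outside Spark (`identityMethod` and `intIdentity` are hypothetical names introduced for illustration):

```scala
object FunctionVsMethod {
  // A method can declare type parameters:
  def identityMethod[T](x: T): T = x

  // A function value cannot: Function1[A, B] is a concrete type,
  // so there is no Scala 2 syntax for a polymorphic val such as
  //   val f: [T] => T => T = ...
  // The workaround is to keep a method and eta-expand it
  // at a concrete type wherever a value is required:
  val intIdentity: Int => Int = identityMethod[Int]
}
```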

user6910411 gave me a hint by using ClassTags with an enclosing method (Type-parameterize a DStream), but that in turn is still a method.

Does anyone know how to overcome these difficulties?


Context:

Spark 1.6

The Spark job:

object Consumer_Orchestrator {
    def main(args: Array[String]) = {
        //setup configurations

        val streamingContext = StreamingEnvironment(/*configurations*/)

        val kafkaStream = streamingContext.stream()

        val updateStateFunction: (String, Option[String], State[String]) => Option[(String, String)] = (key, value, state) => {/*some code*/}
        val initialState = emptyRDD

        val stateManager = StateManager(kafkaStream, updateStateFunction)
        val state: DStream[(String, String)] = stateManager.myState

        state.foreachRDD(_.foreach(println))

        streamingContext.start()
        streamingContext.awaitTermination()
    }
}

The StreamingEnvironment class, which creates the streaming context:

class StreamingEnvironment(sparkConf: SparkConf, kafkaConf: KafkaConf) {
    val sparkContext = spark.SparkContext.getOrCreate(sparkConf)
    lazy val streamingContext = new StreamingContext(sparkContext, Seconds(30))

    streamingContext.checkpoint(/*directory checkpoint*/)
    streamingContext.remember(Minutes(1))

    def stream() = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](streamingContext, kafkaConf.mBrokers, kafkaConf.mTopics)
    def stop() = sparkContext.stop()
}

object StreamingEnvironment {
    def apply(kafkaConf: KafkaConf) = {
        val sparkConf = new SparkConf

        new StreamingEnvironment(sparkConf, kafkaConf)
    }
}

【Comments】:

    Tags: scala apache-spark streaming state type-parameter


    【Solution 1】:

    Here you go:

    • App.scala:

      import org.apache.spark.{SparkContext, SparkConf}
      import org.apache.spark.streaming._
      import org.apache.spark.streaming.dstream.ConstantInputDStream
      import statemanager._
      
      object App {
        def main(args: Array[String]): Unit = {
          val sc = new SparkContext("local[*]", "generic", new SparkConf())
          val ssc = new StreamingContext(sc, Seconds(10))
          ssc.checkpoint("/tmp/chk")
      
          StateManager(
            new ConstantInputDStream(ssc, sc.parallelize(Seq(("a", 1), ("b",2)))),
            (_: String, _: Option[Int], _: State[Int]) =>  Option(1)
          ).myState.print
          ssc.start()
          ssc.awaitTermination()
        }
      }
      
    • StateManage.scala:

      package statemanager
      
      import scala.reflect.ClassTag
      import org.apache.spark.streaming.{State, StateSpec}
      import org.apache.spark.streaming.dstream.DStream
      
      class StateManager[T : ClassTag, U : ClassTag, V : ClassTag, W : ClassTag](
        stream: DStream[(T, U)],
        updateStateFunction: (T, Option[U], State[V]) => Option[W]
      ) {
        lazy val myState = stream.mapWithState(stateSpec).map(_.get)
        lazy val stateSpec = StateSpec.function(updateStateFunction)
      }
      
      object StateManager {
        def apply[T : ClassTag, U : ClassTag, V : ClassTag, W : ClassTag](
          _dStream: DStream[(T, U)],
          _updateState: (T, Option[U], State[V]) => Option[W]
        ) =
          new StateManager(_dStream, _updateState)
      }
      
    • build.sbt:

      scalaVersion := "2.11.8"
      
      val sparkVersion = "2.1.0"
      
      libraryDependencies ++= Seq(
        "org.apache.spark" %% "spark-core" % sparkVersion,
        "org.apache.spark" %% "spark-streaming" % sparkVersion
      )
      
    • Directory structure:

      ├── App.scala
      ├── build.sbt
      └── StateManage.scala
      
    • Example run:

      sbt run
      ...
      -------------------------------------------
      Time: 1483701790000 ms
      -------------------------------------------
      1
      1
      ...
      

    As you can see, there is no magic here. If you introduce generic parameters, you need ClassTags in the same context.
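    With the context bounds in place, the same StateManager can be driven with any key/value/state/output types. A sketch building on the App.scala above (MyCustomClass is a hypothetical type introduced for illustration; ssc and sc are the contexts created in App.scala, so this fragment needs the same Spark setup to run):

```scala
import org.apache.spark.streaming.State
import org.apache.spark.streaming.dstream.ConstantInputDStream
import statemanager._

case class MyCustomClass(payload: String)

// Keys are Int, values are MyCustomClass, state is Long, output is (Int, String):
StateManager(
  new ConstantInputDStream(ssc, sc.parallelize(Seq((1, MyCustomClass("a"))))),
  (k: Int, v: Option[MyCustomClass], _: State[Long]) => v.map(c => (k, c.payload))
).myState.print()
```

    The compiler infers T = Int, U = MyCustomClass, V = Long, W = (Int, String) and materializes the four ClassTags at the call site, which is exactly what the context bounds on apply require.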

    【Discussion】:
