【Posted】: 2017-11-23 23:27:20
【Question】:
This question is a follow-up to State management not serializable.
I would like to encapsulate my state-management logic.
Here is where I currently stand:
class StateManager(
    stream: DStream[(String, String)],
    updateStateFunction: (String, Option[String], State[String]) => Option[(String, String)]
) {
  lazy val myState = stream.mapWithState(stateSpec).map(_.get)
  lazy val stateSpec = StateSpec.function(updateStateFunction)
}
object StateManager {
  def apply(
      _dStream: DStream[(String, String)],
      _updateState: (String, Option[String], State[String]) => Option[(String, String)]
  ) =
    new StateManager(_dStream, _updateState)
}
This works fine, but it only allows handling DStream[(String, String)]. It is a first step towards a generic state management that would welcome any DStream: from DStream[(Int, String)] to DStream[(String, MyCustomClass)].
myState has to be a function value for this to work (serialization).
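To illustrate the serialization constraint outside of Spark, here is a minimal sketch (plain Scala, no Spark dependencies; `Holder` and `roundTrip` are hypothetical names): a function literal stored in a val is an ordinary serializable object, which is what lets Spark ship it to executors, whereas a `def` is not a value at all.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical sketch: a class whose update logic is a function value.
// Scala function literals are Serializable, so the whole object can be
// round-tripped through Java serialization, as Spark does for closures.
class Holder(val update: (String, Option[String]) => Option[(String, String)])
  extends Serializable

// Serialize and deserialize an object in memory, mimicking what Spark
// does when it sends a task to an executor.
def roundTrip[A](a: A): A = {
  val bytes = new ByteArrayOutputStream()
  new ObjectOutputStream(bytes).writeObject(a)
  new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    .readObject().asInstanceOf[A]
}

val holder = new Holder((key, value) => value.map(v => (key, v)))
val restored = roundTrip(holder)
```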
But I am facing an issue, because type parameters do not apply to function objects in Scala.
user6910411 gave me a hint by using ClassTags with an enclosing method (Type-parameterize a DStream), but in turn that is still a method.
Does anyone know how to overcome these difficulties?
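For reference, a hedged sketch of the generic shape in plain Scala (`GenericManager` is a hypothetical stand-in; Spark's State/StateSpec/mapWithState are omitted, and the bare S parameter stands in for State[S]): putting the type parameters on the class with ClassTag context bounds captures the tags as implicit constructor fields, while the update logic remains a function value rather than a method.

```scala
import scala.reflect.ClassTag

// Hypothetical sketch: type parameters live on the class, ClassTags are
// captured via context bounds as implicit constructor fields, and the
// update logic stays a function value, so nothing here is a bare method.
// In a real StateManager, K/V/S/U would be the types handed to
// StateSpec.function and stream.mapWithState, which need these ClassTags.
class GenericManager[K: ClassTag, V: ClassTag, S: ClassTag, U: ClassTag](
    val updateStateFunction: (K, Option[V], S) => Option[U]
) extends Serializable {
  // The captured tags survive erasure and can feed Spark's implicits.
  def keyClass: Class[_] = implicitly[ClassTag[K]].runtimeClass
  def mappedClass: Class[_] = implicitly[ClassTag[U]].runtimeClass
}

val manager = new GenericManager[Int, String, Long, String](
  (key, value, state) => value.map(v => s"$key:$v:$state")
)
```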
Context:
Spark 1.6
The Spark graph:
object Consumer_Orchestrator {
  def main(args: Array[String]) = {
    // setup configurations
    val streamingContext = StreamingEnvironment(/*configurations*/)
    val kafkaStream = streamingContext.stream()

    val updateStateFunction: (String, Option[String], State[String]) => Option[(String, String)] = (key, value, state) => {/*some code*/}
    val initialState = emptyRDD

    val stateManager = StateManager(kafkaStream, updateStateFunction)
    val state: DStream[(String, String)] = stateManager.myState
    state.foreachRDD(_.foreach(println))

    streamingContext.start()
    streamingContext.awaitTermination()
  }
}
The StreamingEnvironment class that creates the streaming context:
class StreamingEnvironment(sparkConf: SparkConf, kafkaConf: KafkaConf) {
  val sparkContext = SparkContext.getOrCreate(sparkConf)
  lazy val streamingContext = new StreamingContext(sparkContext, Seconds(30))

  streamingContext.checkpoint(/*directory checkpoint*/)
  streamingContext.remember(Minutes(1))

  def stream() = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](streamingContext, kafkaConf.mBrokers, kafkaConf.mTopics)

  def stop() = sparkContext.stop()
}
object StreamingEnvironment {
  def apply(kafkaConf: KafkaConf) = {
    val sparkConf = new SparkConf
    new StreamingEnvironment(sparkConf, kafkaConf)
  }
}
【Comments】:
Tags: scala apache-spark streaming state type-parameter