【发布时间】:2015-02-13 02:54:11
【问题描述】:
我正在尝试开发一个应用程序,它从 kafka 服务器获取四个不同的主题,并对每个主题采取特定的操作。
我创建了一个接收 DStream 的类,并有一个可以转换 DStream 的方法。
例如处理程序类:
class StreamHandler(stream:DStream[String]) {
val stream:DStream[String] = stream
def doActions():DStream[String] = {
//Do smth. to DStream
}
}
现在,假设我从主类中为我想要的每个处理程序类调用 doActions(),它会随着每个到达的 DStream 重复还是只重复一次?
val topicHandler1 = new StreamHandler(KafkaUtils.createStream(ssc, zkQuorum, "myGroup", Map("topic1"->1)).map(_._2)
val topicHandler2 = new OtherStreamHandler(KafkaUtils.createStream(ssc, zkQuorum, "myGroup", Map("topic2"->1)).map(_._2)
topicHandler1.doActions()
topicHandler2 .doActions()
ssc.start()
有更好的方法吗?
【问题讨论】:
-
我猜你想说
topicHandler1.doActions()。 -
是的,我打错了
标签: apache-spark apache-kafka spark-streaming