【问题标题】:reading from several topics从几个主题阅读
【发布时间】:2015-02-13 02:54:11
【问题描述】:

我正在尝试开发一个应用程序,它从 kafka 服务器获取四个不同的主题,并对每个主题采取特定的操作。

我创建了一个接收 DStream 的类,并有一个可以转换 DStream 的方法。

例如处理程序类:

class StreamHandler(stream:DStream[String]) {
  val stream:DStream[String] = stream

  def doActions():DStream[String] =  {
    //Do smth. to DStream
  }
}

现在,假设我从主类中为我想要的每个处理程序类调用 doActions(),它会随着每个到达的 DStream 重复还是只重复一次?

val topicHandler1 = new StreamHandler(KafkaUtils.createStream(ssc, zkQuorum, "myGroup", Map("topic1"->1)).map(_._2)
val topicHandler2 = new OtherStreamHandler(KafkaUtils.createStream(ssc, zkQuorum, "myGroup", Map("topic2"->1)).map(_._2)

topicHandler1.doActions()
topicHandler2 .doActions()

ssc.start()

有更好的方法吗?

【问题讨论】:

  • 我猜你想说topicHandler1.doActions()
  • 是的,我打错了

标签: apache-spark apache-kafka spark-streaming


【解决方案1】:

StreamHandler 上声明的转换将应用于 DStream 的每批。当前的代码很不完整,无法给你一个确定的答案。在 DStream 转换管道中,您将需要一个 action that materializes the DStream,否则不会发生任何事情。

关于方法,一个接受 DStream 并对其应用转换的函数就足够了,并且易于测试:

val pipeline:DStream[Data] => () = dstream => 
    dstream.map(...).filter(...).print()

就目前而言,类结构的购买力似乎并不高。

【讨论】:

  • 谢谢。我已经缩短了代码片段以使其更清晰。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2017-03-14
  • 2018-07-12
  • 1970-01-01
  • 1970-01-01
  • 2021-03-02
  • 1970-01-01
相关资源
最近更新 更多