【发布时间】:2016-08-16 01:49:35
【问题描述】:
我是 Apache Spark 的新手,需要同时在我的 Spark 集群上运行多个长时间运行的进程(作业)。通常,这些单独的进程(每个进程都是自己的工作)需要相互通信。暂时,我正在考虑使用 Kafka 作为这些进程之间的代理。所以高级别的工作对工作的沟通看起来像:
- 作业 #1 完成一些工作并将消息发布到 Kafka 主题
- Job #2 被设置为同一个 Kafka 主题的流式接收器(使用
StreamingContext),一旦消息发布到该主题,Job #2 就会使用它 - 作业 #2 现在可以根据它使用的消息做一些工作
据我所知,流式上下文正在阻塞在 Spark Driver 节点上运行的侦听器。这意味着一旦我像这样启动流式消费者:
def createKafkaStream(ssc: StreamingContext,
kafkaTopics: String, brokers: String): DStream[(String,
String)] = {
// some configs here
KafkaUtils.createDirectStream[String, String, StringDecoder,
StringDecoder](ssc, props, topicsSet)
}
def consumerHandler(): StreamingContext = {
val ssc = new StreamingContext(sc, Seconds(10))
createKafkaStream(ssc, "someTopic", "my-kafka-ip:9092").foreachRDD(rdd => {
rdd.collect().foreach { msg =>
// Now do some work as soon as we receive a messsage from the topic
}
})
ssc
}
StreamingContext.getActive.foreach {
_.stop(stopSparkContext = false)
}
val ssc = StreamingContext.getActiveOrCreate(consumerHandler)
ssc.start()
ssc.awaitTermination()
...现在有两个含义:
- 驱动程序现在正在阻塞并监听来自 Kafka 的工作;和
- 收到工作(消息)后,会将它们发送到任何可用的工作节点,以便实际执行
首先,如果我上面所说的任何内容不正确或具有误导性,请先纠正我!假设我或多或少是正确的,那么我只是想知道根据我的标准是否有更可扩展或性能更高的方法来实现这一点。同样,我有两个长期运行的作业(作业 #1 和作业 #2)在我的 Spark 节点上运行,其中一个需要能够将工作“发送给”另一个。有什么想法吗?
【问题讨论】:
-
顺便说一句 - 在 foreachRDD 中使用
rdd.collect将导致它们将整个数据集发送回驱动程序。你绝对不想那样。 -
感谢@Yuval (+1),有什么更好/更有效的方式来访问正在使用的单个消息?这不是我的意图,我只是 API 的新手,所以请随时更新我的代码!
-
你可以使用
rdd.foreach。
标签: scala apache-spark apache-kafka spark-streaming long-running-processes