【问题标题】:Using Kafka to communicate between long running Spark jobs使用 Kafka 在长时间运行的 Spark 作业之间进行通信
【发布时间】:2016-08-16 01:49:35
【问题描述】:

我是 Apache Spark 的新手,需要同时在我的 Spark 集群上运行多个长时间运行的进程(作业)。通常,这些单独的进程(每个进程都是自己的工作)需要相互通信。暂时,我正在考虑使用 Kafka 作为这些进程之间的代理。所以高级别的工作对工作的沟通看起来像:

  1. 作业 #1 完成一些工作并将消息发布到 Kafka 主题
  2. Job #2 被设置为同一个 Kafka 主题的流式接收器(使用 StreamingContext),一旦消息发布到该主题,Job #2 就会使用它
  3. 作业 #2 现在可以根据它使用的消息做一些工作

据我所知,流式上下文正在阻塞在 Spark Driver 节点上运行的侦听器。这意味着一旦我像这样启动流式消费者:

def createKafkaStream(ssc: StreamingContext,
        kafkaTopics: String, brokers: String): DStream[(String, 
        String)] = {
    // some configs here
    KafkaUtils.createDirectStream[String, String, StringDecoder,
        StringDecoder](ssc, props, topicsSet)
}

def consumerHandler(): StreamingContext = {
    val ssc = new StreamingContext(sc, Seconds(10))

    createKafkaStream(ssc, "someTopic", "my-kafka-ip:9092").foreachRDD(rdd => {
        rdd.collect().foreach { msg =>
            // Now do some work as soon as we receive a messsage from the topic
        }
    })

    ssc
}

StreamingContext.getActive.foreach {
    _.stop(stopSparkContext = false)
}

val ssc = StreamingContext.getActiveOrCreate(consumerHandler)
ssc.start()
ssc.awaitTermination()

...现在有两个含义:

  1. 驱动程序现在正在阻塞并监听来自 Kafka 的工作;和
  2. 收到工作(消息)后,会将它们发送到任何可用的工作节点,以便实际执行

首先,如果我上面所说的任何内容不正确或具有误导性,请先纠正我!假设我或多或少是正确的,那么我只是想知道根据我的标准是否有更可扩展或性能更高的方法来实现这一点。同样,我有两个长期运行的作业(作业 #1 和作业 #2)在我的 Spark 节点上运行,其中一个需要能够将工作“发送给”另一个。有什么想法吗?

【问题讨论】:

  • 顺便说一句 - 在 foreachRDD 中使用 rdd.collect 将导致它们将整个数据集发送回驱动程序。你绝对不想那样。
  • 感谢@Yuval (+1),有什么更好/更有效的方式来访问正在使用的单个消息?这不是我的意图,我只是 API 的新手,所以请随时更新我的​​代码!
  • 你可以使用rdd.foreach

标签: scala apache-spark apache-kafka spark-streaming long-running-processes


【解决方案1】:

据我所知,流式上下文正在阻塞监听器 在 Spark Driver 节点上运行。

StreamingContext(单数)不是阻塞侦听器。它的工作是为您的流式作业创建执行图。

当您开始从 Kafka 读取数据时,您指定要每 10 秒获取一次新记录。从现在开始会发生什么取决于您为 Kafka 使用的 Kafka 抽象,是通过 KafkaUtils.createStream 的 Receiver 方法,还是通过 KafkaUtils.createDirectStream 的 Receiver-less 方法。

一般来说,在这两种方法中,数据都从 Kafka 中消耗,然后分派给每个 Spark 工作线程以并行处理。

那么我只是想知道是否有更具可扩展性或性能的 实现这一目标的方法

这种方法具有高度可扩展性。使用无接收器方法时,每个 Kafka 分区都映射到给定 RDD 中的 Spark 分区。您可以通过增加 Kafka 中的分区数量或重新分区 Spark 中的数据(使用 DStream.repartition)来增加并行度。我建议测试此设置以确定它是否符合您的性能要求。

【讨论】:

  • 感谢@Yuval (+1),如果你不介意的话,还有几个后续问题! (1) 您能否首先确认,为了针对 Spark 上的 Kafka 主题设置“竞争消费者”,我需要每个集群有 1 个消费者?接收器和无接收器配置都是这样吗? (2) 关于何时使用接收器与无接收器方法的一般准则是什么?
  • (3) 当您说“开始从 Kafka 读取时,您指定要每 10 秒获取一次新记录...”,这是在哪里配置的?可以设置为 10 秒以外的时间吗?最后,(4) 使用接收方方法时,Kafka 分区映射到什么?再次非常感谢!
  • @smeeb 1) 您所说的“竞争消费者”是什么意思? 2)我一般建议使用直接流式方法,它是在 Spark 1.3.0 中引入的,有很多优点。我建议阅读它。 3)这里配置:val ssc = new StreamingContext(sc, Seconds(10))。 4) 在基于接收器的方法中,没有分区映射。如果你想从 Kafka 并发读入,你必须连接多个消费者,这意味着你必须执行多个 KafkaUtils.createStream 调用并将它们联合起来。
  • 再次感谢@Yuval(再次+1)! “竞争消费者”是指多个消费者线程都在监听和消费来自同一个 Kafka 主题的消息。所以我上面的第一个问题实际上是想澄清我对 Spark 上的流式消费者的理解,即:每个并发消费者必须在自己的 Spark 集群上运行(对还是错?)。或者我可以从consumerHandler 内部多次调用KafkaUtils.createDStream,每次调用都会为我设置一个不同的“竞争”消费者线程(在同一个集群上)。
  • 如果您使用直接流方法,则两者都不需要。 Spark 将在工作人员之间划分 Kafka 偏移量,并且他们可以同时从 Kafka 中读取。直接流方法中的基数是 kafka 分区和 spark RDD 分区之间的 1:1。读取主题的并发度取决于可以从 Kafka 读取偏移量的 spark worker 的数量。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2016-07-31
  • 1970-01-01
  • 2023-02-04
  • 1970-01-01
  • 1970-01-01
  • 2010-10-16
  • 1970-01-01
相关资源
最近更新 更多