【问题标题】:Sort RDD in Spark before publishing it to Kafka?在将 RDD 发布到 Kafka 之前对 Spark 中的 RDD 进行排序?
【发布时间】:2018-04-17 22:41:55
【问题描述】:

在我的代码中,我首先订阅一个 Kafka 流,处理每个 RDD 以创建我的类 People 的一个实例,然后,我想将结果集 (Dataset[People]) 发布到 Kafka 的特定主题。需要注意的是,并非从 Kafka 收到的每条传入消息都映射到 People 的实例。此外,人员的实例应该按照与从 Kafka 接收的顺序完全相同的顺序发送到 Kafka。

但是,我不确定排序是否真的有必要,或者People 的实例在执行器上运行相应的代码时是否保持相同的顺序(我可以直接将我的数据集发布到 Kafka)。据我了解,排序是必要的,因为foreachRDD里面的代码可以在集群的不同节点上执行。这是正确的吗?

这是我的代码:

val myStream = KafkaUtils.createDirectStream[K, V](streamingContext, PreferConsistent, Subscribe[K, V](topics, consumerConfig))

def process(record: (RDD[ConsumerRecord[String, String]], Time)): Unit = record match {
case (rdd, time) if !rdd.isEmpty =>
    // More Code...
    // In the end, I have: Dataset[People]
case _ =>
}

myStream.foreachRDD((x, y) => process((x, y))) // Do I have to replace this call with map, sort the RDD and then publish it to Kafka?

【问题讨论】:

    标签: scala apache-spark apache-kafka


    【解决方案1】:

    此外,人员的实例应该按照与从 Kafka 接收的顺序完全相同的顺序发送到 Kafka。

    除非你有一个单独的分区(然后你不会使用 Spark,对吗?)接收数据的顺序不是确定性的,同样,发送数据的顺序也不是确定性的。排序在这里没有任何区别。

    如果您需要一个非常具体的处理顺序(如果您使用数据密集型应用程序,这通常是设计错误),您需要一个顺序应用程序,或者比 Spark 具有更精细控制的系统。

    【讨论】:

    • 感谢您的帮助。也许我错过了重点,我根本不应该使用 Spark。我只是想从 Kafka 订阅一个主题,处理数据,然后将其发布回 Kafka 的另一个主题。这些主题的消费者将处理从 Kafka 流式传输的消息,并且在我的情况下顺序很重要。
    猜你喜欢
    • 2021-06-04
    • 2014-07-13
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2016-02-19
    • 2015-08-23
    • 1970-01-01
    • 2015-08-23
    相关资源
    最近更新 更多