【发布时间】:2018-04-17 22:41:55
【问题描述】:
在我的代码中,我首先订阅一个 Kafka 流,处理每个 RDD 以创建我的类 People 的一个实例,然后,我想将结果集 (Dataset[People]) 发布到 Kafka 的特定主题。需要注意的是,并非从 Kafka 收到的每条传入消息都映射到 People 的实例。此外,人员的实例应该按照与从 Kafka 接收的顺序完全相同的顺序发送到 Kafka。
但是,我不确定排序是否真的有必要,或者People 的实例在执行器上运行相应的代码时是否保持相同的顺序(我可以直接将我的数据集发布到 Kafka)。据我了解,排序是必要的,因为foreachRDD里面的代码可以在集群的不同节点上执行。这是正确的吗?
这是我的代码:
val myStream = KafkaUtils.createDirectStream[K, V](streamingContext, PreferConsistent, Subscribe[K, V](topics, consumerConfig))
def process(record: (RDD[ConsumerRecord[String, String]], Time)): Unit = record match {
case (rdd, time) if !rdd.isEmpty =>
// More Code...
// In the end, I have: Dataset[People]
case _ =>
}
myStream.foreachRDD((x, y) => process((x, y))) // Do I have to replace this call with map, sort the RDD and then publish it to Kafka?
【问题讨论】:
标签: scala apache-spark apache-kafka