【发布时间】:2017-04-18 16:02:03
【问题描述】:
在 Spark 中,我从 Kafka 创建了一个批处理时间为 5 秒的流。在此期间可能会收到许多消息,我想单独处理每条消息,但按照我目前的逻辑,似乎只处理每批的第一条消息。
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, params, topics)
val messages = stream.map((x$2) => x$2._2)
messages.foreachRDD { rdd =>
if(!rdd.isEmpty) {
val message = rdd.map(parse)
println(message.collect())
}
}
parse 函数只是将 Json 消息中的相关字段提取到一个元组中。
我可以深入了解分区并以这种方式单独处理每条消息:
messages.foreachRDD { rdd =>
if(!rdd.isEmpty) {
rdd.foreachPartition { partition =>
partition.foreach{msg =>
val message = parse(msg)
println(message)
}
}
}
}
但我确信有一种方法可以保持在 RDD 级别。在第一个示例中我做错了什么?
我正在使用 spark 2.0.0、scala 2.11.8 和 spark streaming kafka 0.8。
【问题讨论】:
标签: scala apache-spark apache-kafka spark-streaming