【发布时间】:2017-03-16 02:11:35
【问题描述】:
我正在尝试从 kafka 服务器创建一个 dStream,然后对该流进行一些转换。如果流为空(if(!rdd.partitions.isEmpty)),我已经包含了一个捕获;但是,即使没有事件发布到 kafka 主题,也永远不会到达 else 语句。
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
stream.foreachRDD { rdd =>
if(!rdd.partitions.isEmpty) {
val message = rdd.map((x$2) => x$2._2).collect().toList.map(parser)
val val = message(0)
} else println("empty stream...")
ssc.start()
ssc.awaitTermination()
}
在使用KafkaUtils.createDirectStream 而不是createStream 时,我应该使用替代语句来检查流是否为空?
【问题讨论】:
标签: scala apache-kafka spark-streaming kafka-consumer-api dstream