【发布时间】:2018-04-12 20:42:10
【问题描述】:
我目前正在编写一个 Spark 流应用程序,它从 Kafka 读取数据并尝试在应用一些转换之前对其进行解码。
目前的代码结构如下:
val stream = KafkaUtils.createDirectStream[String, String](...)
.map(record => decode(record.value())
.filter(...)
.foreachRDD { rdd =>
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
...
stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}
失败的解码和过滤发生在DStream上,偏移量管理在foreachRDD内部完成,也就是说我只会提交成功的记录。
要提交失败的记录,我可以移动 foreachRDD 循环中的所有内容:
val stream = KafkaUtils.createDirectStream[String, String](...)
.foreachRDD { rdd =>
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
...
// Decoding and filtering here
...
stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}
但是,我想知道是否还有其他方法可以提交失败的记录。也许不提交失败的记录是可以接受的?
【问题讨论】:
标签: scala apache-kafka spark-streaming