【发布时间】:2017-01-14 14:00:53
【问题描述】:
我想将 spark 和 spark 流输出到 kafka 一次。但正如医生所说
“输出操作(如 foreachRDD)具有至少一次语义,也就是说,在工作人员失败的情况下,转换后的数据可能会多次写入外部实体。”。
做事务性更新,spark 建议使用批处理时间(在 foreachRDD 中可用)和 RDD 的分区索引来创建标识符。此标识符唯一地标识流应用程序中的 blob 数据。代码如下:
dstream.foreachRDD { (rdd, time) =>
rdd.foreachPartition { partitionIterator =>
val partitionId = TaskContext.get.partitionId()
val **uniqueId** = generateUniqueId(time.milliseconds, partitionId)
// use this uniqueId to transactionally commit the data in partitionIterator
}
}
但是如何在kafka中使用uniqueId进行事务提交。
谢谢
【问题讨论】:
标签: scala apache-spark apache-kafka