【发布时间】:2016-10-10 00:24:20
【问题描述】:
我正在尝试使用 Kafka DirectStream,处理每个分区的 RDD,并将处理后的值写入数据库。当我尝试执行 reduceByKey(每个分区,即没有随机播放)时,我收到以下错误。通常在驱动节点上,我们可以使用 sc.parallelize(Iterator) 来解决这个问题。但我想在火花流中解决它。
value reduceByKey is not a member of Iterator[((String, String), (Int, Int))]
有没有办法在分区内对 Iterator 执行转换?
myKafkaDS
.foreachRDD { rdd =>
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
val commonIter = rdd.mapPartitionsWithIndex ( (i,iter) => {
val offset = offsetRanges(i)
val records = iter.filter(item => {
(some_filter_condition)
}).map(r1 => {
// Some processing
((field2, field2), (field3, field4))
})
val records.reduceByKey((a,b) => (a._1+b._1, a._2+b._2)) // Getting reduceByKey() is not a member of Iterator
// Code to write to DB
Iterator.empty // I just want to store the processed records in DB. So returning empty iterator
})
}
有没有更优雅的方法来做到这一点(为每个分区处理 kafka RDD 并将它们存储在数据库中)?
【问题讨论】:
-
您要保存哪个数据库?许多数据库都有可用的 spark db 连接器 API,使用它可以轻松地将 RDD 保存到数据库。
-
@Shankar 我担心的是不存储到数据库中。但是要处理属于同一 kafka 偏移量的 RDD 并存储它们(包括偏移量和数据),以便我可以跟踪处理后的偏移量。
标签: apache-spark apache-kafka spark-streaming