回答我自己的问题 -
我在 Spark Kinesis 集成方面取得了一些成功,关键是 unionStreams.foreachRDD。
有 2 个版本的 foreachRDD 可用
- unionStreams.foreachRDD
- unionStreams.foreachRDD ((rdd: RDD[Array[Byte]], time: Time)
由于某种原因,第一个无法让我得到结果,但更改为第二个可以让我得到预期的结果。尚待探究原因。
在下面添加代码sn-p以供参考。
还可以考虑更改此设置。这对我也有帮助-
"org.apache.spark" % "spark-streaming-kinesis-asl_2.10" % "1.6.0", // Doesnt work
"org.apache.spark" % "spark-streaming-kinesis-asl_2.10" % "1.4.1", // Works
希望它可以帮助某人:)
感谢大家的帮助。
val kinesisStreams = (0 until numStreams).map {
count =>
val stream = KinesisUtils.createStream(
ssc,
consumerName,
streamName,
endpointUrl,
regionName,
InitialPositionInStream.TRIM_HORIZON,
kinesisCheckpointInterval,
StorageLevel.MEMORY_AND_DISK_2
)
stream
}
val unionStreams = ssc.union(kinesisStreams)
println(s"========================")
println(s"Num of streams: ${numStreams}")
println(s"========================")
/*unionStreams.foreachRDD{ // Doesn't Work !!
rdd =>
println(rdd.count)
println("rdd isempty:" + rdd.isEmpty)
}*/
unionStreams.foreachRDD ((rdd: RDD[Array[Byte]], time: Time) => { // Works, Yeah !!
println(rdd.count)
println("rdd isempty:" + rdd.isEmpty)
}
)
ssc.start()
ssc.awaitTermination()