【发布时间】:2016-07-07 20:53:32
【问题描述】:
我不断地将数据从外部源写入 cassandra。
现在,我正在使用 Spark Streaming 通过以下代码从 cassandra 连续读取这些数据:
val ssc = new StreamingContext(sc, Seconds(5))
val cassandraRDD = ssc.cassandraTable("keyspace2", "feeds")
val dstream = new ConstantInputDStream(ssc, cassandraRDD)
dstream.foreachRDD { rdd =>
println("\n"+rdd.count())
}
ssc.start()
ssc.awaitTermination()
sc.stop()
但是,下面这行:
val cassandraRDD = ssc.cassandraTable("keyspace2", "feeds")
每次都从 cassandra 获取整个表数据。现在只是保存到表中的最新数据。
我想要做的是让火花流只读取最新数据,即在上次读取后添加的数据。
我怎样才能做到这一点?我尝试用谷歌搜索,但得到的文档很少。
我正在使用spark 1.4.1、scala 2.10.4 和cassandra 2.1.12。
谢谢!
编辑:
建议的重复问题(由我提出)不是重复的,因为它谈到了连接 spark 流和 cassandra,而这个问题是关于仅流式传输最新数据。顺便说一句,使用我提供的代码可以从 cassandra 流式传输。但是,它每次都需要整个表,而不仅仅是最新数据。
【问题讨论】:
-
目前无法从 Cassandra 流式传输。看到这个:stackoverflow.com/questions/34993290/…
-
哈哈。我才注意到是你问了这个问题。恐怕答案还没有改变。
-
那个问题(我问的)不是重复的,因为它谈到了连接 spark 流和 cassandra,而这个问题是关于只流式传输最新数据。顺便说一句,使用我提供的代码可以从 cassandra 流式传输。但是,它每次都需要整个表,而不仅仅是最新数据。
-
我认为您描述的问题没有开箱即用的解决方案,因为它需要 Cassandra 不维护的时间序列方式的附加信息。我认为您可以使用受“上次处理时间”而不是“cassandraTable”限制的 CQL 查询。但是你应该正确地改变你的数据结构:academy.datastax.com/demos/…
标签: scala apache-spark cassandra spark-streaming bigdata