【问题标题】:Stream the most recent data in cassandra with spark streaming使用 spark 流式传输 cassandra 中的最新数据
【发布时间】:2016-07-07 20:53:32
【问题描述】:

我不断地将数据从外部源写入 cassandra。

现在,我正在使用 Spark Streaming 通过以下代码从 cassandra 连续读取这些数据:

val ssc = new StreamingContext(sc, Seconds(5))

val cassandraRDD = ssc.cassandraTable("keyspace2", "feeds")


val dstream = new ConstantInputDStream(ssc, cassandraRDD)


dstream.foreachRDD { rdd =>
 println("\n"+rdd.count())
}

ssc.start()
ssc.awaitTermination()
sc.stop()

但是,下面这行:

val cassandraRDD = ssc.cassandraTable("keyspace2", "feeds")

每次都从 cassandra 获取整个表数据。现在只是保存到表中的最新数据。

我想要做的是让火花流只读取最新数据,即在上次读取后添加的数据。

我怎样才能做到这一点?我尝试用谷歌搜索,但得到的文档很少。

我正在使用spark 1.4.1scala 2.10.4cassandra 2.1.12

谢谢!

编辑:

建议的重复问题(由我提出)不是重复的,因为它谈到了连接 spark 流和 cassandra,而这个问题是关于仅流式传输最新数据。顺便说一句,使用我提供的代码可以从 cassandra 流式传输。但是,它每次都需要整个表,而不仅仅是最新数据。

【问题讨论】:

  • 目前无法从 Cassandra 流式传输。看到这个:stackoverflow.com/questions/34993290/…
  • 哈哈。我才注意到是你问了这个问题。恐怕答案还没有改变。
  • 那个问题(我问的)不是重复的,因为它谈到了连接 spark 流和 cassandra,而这个问题是关于只流式传输最新数据。顺便说一句,使用我提供的代码可以从 cassandra 流式传输。但是,它每次都需要整个表,而不仅仅是最新数据。
  • 我认为您描述的问题没有开箱即用的解决方案,因为它需要 Cassandra 不维护的时间序列方式的附加信息。我认为您可以使用受“上次处理时间”而不是“cassandraTable”限制的 CQL 查询。但是你应该正确地改变你的数据结构:academy.datastax.com/demos/…

标签: scala apache-spark cassandra spark-streaming bigdata


【解决方案1】:

将在 Cassandra 上进行一些低级工作,允许通知外部系统(索引器、Spark 流等)传入 Cassandra 的新突变,请阅读:https://issues.apache.org/jira/browse/CASSANDRA-8844

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2021-11-15
    • 2020-08-11
    • 2021-06-21
    • 2016-05-03
    • 2022-08-12
    • 1970-01-01
    • 1970-01-01
    • 2018-05-27
    相关资源
    最近更新 更多