【Posted】: 2018-09-18 12:53:18
【Question】:
I have the following code:
val rss = sc.cassandraTable("db", "table")
  .select("id", "date", "gpsdt")
  .where("id=? and date=? and gpsdt>? and gpsdt<?", entry(0), entry(1), entry(2), entry(3))

rss.foreach { records =>
  println("Cassandra Row " + records.toString())
  val gpsdttime = records.get[String]("gpsdt")
  val justLess = rss
    .filter(row => row.get[String]("gpsdt") < gpsdttime)
    .sortBy(row => row.get[String]("gpsdt"), false)
    .take(1)
}
So the idea is to select a set of rows from Cassandra into an RDD based on a where clause, iterate over each row, find its immediate predecessor, recompute some values, and update the current row. But this throws an error:
org.apache.spark.SparkException: This RDD lacks a SparkContext. It could happen in the following cases:
(1) RDD transformations and actions are NOT invoked by the driver, but inside of other transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.
(2) When a Spark Streaming job recovers from checkpoint, this exception will be hit if a reference to an RDD not defined by the streaming job is used in DStream operations. For more information, See SPARK-13758.
at org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$sc(RDD.scala:89)
Any suggestions would be appreciated, thanks.
【Comments】:
-
You can't use an RDD inside another RDD.
-
You could do rdd.filter(filterFunc).map(mapFunc).take(1)
-
@Knight71 - I know I can't use an RDD inside another RDD, but in my case I want to iterate over every row, and for each row I then have to search that same RDD again. Would the rdd.filter... example you gave work for that kind of iteration?
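-
As the comments note, the nested lookup is what triggers SPARK-5063. One way to avoid it, sketched here with plain Scala collections on hypothetical (id, date, gpsdt) records, is to sort the rows once and pair each row with its predecessor, rather than re-filtering the full dataset inside foreach. The same pattern carries over to Spark by sorting the RDD once and pairing neighbors (e.g. via zipWithIndex and a self-join), though the exact Spark wiring is not shown here.

```scala
// Minimal sketch with an in-memory list standing in for the Cassandra rows.
// Record and the sample values are assumptions for illustration only.
case class Record(id: String, date: String, gpsdt: String)

val rows = List(
  Record("A", "2018-09-18", "10:10:00"),
  Record("A", "2018-09-18", "10:00:00"),
  Record("A", "2018-09-18", "10:05:00")
)

// Sort once by gpsdt, then zip each row with the row just before it.
// The first row has no predecessor, hence Option.
val sorted = rows.sortBy(_.gpsdt)
val withPrev: List[(Record, Option[Record])] =
  sorted.zip(None :: sorted.map(Some(_)))

withPrev.foreach { case (cur, prev) =>
  // Recompute values for `cur` from `prev` here, without touching
  // the original collection again.
  println(s"current=${cur.gpsdt}, previous=${prev.map(_.gpsdt).getOrElse("none")}")
}
```

This trades the O(n) filter-per-row lookup for a single sort, and keeps all work inside one pass over the data, which is the shape Spark transformations require.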
Tags: scala apache-spark apache-spark-sql spark-cassandra-connector