【问题标题】:Filter spark CassandaRow each row of a RDD [duplicate]过滤火花CassandaRow RDD的每一行[重复]
【发布时间】:2018-09-18 12:53:18
【问题描述】:

我有以下代码:-

 val rss = sc.cassandraTable("db", "table").select("id", "date", "gpsdt").where("id=? and date=? and gpsdt>? and gpsdt<?", entry(0), entry(1), entry(2) , entry(3))

    rss.foreach { records =>
      {
        println("Cassandra Row " + records.toString())
        val gpsdttime = records.get[String]("gpsdt")
        val justLess = rss.filter(row => row.get[String]("gpsdt") < gpsdttime).sortBy(row => row.get[String]("gpsdt"), false).take(1)
      }
    }

所以,我的想法是根据一些 where 子句从 Cassandra 中选择一组 RDD,并遍历每一行并找到其各自的前一行以重新计算一些值并更新当前行。但这给出了一个错误:-

org.apache.spark.SparkException: This RDD lacks a SparkContext. It could happen in the following cases: 
(1) RDD transformations and actions are NOT invoked by the driver, but inside of other transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.
(2) When a Spark Streaming job recovers from checkpoint, this exception will be hit if a reference to an RDD not defined by the streaming job is used in DStream operations. For more information, See SPARK-13758.
at org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$sc(RDD.scala:89) 

请提出建议,谢谢,

【问题讨论】:

  • 你不能在另一个 rdd 中拥有 rdd。
  • 你可以做 rdd.filter(filterFunc).map(mapFunc).take(1)
  • @Knight71 - 我知道我不能在另一个 RDD 中使用 RDD ,但是你知道我的情况是我想迭代每个 RDD 值并且该特定行将再次必须在该 RDD 中搜索。以及您提供的示例 rdd.filter .. 是否适用于迭代?

标签: scala apache-spark apache-spark-sql spark-cassandra-connector


【解决方案1】:

Exception的意思是SparkContext在driver中初始化,而foreach中的func在executor中运行,所以当你运行job时,它会抛出 以下异常:

org.apache.spark.SparkException: This RDD lacks a SparkContext.

您的情况的原因是 (1) RDD 转换和操作不是由驱动程序调用的,而是在其他转换内部;例如,rdd1.map(x => rdd2.values.count() * x) 无效,因为值转换和计数操作无法在 rdd1.map 转换内部执行。有关详细信息,请参阅 SPARK-5063。

【讨论】:

  • 嗨..凯文感谢您的回复。但是,如果您可以为我提供一些解决方案,说明我可以如何以及在代码中更改哪些内容以解决它,那将会更有帮助。谢谢,
  • 如果您想使用前一行并更新当前行,我认为您可以通过 spark 作业将前一行保存在 storage(hdfs,mysql) 中,然后由另一个 spark 作业使用。
  • 不完全是 kevin ,这会产生开销。我试图通过索引 RDD 并获取结果来解决它。让我们看看情况如何。同时欢迎提出更多建议。谢谢,
猜你喜欢
  • 2015-07-10
  • 2018-03-07
  • 1970-01-01
  • 2021-04-04
  • 2017-02-28
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多