【问题标题】:How to convert RDD[CassandraRow] to DataFrame?如何将 RDD[CassandraRow] 转换为 DataFrame?
【发布时间】:2017-10-30 09:55:24
【问题描述】:

目前这是我将 Cassandrarow RDD 转换为数据框的方式:

val ssc = new StreamingContext(sc, Seconds(15))

val dstream = new ConstantInputDStream(ssc, ssc.cassandraTable("db", "table").select("createdon"))

import sqlContext.implicits._

dstream.foreachRDD{ rdd =>
    val dataframeJobs = rdd.map(w => (w.dataAsString)).map(_.split(":")).map(x =>(x(1))).map(_.split(" ")).map(x =>(x(1))).toDF("ondate")
}

如您所见,我首先将 cassandraRow rdd 转换为字符串,然后映射到我想要的格式。我发现这种方法变得复杂,因为当 rdd 包含多个列而不是示例中所示的一个(createdon)时。

还有其他简单的方法可以将 cassandraRow RDD 转换为数据帧吗?

我的 build.sbt 如下:

scalaVersion := "2.11.8"

libraryDependencies ++= Seq(
  "com.datastax.spark" %% "spark-cassandra-connector" % "2.0.1",
  "org.apache.spark" %% "spark-core" % "2.0.2" % "provided",
  "org.apache.spark" %% "spark-sql" % "2.0.2",
  "org.apache.spark" %% "spark-streaming" % "2.0.2"
)

【问题讨论】:

  • 你可以在一张地图中完成,而不是像dstream.map(stream => stream.toString.split(":")(0).split(" ")(1)).toDF("ondate")那样嵌套地图
  • 你的用例是什么?您想在每个流式传输间隔加载 cassandra 表吗?以后你会用它做什么?
  • 是的,我想在每个流式传输间隔加载 cassandra 表。我正在将火花计算结果写入另一个表。

标签: scala apache-spark apache-spark-sql spark-streaming spark-cassandra-connector


【解决方案1】:

引用SparkContextFunctions的scaladoc(去掉隐含的参数):

cassandraTable[T](keyspace: String, table: String): CassandraTableScanRDD[T] 以 CassandraRDD 形式返回 Cassandra 表的视图。通过导入 com.datastax.spark.connector._

可以在 SparkContext 上使用此方法

根据传递给 cassandraTable 的类型参数,每一行都转换为以下之一:

  • 一个 CassandraRow 对象(默认,如果没有给出类型)
  • 包含与 CassandraRDD#select 选择的列顺序相同的列值的元组
  • 用户定义类的对象,由适当的 ColumnMapper 填充

所以,我建议使用以下方法:

ssc.cassandraTable[String]("db", "table").select("createdon")

根据文档,这应该为您提供访问 createdon 的最简单方法。


我还想知道为什么您不使用 spark-cassandra-connector 支持的 DataFrame,如 Datasets 中所述。这样你的代码可能会稍微简单一些。

您可以尝试用 Spark SQL 的 Structured Streaming 替换 Spark Streaming(几乎正式过时):

Structured Streaming 是基于 Spark SQL 引擎构建的可扩展和容错流处理引擎。您可以像表达对静态数据的批处理计算一样表达您的流计算。 Spark SQL 引擎将负责以增量和持续的方式运行它,并随着流数据的不断到达更新最终结果。

我不确定 Cassandra Spark 连接器是否支持它。

【讨论】:

    【解决方案2】:

    我想出了一种替代方法,可以有效地处理任意数量的列:

    rdd.keyBy(row => (row.getString("createdon"))).map(x => x._1).toDF("ondate")

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2020-01-24
      • 2018-03-30
      • 1970-01-01
      • 2019-09-09
      • 2016-05-09
      • 2015-12-08
      相关资源
      最近更新 更多