【发布时间】:2017-10-30 09:55:24
【问题描述】:
目前这是我将 Cassandrarow RDD 转换为数据框的方式:
val ssc = new StreamingContext(sc, Seconds(15))
val dstream = new ConstantInputDStream(ssc, ssc.cassandraTable("db", "table").select("createdon"))
import sqlContext.implicits._
dstream.foreachRDD{ rdd =>
val dataframeJobs = rdd.map(w => (w.dataAsString)).map(_.split(":")).map(x =>(x(1))).map(_.split(" ")).map(x =>(x(1))).toDF("ondate")
}
如您所见,我首先将 cassandraRow rdd 转换为字符串,然后映射到我想要的格式。我发现这种方法变得复杂,因为当 rdd 包含多个列而不是示例中所示的一个(createdon)时。
还有其他简单的方法可以将 cassandraRow RDD 转换为数据帧吗?
我的 build.sbt 如下:
scalaVersion := "2.11.8"
libraryDependencies ++= Seq(
"com.datastax.spark" %% "spark-cassandra-connector" % "2.0.1",
"org.apache.spark" %% "spark-core" % "2.0.2" % "provided",
"org.apache.spark" %% "spark-sql" % "2.0.2",
"org.apache.spark" %% "spark-streaming" % "2.0.2"
)
【问题讨论】:
-
你可以在一张地图中完成,而不是像
dstream.map(stream => stream.toString.split(":")(0).split(" ")(1)).toDF("ondate")那样嵌套地图 -
你的用例是什么?您想在每个流式传输间隔加载 cassandra 表吗?以后你会用它做什么?
-
是的,我想在每个流式传输间隔加载 cassandra 表。我正在将火花计算结果写入另一个表。
标签: scala apache-spark apache-spark-sql spark-streaming spark-cassandra-connector