【发布时间】:2018-03-11 17:58:51
【问题描述】:
下面是代码: 斯卡拉版本:2.11。 火花版本:2.0.2.6 Cassandra 版本:cqlsh 5.0.1 |卡桑德拉 3.11.0.1855 | DSE 5.1.3 | CQL 规范 3.4.4 |原生协议 v4
我正在尝试从 CSV 读取并写入 Cassandra 表。我是 Scala 和 Spark 的新手。请纠正我做错的地方
import org.apache.spark.sql.SparkSession
import org.apache.log4j.{Level, Logger}
import com.datastax
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import com.datastax.spark.connector._
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{DoubleType, StringType, StructField, StructType}
import org.apache.spark.sql._
import com.datastax.spark.connector.UDTValue
import com.datastax.spark.connector.mapper.DefaultColumnMapper
object dataframeset {
def main(args: Array[String]): Unit = {
// Cassandra Part
val conf = new SparkConf().setAppName("Sample1").setMaster("local[*]")
val sc = new SparkContext(conf)
sc.setLogLevel("ERROR")
val rdd1 = sc.cassandraTable("tdata", "map")
rdd1.collect().foreach(println)
// Scala Read CSV Part
Logger.getLogger("org").setLevel(Level.ERROR)
Logger.getLogger("akka").setLevel(Level.ERROR)
val spark1 = org.apache.spark.sql.SparkSession
.builder()
.master("local")
.appName("Spark SQL basic example")
.getOrCreate()
val df = spark1.read.format("csv")
.option("header","true")
.option("inferschema", "true")
.load("/Users/tom/Desktop/del2.csv")
import spark1.implicits._
df.printSchema()
val dfprev = df.select(col = "Year","Measure").filter("Category = 'Prevention'" )
// dfprev.collect().foreach(println)
val a = dfprev.select("YEAR")
val b = dfprev.select("Measure")
val collection = sc.parallelize(Seq(a,b))
collection.saveToCassandra("tdata", "map", SomeColumns("sno", "name"))
spark1.stop()
}
}
错误:
Exception in thread "main" java.lang.IllegalArgumentException: Multiple constructors with the same number of parameters not allowed.
卡桑德拉表
cqlsh:tdata> 描述图
创建表 tdata.map ( sno int 主键, 名称文本;
我知道我遗漏了一些东西,尤其是试图一次性将整个数据帧写入 Cassandra。不是我也不知道需要做什么。
谢谢 汤姆
【问题讨论】:
-
为什么不用 cqlsh 复制命令? docs.datastax.com/en/cql/3.1/cql/cql_reference/copy_r.html
-
那是因为我不需要整个 CSV,我需要在加载它们之前应用过滤器和转换
标签: scala csv apache-spark intellij-idea cassandra