【发布时间】:2017-05-06 00:29:40
【问题描述】:
我正在使用 Scala 版本 2.10.5 Cassandra 3.0 和 Spark 1.6。我想将数据插入 cassandra,所以我尝试了基本示例
scala> val collection = sc.parallelize(Seq(("cat", 30), ("fox", 40)))
scala> collection.saveToCassandra("test", "words", SomeColumns("word", "count"))
哪个可以将数据插入 Cassandra。所以我有一个 csv 文件,我想通过匹配模式将其插入 Cassandra 表
val person = sc.textFile("hdfs://localhost:9000/user/hduser/person")
import org.apache.spark.sql._
val schema = StructType(Array(StructField("firstName",StringType,true),StructField("lastName",StringType,true),StructField("age",IntegerType,true)))
val rowRDD = person.map(_.split(",")).map(p => org.apache.spark.sql.Row(p(0),p(1),p(2).toInt))
val personSchemaRDD = sqlContext.applySchema(rowRDD, schema)
personSchemaRDD.saveToCassandra
当我使用 SaveToCassndra 时,我发现 saveToCassandra 不是 personSchemaRDD 的一部分。所以被教导以不同的方式尝试
df.write.format("org.apache.spark.sql.cassandra").options(Map( "table" -> "words_copy", "keyspace" -> "test")).save()
但是无法在 ip:port 上连接到 cassandra。任何人都可以告诉我最好的方法。我需要定期将文件中的数据保存到 cassandra。
【问题讨论】:
标签: scala apache-spark spark-cassandra-connector