【问题标题】:Spark Scala Cassandra CSV insert into cassandraSpark Scala Cassandra CSV 插入到 cassandra
【发布时间】:2018-03-11 17:58:51
【问题描述】:

下面是代码: 斯卡拉版本:2.11。 火花版本:2.0.2.6 Cassandra 版本:cqlsh 5.0.1 |卡桑德拉 3.11.0.1855 | DSE 5.1.3 | CQL 规范 3.4.4 |原生协议 v4

我正在尝试从 CSV 读取并写入 Cassandra 表。我是 Scala 和 Spark 的新手。请纠正我做错的地方

import org.apache.spark.sql.SparkSession
import org.apache.log4j.{Level, Logger}
import com.datastax

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import com.datastax.spark.connector._
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{DoubleType, StringType, StructField, StructType}
import org.apache.spark.sql._
import com.datastax.spark.connector.UDTValue
import com.datastax.spark.connector.mapper.DefaultColumnMapper


object dataframeset {

  def main(args: Array[String]): Unit = {

    // Cassandra Part

    val conf = new SparkConf().setAppName("Sample1").setMaster("local[*]")
    val sc = new SparkContext(conf)
    sc.setLogLevel("ERROR")
    val rdd1 = sc.cassandraTable("tdata", "map")

    rdd1.collect().foreach(println)

    // Scala Read CSV Part
    Logger.getLogger("org").setLevel(Level.ERROR)
    Logger.getLogger("akka").setLevel(Level.ERROR)
    val spark1 = org.apache.spark.sql.SparkSession
      .builder()
      .master("local")
      .appName("Spark SQL basic example")
      .getOrCreate()

    val df = spark1.read.format("csv")
      .option("header","true")
      .option("inferschema", "true")
      .load("/Users/tom/Desktop/del2.csv")
    import spark1.implicits._
      df.printSchema()
      val dfprev = df.select(col = "Year","Measure").filter("Category = 'Prevention'" )

//      dfprev.collect().foreach(println)
      val a = dfprev.select("YEAR")
      val b = dfprev.select("Measure")

      val collection = sc.parallelize(Seq(a,b))
    collection.saveToCassandra("tdata", "map", SomeColumns("sno", "name"))

    spark1.stop()

  }

}

错误:

Exception in thread "main" java.lang.IllegalArgumentException: Multiple constructors with the same number of parameters not allowed.

卡桑德拉表

cqlsh:tdata> 描述图

创建表 tdata.map ( sno int 主键, 名称文本;

我知道我遗漏了一些东西,尤其是试图一次性将整个数据帧写入 Cassandra。不是我也不知道需要做什么。

谢谢 汤姆

【问题讨论】:

标签: scala csv apache-spark intellij-idea cassandra


【解决方案1】:

您可以直接将数据帧(spark 2.x 中的数据集[Row])写入 cassandra。

如果在 spark conf 中启用了身份验证,则必须定义 cassandra 主机、用户名和密码才能使用类似的东西连接到 cassandra

val conf = new SparkConf(true)
    .set("spark.cassandra.connection.host", "CASSANDRA_HOST")
    .set("spark.cassandra.auth.username", "CASSANDRA_USERNAME")            
    .set("spark.cassandra.auth.password", "CASSANDRA_PASSWORD")

val spark1 = org.apache.spark.sql.SparkSession
      .builder()
      .master("local")
      .config("spark.cassandra.connection.host", "CASSANDRA_HOST")
      .config("spark.cassandra.auth.username", "CASSANDRA_USERNAME")            
      .config("spark.cassandra.auth.password", "CASSANDRA_PASSWORD")
      .appName("Spark SQL basic example")
      .getOrCreate()

val dfprev = df.filter("Category = 'Prevention'" ).select(col("Year").as("yearAdded"),col("Measure").as("Recording"))

dfprev .write
  .format("org.apache.spark.sql.cassandra")
  .options(Map("table" -> "map", "keyspace" -> "tdata"))
  .save()

Dataframe in spark-cassandra-connector

【讨论】:

  • 您好,感谢您的更新。一个快速的问题。如何使用列名映射数据,因为 CSV 中的“年份”应该转到“YearAdded”列,“度量”应该转到“记录”列。
  • 您可以使用 spark 的 .alias.as api 更改列名。我已经更新了我的答案
猜你喜欢
  • 2016-01-30
  • 1970-01-01
  • 2018-04-19
  • 2017-08-18
  • 2019-02-22
  • 1970-01-01
  • 1970-01-01
  • 2017-11-27
  • 2017-01-07
相关资源
最近更新 更多