【问题标题】:Inserting Data Into Cassandra table Using Spark DataFrame使用 Spark DataFrame 将数据插入 Cassandra 表
【发布时间】:2017-05-06 00:29:40
【问题描述】:

我正在使用 Scala 版本 2.10.5 Cassandra 3.0 和 Spark 1.6。我想将数据插入 cassandra,所以我尝试了基本示例

scala> val collection = sc.parallelize(Seq(("cat", 30), ("fox", 40)))
scala> collection.saveToCassandra("test", "words", SomeColumns("word", "count"))

哪个可以将数据插入 Cassandra。所以我有一个 csv 文件,我想通过匹配模式将其插入 Cassandra 表

val person = sc.textFile("hdfs://localhost:9000/user/hduser/person")
import org.apache.spark.sql._
val schema =  StructType(Array(StructField("firstName",StringType,true),StructField("lastName",StringType,true),StructField("age",IntegerType,true)))
val rowRDD = person.map(_.split(",")).map(p => org.apache.spark.sql.Row(p(0),p(1),p(2).toInt))
val personSchemaRDD = sqlContext.applySchema(rowRDD, schema)
 personSchemaRDD.saveToCassandra

当我使用 SaveToCassndra 时,我发现 saveToCassandra 不是 personSchemaRDD 的一部分。所以被教导以不同的方式尝试

 df.write.format("org.apache.spark.sql.cassandra").options(Map( "table" -> "words_copy", "keyspace" -> "test")).save()

但是无法在 ip:port 上连接到 cassandra。任何人都可以告诉我最好的方法。我需要定期将文件中的数据保存到 cassandra。

【问题讨论】:

    标签: scala apache-spark spark-cassandra-connector


    【解决方案1】:

    sqlContext.applySchema(...) 返回一个DataFrame,而DataFrame 没有saveToCassandra 方法。

    您可以使用 .write 方法:

    val personDF = sqlContext.applySchema(rowRDD, schema)
    personDF.write.format("org.apache.spark.sql.cassandra").options(Map( "table" -> "words_copy", "keyspace" -> "test")).save()
    

    如果我们想使用savetoCassandra 方法,最好的方法是拥有一个模式感知的RDD,使用一个案例类。

    case class Person(firstname:String, lastName:String, age:Int)
    val rowRDD = person.map(_.split(",")).map(p => Person(p(0),p(1),p(2).toInt)
    rowRDD.saveToCassandra(keyspace, table)
    

    Dataframe write 方法应该可以工作。检查您是否正确配置了上下文。

    【讨论】:

    • 如何将 Row() 中的元素之一转换为 `val rowRDD = input.map(_.split(",")).map(p => Row( p(0) ,getTimestamp((1)), p(2))) 我要 YYYY-MM-DD' 'HH:mm:ss 格式
    • @Anji 最好将时间戳映射到java.util.Datejodatime.DateTime 以避免格式问题。
    • 使用 com.databricks.spark.csv 时是否有任何选项可以将“NA”我的错误 Caused by: java.text.ParseException: Unparseable number: "NA"
    • @Anji 不要在 cmets 中拖问题。如果您有新问题,请考虑提出新问题。
    • 成功了,这是 cassandta.yaml 子网地址中更改的连接问题
    【解决方案2】:

    我将代码放在这里是为了使用 Spark Java 将 Spark 数据集保存到 Cassandra 表中。

    private static void readBigEmptable(SparkSession sparkSession) {
       String cassandraEmpColumns= "id,name,salary,age,city";
        Dataset<Row> bigDataset = sparkSession.sql("select * from big_emp");
        // Generate the schema for output row
        List<StructField> fields = new ArrayList<>();
        for (String fieldName : cassandraEmpColumns.split(",")) {
            StructField field = DataTypes.createStructField(fieldName, DataTypes.StringType, true);
            fields.add(field);
        }
        StructType schemaStructure = DataTypes.createStructType(fields);
        // Converting big dataset to RDD to perform operation on Row field
        JavaRDD<Row> bigRDD = bigDataset.toJavaRDD();
        JavaRDD<Row> resultRDD = bigRDD .map(new Function<Row, Row>() {
    
            /**
             * 
             */
            private static final long serialVersionUID = 1L;
    
            @Override
            public Row call(Row row) throws Exception {
                // return compareField(row).iterator();
                Row outputRow = RowFactory.create(row.getAs("id"), row.getAs("name"), row.getAs("salary"),
                        row.getAs("age"), row.getAs("city"));
                return outputRow;
            }
        });
        Dataset<Row> empDs = sparkSession.createDataFrame(resultRDD, schemaStructure);
        empDs.show();
        writeToCassandraTable(empDs);
    
    }
    
    private static void writeToCassandraTable(Dataset<Row> dataset) {
        Map<String, String> tableProperties = new HashMap();
        tableProperties.put("keyspace", "test_keyspace");
        tableProperties.put("table", "emp_test");
        tableProperties.put("confirm.truncate", "true");
        dataset.write().format("org.apache.spark.sql.cassandra").options(tableProperties).mode(SaveMode.Overwrite)
                .save();
    }
    

    注意:如果我们使用 mode(SaveMode.Overwrite) 那么我们应该使用 tableProperties.put("confirm.truncate", "true") ; 否则我们会收到错误消息。

    SaveMode.Append

    • Append 模式意味着当将 DataFrame 保存到数据源时, 如果数据/表已经存在,则需要 DataFrame 的内容 附加到现有数据中。

    SaveMode.ErrorIfExists

    • ErrorIfExists 模式意味着在将 DataFrame 保存到数据时 源,如果数据已经存在,则预计会出现异常 抛出。

    SaveMode.Ignore

    • 忽略模式是指在将DataFrame保存到数据源时,如果数据已经存在,则保存操作预计不会保存DataFrame的内容,也不会更改现有数据。

    SaveMode.Overwrite

    • 覆盖模式意味着当将 DataFrame 保存到数据源时, 如果数据/表已经存在,现有数据预计为 被 DataFrame 的内容覆盖。

    【讨论】:

      猜你喜欢
      • 2017-01-07
      • 2021-03-16
      • 2016-07-18
      • 2016-05-14
      • 2019-10-15
      • 2021-09-14
      • 1970-01-01
      • 2018-12-03
      • 2013-08-27
      相关资源
      最近更新 更多