【问题标题】:Spark: convert a CSV to RDD[Row]Spark:将 CSV 转换为 RDD [Row]
【发布时间】:2017-07-24 17:47:16
【问题描述】:

我有一个 .csv 文件,其中包含以下结构的 258 列。

["label", "index_1", "index_2", ... , "index_257"]

现在我想将此 .csv 文件转换为 RDD[Row]:

val data_csv = sc.textFile("~/test.csv")

val rowRDD = data_csv.map(_.split(",")).map(p => Row( p(0), p(1).trim, p(2).trim)) 

如果我以这种方式进行转换,我必须专门写下 258 列。所以我尝试了:

val rowRDD = data_csv.map(_.split(",")).map(p => Row( _ => p(_).trim)) 

val rowRDD = data_csv.map(_.split(",")).map(p => Row( x => p(x).trim))

但是这两个也不行,报错:

error: missing parameter type for expanded function ((x$2) => p(x$2).trim)

谁能告诉我如何进行这种转换?非常感谢。

【问题讨论】:

    标签: scala apache-spark apache-spark-sql rdd


    【解决方案1】:

    你应该使用sqlContext 而不是sparkContext

    val df = sqlContext.read
      .format("com.databricks.spark.csv")
      .option("header", true)
      .load(("~/test.csv")
    

    这将创建dataframe。在df 上调用.rdd 应该会给你RDD[Row]

    val rdd = df.rdd
    

    【讨论】:

    • 非常感谢,但是当我运行您的代码时,它总是显示:线程“main”中的异常 java.lang.ClassNotFoundException:无法加载数据源的类:com.databricks.spark.csv。
    • 同样的错误:线程“main”中的异常 java.lang.ClassNotFoundException:无法为数据源加载类:csv。
    • try val df = sqlContext.read.option("header", true).csv("~/test.csv") then :) 肯定可以工作
    • 非常感谢您,非常感谢您的帮助。这次我得到:error: value csv is not a member of org.apache.spark.sql.DataFrameReader.
    • 你是如何初始化sqlContext的?你必须这样做val sqlContext = SparkSession.builder().appName("application name").master("local").getOrCreate().sqlContext
    【解决方案2】:

    而不是作为 textFile 读取 CSV 文件与spark-csv

    你的情况

    val df = sqlContext.read
        .format("com.databricks.spark.csv")
        .option("header", "true") // Use first line of all files as header
        .option("inferSchema", "true") // Automatically infer data types
        .option("quote", "\"")  //escape the quotes 
        .option("ignoreLeadingWhiteSpace", true)  // escape space before your data
        .load("cars.csv")
    

    这会将数据加载为数据框,现在您可以轻松地将其更改为 RDD。

    希望这会有所帮助!

    【讨论】:

      【解决方案3】:

      除了其他正确的答案外,正确的做法是在 map 函数中使用 Row.fromSeq

      val rdd = sc.parallelize(Array((1 to 258).toArray, (1 to 258).toArray) )
                  .map(Row.fromSeq(_))
      

      这会将您的rdd 变成Row

       Array[org.apache.spark.sql.Row] = Array([1,2,3,4,5,6,7,8,9,10...
      

      【讨论】:

      • 谢谢。请问如何在我的代码中做到这一点?我试过 val rowRDD = data_csv.map(.split(",")).map(p => Row.fromSeq(p().trim)) 但它不正确。
      • 您可以先trim,然后再转换为Row。类似data_csv.map(_.split(",")).map(x => x.map(_.trim)).map(Row.fromSeq(_))
      猜你喜欢
      • 2017-06-13
      • 1970-01-01
      • 2016-12-25
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多