【问题标题】:Spark - load CSV file as DataFrame?Spark - 将CSV文件加载为DataFrame?
【发布时间】:2015-06-24 14:35:06
【问题描述】:

我想在 spark 中读取 CSV 并将其转换为 DataFrame 并使用 df.registerTempTable("table_name") 将其存储在 HDFS 中

我试过了:

scala> val df = sqlContext.load("hdfs:///csv/file/dir/file.csv")

我得到的错误:

java.lang.RuntimeException: hdfs:///csv/file/dir/file.csv is not a Parquet file. expected magic number at tail [80, 65, 82, 49] but found [49, 59, 54, 10]
    at parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:418)
    at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$6.apply(newParquet.scala:277)
    at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$6.apply(newParquet.scala:276)
    at scala.collection.parallel.mutable.ParArray$Map.leaf(ParArray.scala:658)
    at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:54)
    at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
    at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
    at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:56)
    at scala.collection.parallel.mutable.ParArray$Map.tryLeaf(ParArray.scala:650)
    at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:165)
    at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:514)
    at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

在 Apache Spark 中将 CSV 文件加载为 DataFrame 的正确命令是什么?

【问题讨论】:

标签: scala apache-spark hadoop apache-spark-sql hdfs


【解决方案1】:

使用 Spark 2.x 解析 CSV 并加载为 DataFrame/DataSet

首先,初始化SparkSession对象默认情况下它会在shell中作为spark使用

val spark = org.apache.spark.sql.SparkSession.builder
        .master("local") # Change it as per your cluster
        .appName("Spark CSV Reader")
        .getOrCreate;

使用以下任一方式将 CSV 加载为DataFrame/DataSet

1。以编程方式进行

 val df = spark.read
         .format("csv")
         .option("header", "true") //first line in file has headers
         .option("mode", "DROPMALFORMED")
         .load("hdfs:///csv/file/dir/file.csv")

更新:添加所有选项from here,以防将来链接断开

  • 路径:文件的位置。与 Spark 类似,可以接受标准 Hadoop 通配表达式。
  • header:设置为 true 时,文件的第一行将用于命名列,并且不会包含在数据中。所有类型都将被假定为字符串。默认值为 false。
  • delimiter:默认情况下,使用分隔列,但 delimiter 可以设置为任何字符
  • quote:默认情况下,引号字符为 ",但可以设置为任何字符。引号内的分隔符被忽略
  • escape:默认情况下,转义字符为 ,但可以设置为任意字符。转义的引号字符被忽略
  • parserLib:默认情况下,可以将“commons”设置为“univocity”以使用该库进行 CSV 解析。
  • mode:决定解析模式。默认情况下它是允许的。可能的值为:
    • PERMISSIVE:尝试解析所有行:为丢失的标记插入空值,忽略多余的标记。
    • DROPMALFORMED:丢弃标记少于或多于预期的行或不匹配架构的标记
    • FAILFAST:如果遇到任何格式错误的行,则以 RuntimeException 中止 字符集:默认为 'UTF-8' 但可以设置为其他有效的字符集名称
  • inferSchema:自动推断列类型。它需要对数据进行一次额外的传递,默认情况下为 false 注释:跳过以该字符开头的行。默认为“#”。通过将此设置为 null 来禁用 cmets。
  • nullValue:指定一个字符串,表示一个空值,任何匹配这个字符串的字段都会在DataFrame中设置为空
  • dateFormat:指定一个字符串,指示读取日期或时间戳时使用的日期格式。自定义日期格式遵循 java.text.SimpleDateFormat 中的格式。这适用于 DateType 和 TimestampType。默认为 null,表示尝试通过 java.sql.Timestamp.valueOf() 和 java.sql.Date.valueOf() 解析时间和日期。

2。 You can do this SQL way as well

 val df = spark.sql("SELECT * FROM csv.`hdfs:///csv/file/dir/file.csv`")

依赖关系

 "org.apache.spark" % "spark-core_2.11" % 2.0.0,
 "org.apache.spark" % "spark-sql_2.11" % 2.0.0,

Spark 版本
val df = sqlContext.read
    .format("com.databricks.spark.csv")
    .option("header", "true") 
    .option("mode", "DROPMALFORMED")
    .load("csv/file/path"); 

依赖关系:

"org.apache.spark" % "spark-sql_2.10" % 1.6.0,
"com.databricks" % "spark-csv_2.10" % 1.6.0,
"com.univocity" % "univocity-parsers" % LATEST,

【讨论】:

  • 此会话是否需要配置单元?我收到 hive 错误。
  • 不需要。只有2.0.1 版本的spark-core_2.11spark-sql_2.11 可以。如果可能,添加错误消息。
  • 我们可以将管道分隔的文件转换为数据帧吗?
  • @OmkarPuttagunta:是的,当然!尝试这样的事情spark.read.format("csv").option("delimiter ", "|") ...
  • programmatic way 的另一个选项是去掉.format("csv") 并用.csv(... 替换.load(...option 方法属于 read 方法返回的 DataFrameReader 类,其中 loadcsv 方法返回一个数据帧,因此在调用它们后不能标记选项。这个答案非常彻底,但您应该链接到文档,以便人们可以看到所有其他可用的 CSV 选项spark.apache.org/docs/latest/api/scala/…*):org.apache.spark.sql.DataFrame
【解决方案2】:

在 POM 文件中添加以下 Spark 依赖项:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.2.0</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.2.0</version>
</dependency>

火花配置:

val spark = SparkSession.builder().master("local").appName("Sample App").getOrCreate()

读取 csv 文件:

val df = spark.read.option("header", "true").csv("FILE_PATH")

显示输出:

df.show()

【讨论】:

    【解决方案3】:

    使用 Spark 2.4+,如果您想从本地目录加载 csv,那么您可以使用 2 个会话并将其加载到 hive 中。第一个会话应使用 master() 配置创建为“local[*]”,第二个会话应使用“yarn”和 Hive 启用。

    下面的对我有用。

    import org.apache.log4j.{Level, Logger}
    import org.apache.spark._
    import org.apache.spark.rdd._
    import org.apache.spark.sql._
    
    object testCSV { 
    
      def main(args: Array[String]) {
        Logger.getLogger("org").setLevel(Level.ERROR)
        val spark_local = SparkSession.builder().appName("CSV local files reader").master("local[*]").getOrCreate()
    
        import spark_local.implicits._
        spark_local.sql("SET").show(100,false)
        val local_path="/tmp/data/spend_diversity.csv"  // Local file
        val df_local = spark_local.read.format("csv").option("inferSchema","true").load("file://"+local_path) // "file://" is mandatory
        df_local.show(false)
    
        val spark = SparkSession.builder().appName("CSV HDFS").config("spark.sql.warehouse.dir", "/apps/hive/warehouse").enableHiveSupport().getOrCreate()
    
        import spark.implicits._
        spark.sql("SET").show(100,false)
        val df = df_local
        df.createOrReplaceTempView("lcsv")
        spark.sql(" drop table if exists work.local_csv ")
        spark.sql(" create table work.local_csv as select * from lcsv ")
    
       }
    

    当使用spark2-submit --master "yarn" --conf spark.ui.enabled=false testCSV.jar 运行时,它运行良好并在 hive 中创建了表。

    【讨论】:

      【解决方案4】:

      要从系统上的相对路径读取,使用 System.getProperty 方法获取当前目录,并进一步使用相对路径加载文件。

      scala> val path = System.getProperty("user.dir").concat("/../2015-summary.csv")
      scala> val csvDf = spark.read.option("inferSchema","true").option("header", "true").csv(path)
      scala> csvDf.take(3)
      

      火花:2.4.4 斯卡拉:2.11.12

      【讨论】:

        【解决方案5】:

        spark-csv 是 Spark 核心功能的一部分,不需要单独的库。 所以你可以做例如

        df = spark.read.format("csv").option("header", "true").load("csvfile.csv")
        

        在 scala 中,(这适用于任何格式分隔符提及“,”用于 csv,“\t”用于 tsv 等)

        val df = sqlContext.read.format("com.databricks.spark.csv") .option("delimiter", ",") .load("csvfile.csv")

        【讨论】:

          【解决方案6】:

          借助内置的 Spark csv,您可以使用 Spark > 2.0 的新 SparkSession 对象轻松完成。

          val df = spark.
                  read.
                  option("inferSchema", "false").
                  option("header","true").
                  option("mode","DROPMALFORMED").
                  option("delimiter", ";").
                  schema(dataSchema).
                  csv("/csv/file/dir/file.csv")
          df.show()
          df.printSchema()
          

          您可以设置多种选项。

          • header: 你的文件是否在顶部包含标题行
          • inferSchema: 是否要自动推断模式。默认为true。我总是更喜欢提供架构以确保正确的数据类型。
          • mode:解析模式,PERMISSIVE,DROPMALFORMED 或 FAILFAST
          • delimiter: 指定分隔符,默认为逗号(',')

          【讨论】:

            【解决方案7】:

            如果使用 spark 2.0+ 试试这个

            For non-hdfs file:
            df = spark.read.csv("file:///csvfile.csv")
            
            
            For hdfs file:
            df = spark.read.csv("hdfs:///csvfile.csv")
            
            For hdfs file (with different delimiter than comma:
            df = spark.read.option("delimiter","|")csv("hdfs:///csvfile.csv")
            

            注意:- 这适用于任何分隔文件。只需使用 option(“delimiter”,) 来更改值。

            希望这有帮助。

            【讨论】:

            • 这与现有答案相同
            【解决方案8】:

            如果您使用 scala 2.11 和 Apache 2.0 或更高版本构建 jar。

            无需创建sqlContextsparkContext 对象。只需一个 SparkSession 对象即可满足所有需求。

            以下是运行良好的 mycode:

            import org.apache.spark.sql.{DataFrame, Row, SQLContext, SparkSession}
            import org.apache.log4j.{Level, LogManager, Logger}
            
            object driver {
            
              def main(args: Array[String]) {
            
                val log = LogManager.getRootLogger
            
                log.info("**********JAR EXECUTION STARTED**********")
            
                val spark = SparkSession.builder().master("local").appName("ValidationFrameWork").getOrCreate()
                val df = spark.read.format("csv")
                  .option("header", "true")
                  .option("delimiter","|")
                  .option("inferSchema","true")
                  .load("d:/small_projects/spark/test.pos")
                df.show()
              }
            }
            

            如果您在集群中运行,只需在定义 sparkBuilder 对象时将 .master("local") 更改为 .master("yarn")

            Spark Doc 涵盖了以下内容: https://spark.apache.org/docs/2.2.0/sql-programming-guide.html

            【讨论】:

            • 这与现有答案相同
            【解决方案9】:

            解析 CSV 文件有很多挑战,如果文件较大,它会不断增加,如果列值中有非英文/转义/分隔符/其他字符,则可能导致解析错误。

            魔法就在所使用的选项中。对我有用并希望涵盖大多数边缘情况的代码如下:

            ### Create a Spark Session
            spark = SparkSession.builder.master("local").appName("Classify Urls").getOrCreate()
            
            ### Note the options that are used. You may have to tweak these in case of error
            html_df = spark.read.csv(html_csv_file_path, 
                                     header=True, 
                                     multiLine=True, 
                                     ignoreLeadingWhiteSpace=True, 
                                     ignoreTrailingWhiteSpace=True, 
                                     encoding="UTF-8",
                                     sep=',',
                                     quote='"', 
                                     escape='"',
                                     maxColumns=2,
                                     inferSchema=True)
            

            希望对您有所帮助。更多参考:Using PySpark 2 to read CSV having HTML source code

            注意:以上代码来自 Spark 2 API,其中 CSV 文件读取 API 与 Spark 可安装的内置包捆绑在一起。

            注意:PySpark 是 Spark 的 Python 包装器,与 Scala/Java 共享相同的 API。

            【讨论】:

              【解决方案10】:

              默认文件格式是带有 spark.read.. 的 Parquet 和读取 csv 的文件,这就是您遇到异常的原因。使用您尝试使用的 api 指定 csv 格式

              【讨论】:

                【解决方案11】:

                适用于 Hadoop 为 2.6,Spark 为 1.6 且没有“databricks”包的用户。

                import org.apache.spark.sql.types.{StructType,StructField,StringType,IntegerType};
                import org.apache.spark.sql.Row;
                
                val csv = sc.textFile("/path/to/file.csv")
                val rows = csv.map(line => line.split(",").map(_.trim))
                val header = rows.first
                val data = rows.filter(_(0) != header(0))
                val rdd = data.map(row => Row(row(0),row(1).toInt))
                
                val schema = new StructType()
                    .add(StructField("id", StringType, true))
                    .add(StructField("val", IntegerType, true))
                
                val df = sqlContext.createDataFrame(rdd, schema)
                

                【讨论】:

                  【解决方案12】:

                  使用 Spark 2.0,以下是读取 CSV 的方法

                  val conf = new SparkConf().setMaster("local[2]").setAppName("my app")
                  val sc = new SparkContext(conf)
                  val sparkSession = SparkSession.builder
                    .config(conf = conf)
                    .appName("spark session example")
                    .getOrCreate()
                  
                  val path = "/Users/xxx/Downloads/usermsg.csv"
                  val base_df = sparkSession.read.option("header","true").
                    csv(path)
                  

                  【讨论】:

                  • spark.read.csv(path)spark.read.format("csv").load(path)有区别吗?
                  【解决方案13】:

                  在 Java 1.8 中,这段代码 sn-p 可以完美地读取 CSV 文件

                  POM.xml

                  <dependency>
                      <groupId>org.apache.spark</groupId>
                      <artifactId>spark-core_2.11</artifactId>
                      <version>2.0.0</version>
                  </dependency>
                  <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql_2.10 -->
                  <dependency>
                      <groupId>org.apache.spark</groupId>
                      <artifactId>spark-sql_2.10</artifactId>
                      <version>2.0.0</version>
                  </dependency>
                  
                  <!-- https://mvnrepository.com/artifact/org.scala-lang/scala-library -->
                  <dependency>
                      <groupId>org.scala-lang</groupId>
                      <artifactId>scala-library</artifactId>
                      <version>2.11.8</version>
                  </dependency>
                  <dependency>
                      <groupId>com.databricks</groupId>
                      <artifactId>spark-csv_2.10</artifactId>
                      <version>1.4.0</version>
                  </dependency>
                  

                  Java

                  SparkConf conf = new SparkConf().setAppName("JavaWordCount").setMaster("local");
                  // create Spark Context
                  SparkContext context = new SparkContext(conf);
                  // create spark Session
                  SparkSession sparkSession = new SparkSession(context);
                  
                  Dataset<Row> df = sparkSession.read().format("com.databricks.spark.csv").option("header", true).option("inferSchema", true).load("hdfs://localhost:9000/usr/local/hadoop_data/loan_100.csv");
                  
                          //("hdfs://localhost:9000/usr/local/hadoop_data/loan_100.csv");
                  System.out.println("========== Print Schema ============");
                  df.printSchema();
                  System.out.println("========== Print Data ==============");
                  df.show();
                  System.out.println("========== Print title ==============");
                  df.select("title").show();
                  

                  【讨论】:

                  • 虽然这可能对某人有用。该问题有一个 Scala 标记。
                  【解决方案14】:

                  Penny 的 Spark 2 示例是在 spark2 中执行此操作的方法。还有一个技巧:通过对数据进行初始扫描,将选项inferSchema 设置为true,为您生成该标头

                  那么,假设spark 是您设置的一个 spark 会话,是加载亚马逊在 S3 上托管的所有 Landsat 图像的 CSV 索引文件的操作。

                    /*
                     * Licensed to the Apache Software Foundation (ASF) under one or more
                     * contributor license agreements.  See the NOTICE file distributed with
                     * this work for additional information regarding copyright ownership.
                     * The ASF licenses this file to You under the Apache License, Version 2.0
                     * (the "License"); you may not use this file except in compliance with
                     * the License.  You may obtain a copy of the License at
                     *
                     *    http://www.apache.org/licenses/LICENSE-2.0
                     *
                     * Unless required by applicable law or agreed to in writing, software
                     * distributed under the License is distributed on an "AS IS" BASIS,
                     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
                     * See the License for the specific language governing permissions and
                     * limitations under the License.
                     */
                  
                  val csvdata = spark.read.options(Map(
                      "header" -> "true",
                      "ignoreLeadingWhiteSpace" -> "true",
                      "ignoreTrailingWhiteSpace" -> "true",
                      "timestampFormat" -> "yyyy-MM-dd HH:mm:ss.SSSZZZ",
                      "inferSchema" -> "true",
                      "mode" -> "FAILFAST"))
                    .csv("s3a://landsat-pds/scene_list.gz")
                  

                  坏消息是:这会触发对文件的扫描;对于像这个 20+MB 压缩 CSV 文件这样的大文件,在长途连接中可能需要 30 秒。记住这一点:一旦你得到它,你最好手动编码模式。

                  (代码 sn-p Apache Software License 2.0 许可以避免所有歧义;我在 S3 集成的演示/集成测试中做了一些事情)

                  【讨论】:

                  • 我没有见过这种 csv 方法或将地图传递给选项。同意提供显式模式总是更好,inferSchema 对于快速 n 脏(又名数据科学)来说很好,但对于 ETL 来说很糟糕。
                  猜你喜欢
                  • 1970-01-01
                  • 1970-01-01
                  • 2016-09-14
                  • 1970-01-01
                  • 1970-01-01
                  • 1970-01-01
                  • 1970-01-01
                  相关资源
                  最近更新 更多