【问题标题】:Reading a csv file as a spark dataframe将 csv 文件作为 spark 数据帧读取
【发布时间】:2018-02-14 19:10:38
【问题描述】:

我有一个 CSV 文件以及一个必须通过 Spark(2.0.0 和 Scala 2.11.8)作为数据帧读取的标头。

示例 csv 数据:

Item,No. of items,Place
abc,5,xxx
def,6,yyy
ghi,7,zzz
.........

当我尝试在 spark 中读取此 csv 数据作为数据帧时遇到问题,因为标题包含具有特殊字符“。”的列(项目数)

我尝试读取 csv 数据的代码是:

val spark = SparkSession.builder().appName("SparkExample")
import spark.implicits._    
val df = spark.read.option("header", "true").csv("file:///INPUT_FILENAME")

我面临的错误:

Exception in thread "main" org.apache.spark.sql.AnalysisException: Unable to resolve No. of items given [Item,No. of items,Place];

如果我从标题中删除".",我不会收到任何错误。甚至尝试转义字符,但它甚至从数据中转义了所有"." 字符。

有什么方法可以使用 spark 代码仅从 CSV 标头中转义特殊字符 "."

【问题讨论】:

  • 我用 Spark 2.2 尝试了这个代码spark.read.format("csv").option("header", "true").load(input).show()。显示正常。
  • 我在我的 spark 2.0.0 中尝试了你给定的代码,但我仍然面临同样的问题。
  • 也许,不是在读取时出错,而是在后面的处理中出错?
  • 我同意@pasha701,你确定你在阅读部分得到了错误吗?请重新检查并确认。
  • 如果您没有很多列,那么只需跳过标题并单独提供架构。

标签: csv apache-spark dataframe


【解决方案1】:

我给你的例子是使用 pyspark,希望同样对你有用,只需添加一些与语言相关的语法。

file =r'C:\Users\e5543130\Desktop\sampleCSV2.csv'   
conf = SparkConf().setAppName('FICBOutputGenerator')
sc = SparkContext(conf=conf)
sc.setLogLevel("ERROR")
sqlContext = SQLContext(sc)
df = sqlContext.read.options(delimiter=",", header="true").csv("cars.csv")   #Without deprecated API
df = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("inferSchema", "true").option("delimiter", ",").load("cars.csv") 

【讨论】:

  • 我已经尝试过您的解决方案,但无法解决问题,我仍然面临同样的错误。
【解决方案2】:

@Pooja Nayak,不确定是否已解决;为了社区的利益回答这个问题。

sc: SparkContext
spark: SparkSession
sqlContext: SQLContext

// Read the raw file from localFS as-is.
val rdd_raw = sc.textFile("file:///home/xxxx/sample.csv")

// Drop the first line in first partition because it is the header.
val rdd = rdd_raw.mapPartitionsWithIndex{(idx,iter) => 
                      if(idx == 0) iter.drop(1) else iter
}

// A function to create schema dynamically.
def schemaCreator(header: String): StructType = {
  StructType(header
              .split(",")
              .map(field => StructField(field.trim, StringType, true))
  )
}

// Create the schema for the csv that was read and store it.
val csvSchema: StructType = schemaCreator(rdd_raw.first)

// As the input is CSV, split it at "," and trim away the whitespaces.
val rdd_curated = rdd.map(x => x.split(",").map(y => y.trim)).map(xy => Row(xy:_*))

// Create the DF from the RDD.
val df = sqlContext.createDataFrame(rdd_curated, csvSchema)

imports 是必要的

import org.apache.spark.sql.types._
import org.apache.spark.sql._
import org.apache.spark._

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2020-02-04
    • 2017-12-15
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多