【问题标题】:Validate CSV file PySpark验证 CSV 文件 PySpark
【发布时间】:2019-04-23 18:57:46
【问题描述】:

我正在尝试验证 csv 文件(每条记录的列数)。根据下面的链接,在 Databricks 3.0 中有处理它的选项。

http://www.discussbigdata.com/2018/07/capture-bad-records-while-loading-csv.html

df = spark.read
  .option("badRecordsPath", "/data/badRecPath")
  .parquet("/input/parquetFile")

但是,我使用的是 2.3 spark 版本,无法使用该选项。

在读取 pyspark 的一部分并希望将不良记录写入文件时,有什么方法可以找出 csv 文件中的不良记录。

架构不是静态的,因为我们要处理多个表数据并且无法对架构进行硬编码。

        df = spark.read.option("wholeFile", "true"). \
                        option("header", "true"). \
                        option("quote", "\""). \
                        csv("${table}/path/to/csv/file")

【问题讨论】:

    标签: csv apache-spark dataframe pyspark


    【解决方案1】:

    我不确定您将哪种记录称为不良记录,因为我们看不到您的输入数据。 根据我的假设,假设我们有一个包含五列的以下输入文件。

    col1,col2,col3,col4,col5
    1,ABC,YYY,101,USA
    2,ABC,ZZZ,102,USA
    3,ABC,,,USA
    4,ABC,GGG,104,USA
    5,ABC,PPP,105
    

    和行号。 3 的空列很少,第 5 行的列较少。所以我不想在我的数据框中加载这两条记录。

    PATH_TO_FILE = "file:///user/vikrant/hivespark/userinput"
    
    df = sc.textFile(PATH_TO_FILE)\
               .mapPartitions(lambda line: csv.reader(line,delimiter=',', quotechar='"'))\
               .map(lambda x: [i for i in x if len(i)!= 0]) \
               .filter(lambda line: len(line) > 4 and line[0] != 'col1') \
               .toDF(['Col1','Col2','Col3','Col4','Col5'])
    
    
    >>> df.show();
    +----+----+----+----+----+
    |Col1|Col2|Col3|Col4|Col5|
    +----+----+----+----+----+
    |   1| ABC| YYY| 101| USA|
    |   2| ABC| ZZZ| 102| USA|
    |   4| ABC| GGG| 104| USA|
    +----+----+----+----+----+
    

    如果您想从输入文件中提取不良记录:

    badrecords = sc.textFile(PATH_TO_FILE)\
               .mapPartitions(lambda line: csv.reader(line,delimiter=',', quotechar='"'))\
               .map(lambda x: [i for i in x if len(i)!= 0]) \
               .filter(lambda line: len(line) < 5 and line[0] != 'col1')
    
    >>> badrecords.take(10)
    [['3', 'ABC', 'USA'], ['5', 'ABC', 'PPP', '105']]
    

    让我知道它是否对您有用或有帮助!

    【讨论】:

    • 如果我尝试运行该代码,它会显示以下错误:“未定义全局名称 'csv'” 您能否详细说明环境以及所需的导入语句?
    • 好的,我明白了...只是缺少导入 csv 语句。
    猜你喜欢
    • 1970-01-01
    • 2014-02-26
    • 1970-01-01
    • 2011-01-27
    • 1970-01-01
    • 2016-12-15
    • 1970-01-01
    • 2020-11-06
    • 2020-03-13
    相关资源
    最近更新 更多