【问题标题】:Spark: How to define schema to read csv file with different number of columns?Spark:如何定义模式以读取具有不同列数的 csv 文件?
【发布时间】:2020-07-17 13:51:38
【问题描述】:

我正在尝试使用 Pyspark 读取 csv 文件。 Csv-File 有一些元信息和数据列,它们有不同的列号和结构。

Excel 读取此文件没有问题。 我想在 spark 中定义一个自定义模式来读取这个文件。 这是一个例子:

HEADER_TAG\tHEADER_VALUE
FORMAT\t2.00
NUMBER_PASSES\t0001
"Time"\t"Name"\t"Country"\t"City"\t"Street"\t"Phone1"\t"Phone2"
0.49tName1\tUSA\tNewYork\t5th Avenue\t123456\t+001236273
0.5tName2\tUSA\tWashington\t524 Street\t222222\t+0012222
0.62tName3\tGermany\tBerlin\tLinden Strasse\t3434343\t+491343434
NUM_DATA_ROWS\t3
NUM_DATA_COLUMNS\t7
START_TIME_FORMAT\tMM/dd/yyyy HH:mm:ss
START_TIME\t06/04/2019 13:04:23
END_HEADER

没有预定义的Schema spark只读2列:

df_static = spark.read.options(header='false', inferschema='true', multiLine=True, delimiter = "\t",mode="PERMISSIVE",).csv("/FileStore/111.txt")

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)

【问题讨论】:

    标签: csv pyspark stream schema


    【解决方案1】:

    定义自定义架构:

    from pyspark.sql.types import *
    
    
    # define it as per your data types
    user_schema = StructType([
    ...    StructField("time", TimestampType(), True),
    ...    StructField("name", StringType(), True),
    ...    StructField("Country", StringType(), True),
    ...    StructField("City", StringType(), True),
    ...    StructField("Phone1", StringType(), True),
    ...    StructField("Phone2", StringType(), True),])
    

    参考:https://spark.apache.org/docs/2.1.2/api/python/_modules/pyspark/sql/types.html

    df_static = spark.read.schema(user_schema).options(header='false', multiLine=True, delimiter = "\t", mode="PERMISSIVE").csv("/FileStore/111.txt")
    

    【讨论】:

    • 我可以同时读取元信息和数据列吗?谢谢!
    • 所以,一步都不可能。先解析元信息,然后添加文件的数据列,反之亦然。因为它本质上是多个数据行的一个元信息。
    【解决方案2】:

    我有多个如下所示的架构

    user_schema1 = StructType([
    ...    StructField("time", TimestampType(), True),
    ...    StructField("name", StringType(), True),
    ...    StructField("Country", StringType(), True),...   
    ...    ])
    
    user_schema2 = StructType([...    
    ...    StructField("Phone1", StringType(), True),
    ...    StructField("Phone2", StringType(), True),])
    
    
    df_static = spark.read.schema(user_schema(send schema name dynamic)).options(header='false', multiLine=True, delimiter = "\t", mode="PERMISSIVE").csv("/FileStore/111.txt")`enter code here`
    
    Kindly provide me the solution
    

    【讨论】:

      猜你喜欢
      • 2022-12-12
      • 2013-09-26
      • 1970-01-01
      • 1970-01-01
      • 2022-06-30
      • 2020-12-24
      • 2021-06-20
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多