【问题标题】:pyspark read text file with multiline columnpyspark 读取具有多行列的文本文件
【发布时间】:2020-08-10 01:01:10
【问题描述】:

我有以下格式错误的 txt 文件:

id;text;contact_id
1;Reason contact\
\
The client was not satisfied about the quality of the product\
\
;c_102932131

我正在尝试使用 pyspark 加载文件:

df = sc.read\
.option("delimiter", ";")\
.option("header", "true")\
.option("inferSchema", "true")\
.option("multiLine", "true")\
.option("wholeFile", "true")\
.csv(os.path.join(appconfig.configs[appconfig.ENV]["ROOT_DIR"], "data", "input", file_name))

但列文本被截断,因为数据框是:

id|text|contact_id
1|Reason contact|null
null|null|c_102932131

所以我失去了所有其他的行。目标是以这种方式正确读取文件:

id|text|contact_id
1|Reason contact The client was satisfied not about the quality of the product|c_102932131

我该怎么做? 谢谢

【问题讨论】:

    标签: csv dataframe apache-spark pyspark etl


    【解决方案1】:

    使用.wholeTextFiles然后替换new line (\n)\最后创建df。

    Example:

    Spark-Scala:

    sc.wholeTextFiles("<file_path>").
    toDF().
    selectExpr("""split(replace(regexp_replace(_2,"[\\\\|\n]",""),"id;text;contact_id",""),";") as new""").
    withColumn("id",col("new")(0)).
    withColumn("text",col("new")(1)).
    withColumn("contact_id",col("new")(2)).
    drop("new").
    show(false)
    //+---+---------------------------------------------------------------------------+-----------+
    //|id |text                                                                       |contact_id |
    //+---+---------------------------------------------------------------------------+-----------+
    //|1  |Reason contactThe client was not satisfied about the quality of the product|c_102932131|
    //+---+---------------------------------------------------------------------------+-----------+
    

    Pyspark:

    from pyspark.sql.functions import *
    
    sc.wholeTextFiles("<file_path>").\
    toDF().\
    selectExpr("""split(replace(regexp_replace(_2,'[\\\\\\\\|\n]',''),"id;text;contact_id",""),";") as new""").\
    withColumn("id",col("new")[0]).\
    withColumn("text",col("new")[1]).\
    withColumn("contact_id",col("new")[2]).\
    drop("new").\
    show(10,False)
    #+---+---------------------------------------------------------------------------+-----------+
    #|id |text                                                                       |contact_id |
    #+---+---------------------------------------------------------------------------+-----------+
    #|1  |Reason contactThe client was not satisfied about the quality of the product|c_102932131|
    #+---+---------------------------------------------------------------------------+-----------+
    

    【讨论】:

    • 您好,感谢您的回答。你能解释一下你在例子中写的函数 selectExpr 吗? _2 是什么?指列?最后一个“”是什么?谢谢
    • @br1, Yes,_2是转换成dataframe后的列名,selectExpr用于执行一组sql表达式spark.apache.org/docs/1.5.2/api/python/…"""视为string selectExpr 中的整个表达式。
    • 感谢您的回答。它确实有效!
    • 您好,您的解决方案仅适用于一行。如果您有多行,您的解决方案会将所有内容拆分为一个元素。这是不正确的。可能问题出在拆分功能上。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2020-07-30
    • 2019-02-10
    • 1970-01-01
    • 2014-11-03
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多