【问题标题】:Reading CSV in Spark that has parent/child rows of different types在 Spark 中读取具有不同类型的父/子行的 CSV
【发布时间】:2020-03-25 17:25:15
【问题描述】:

我会用下面的例子来解释。

|行号 | Col0(类型) | Col1 | Col2 | Col3 | |--------|------------|--------------|------------ --|--------| | 1 | 01 |数据-c1-01 | 1600 美元 | | | 2 | 01 |数据-c1-01 | 12 美元 | | | 3 | 02 |数据-c1-02 | 2019/11/30 | | | 4 | 03 |数据-c1-03 | 1.5 | | | 5 | 01 |数据-c1-01 | 12 美元 | | | 6 | 04 |数据-c1-04 | * | |

第 1、2 和 5 行是父行(类型 01)。父行可能有 0 个或多个不同类型的子行。所以第 2 行有两个 02 和 03 类型的子行。

我需要以上述格式加载一个相当大的 CSV,并处理父行及其任何子行(数量可能不同)。子行是任何非 01 类型行,其父行是前面的第一个 01 类型行。所以行顺序很重要。

目前我能想到的唯一解决方案是逐行顺序读写,在每组父子行之间引入唯一键。然后在 spark 中加载这个新的 CSV 以按此键分组。

【问题讨论】:

    标签: apache-spark tree


    【解决方案1】:

    虽然我认为这对于 Apache Spark 来说不是一个很好的案例,但可以使用 Window 函数来创建累积和。

    In [1]: from pyspark.sql import Window
       ...: from pyspark.sql.functions import col, sum as sparksum, when
       ...: 
       ...: df = spark.createDataFrame(((1, 1), (2, 1), (3, 2), (4, 3), (5, 1), (6, 4)),
       ...:                            schema=("rowno", "type"))
       ...: wind = (Window
       ...:         .orderBy("rowno")
       ...:         .rowsBetween(Window.unboundedPreceding, Window.currentRow)
       ...:         )
       ...: (df
       ...:  .withColumn("group",
       ...:              sparksum(when(col("type") == 1, 1).otherwise(0)).over(wind))
       ...:  .show()
       ...:  )
       ...: 
       ...:  
    19/12/01 01:38:19 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
    +-----+----+-----+                                                              
    |rowno|type|group|
    +-----+----+-----+
    |    1|   1|    1|
    |    2|   1|    2|
    |    3|   2|    2|
    |    4|   3|    2|
    |    5|   1|    3|
    |    6|   4|    3|
    +-----+----+-----+
    

    注意那里的警告。它基本上说您在此操作期间失去了所有并行性,因此基本上您被简化为单核执行,这与使用单个线程按顺序遍历文件没有什么不同。之后你可以再次repartition

    获得组号后,您可以运行groupBy(或groupByKey)并处理父行及其所有子行。

    【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2012-08-18
    • 2020-12-24
    • 2022-06-30
    • 2020-03-27
    • 2020-10-14
    相关资源
    最近更新 更多