【Question Title】: Databricks Pyspark - Group related rows
【Posted】: 2022-02-01 21:55:17
【Question】:

I'm parsing EDI files in Azure Databricks. Rows in the input file are related to other rows based on the order in which they appear. What I need is a way to group related rows together.

import pyspark.sql.types as T
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.window import Window

spark = SparkSession.builder.getOrCreate()

# Each line is a fixed-width record: a 4-char record type, a 3-digit
# sequence number, then the payload (title or author name).
data = [
    ('book000book title',),
    ('auth001first author',),
    ('auth002second author',),
    ('book003another book',),
    ('auth004third author',),
]

schema = T.StructType([T.StructField('Line', T.StringType())])
books = spark.createDataFrame(data, schema)

books = (books
        # Slice the fixed-width fields out of each line (1-based positions).
        .withColumn('RecordType', F.substring(F.col('Line'), 1, 4))
        .withColumn('Sequence', F.substring(F.col('Line'), 5, 3))
        .withColumn('Title', F.when(F.col('RecordType') == 'book', F.trim(F.substring(F.col('Line'), 8, 20))).otherwise(F.lit(None)))
        .withColumn('Author', F.when(F.col('RecordType') == 'auth', F.trim(F.substring(F.col('Line'), 8, 20))).otherwise(F.lit(None)))
        .drop('Line')
        )

# Number every row in file order (no partitionBy: the whole file is one window).
window = Window.orderBy('Sequence')
books = (books
         .withColumn('BookID', F.row_number().over(window))
        )

books.show()

After a book record, the rows that follow are that book's authors, so those authors should get the same BookID as the most recent book record above them.

The output of the above is:

+----------+--------+------------+-------------+------+
|RecordType|Sequence|       Title|       Author|BookID|
+----------+--------+------------+-------------+------+
|      book|     000|  book title|         null|     1|
|      auth|     001|        null| first author|     2|
|      auth|     002|        null|second author|     3|
|      book|     003|another book|         null|     4|
|      auth|     004|        null| third author|     5|
+----------+--------+------------+-------------+------+

I need to assign the correct BookID to each author row so that I can group them. The output I'm looking for is:

+----------+--------+------------+-------------+------+
|RecordType|Sequence|       Title|       Author|BookID|
+----------+--------+------------+-------------+------+
|      book|     000|  book title|         null|     1|
|      auth|     001|        null| first author|     1|
|      auth|     002|        null|second author|     1|
|      book|     003|another book|         null|     2|
|      auth|     004|        null| third author|     2|
+----------+--------+------------+-------------+------+

I haven't been able to figure this out. Any help would be greatly appreciated.

【Comments】:

    Tags: apache-spark pyspark apache-spark-sql databricks azure-databricks


    【Solution 1】:

    You can use a conditional sum aggregation over the window ordered by Sequence, as follows. Since when() without an otherwise() returns NULL for non-book rows and sum() ignores NULLs, the expression is a running count of book records, so every author row inherits the count of the last book above it:

    books = (books
             .withColumn('BookID', F.sum(F.when(F.col("RecordType") == "book", 1)).over(window))
             )
    
    books.show()
    #+----------+--------+------------+-------------+------+
    #|RecordType|Sequence|       Title|       Author|BookID|
    #+----------+--------+------------+-------------+------+
    #|      book|     000|  book title|         null|     1|
    #|      auth|     001|        null| first author|     1|
    #|      auth|     002|        null|second author|     1|
    #|      book|     003|another book|         null|     2|
    #|      auth|     004|        null| third author|     2|
    #+----------+--------+------------+-------------+------+
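
    Once every row carries the right BookID, the grouping the question originally asks for is straightforward. Here is a minimal sketch, assuming the DataFrame above (the `grouped` name and the choice of aggregates are illustrative, not from the original post):

    # first(..., ignorenulls=True) picks the single non-null Title from the
    # 'book' row; collect_list gathers the authors and skips NULL values,
    # so the book row itself contributes nothing to the Authors list.
    grouped = (books
               .groupBy('BookID')
               .agg(F.first('Title', ignorenulls=True).alias('Title'),
                    F.collect_list('Author').alias('Authors'))
               .orderBy('BookID'))

    grouped.show(truncate=False)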
    
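    An equivalent formulation, shown only as a sketch: count(), like sum(), skips the NULLs that when() without otherwise() produces, so a conditional count over the same window yields the same BookID. Either way, note that the window has no partitionBy, so Spark warns and moves all rows into a single partition to order them, which is usually acceptable for files small enough to parse line by line.

    # Equivalent sketch: count() also ignores NULL, so this matches the
    # conditional sum above.
    books = (books
             .withColumn('BookID',
                         F.count(F.when(F.col('RecordType') == 'book', 1)).over(window))
             )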

    【Comments】:
