【Question Title】: Group rows based on changing pattern of a column in PySpark
【Posted】: 2021-12-22 09:46:53
【Question Description】:

I am working with a PySpark DataFrame containing call-transcript data, where each spoken word is represented as one row. I would like to group the words spoken by one person until the other person starts speaking. (There are only two speakers.)

I have tried several window functions but could never get the desired output. Any help is appreciated!

Input:

| Call_id| Speaker  | WordNum| Word |
|------- |----------| -------|------|
| 1      | Speaker_1| 1      |Hi    |
| 1      | Speaker_1| 2      |I     |
| 1      | Speaker_1| 3      |am    |
| 1      | Speaker_1| 4      |Pete  |
| 1      | Speaker_2| 5      |Hello |
| 1      | Speaker_1| 6      |Sorry |
| 1      | Speaker_1| 7      |Gotta |
| 1      | Speaker_1| 8      |Leave |
| 2      | Speaker_2| 1      |Hello |
| 2      | Speaker_2| 2      |Luis  |
| 2      | Speaker_1| 3      |Hey   |

Desired Output:

| Call_id| Speaker  | Sentence                    | 
|------- |----------| ----------------------------|
| 1      | Speaker_1| ["Hi", "I", "am", "Pete"]   |
| 1      | Speaker_2| ["Hello"]                   |
| 1      | Speaker_1| ["Sorry", "Gotta", "Leave"] |
| 2      | Speaker_2| ["Hello", "Luis"]           |
| 2      | Speaker_1| ["Hey"]                     |

【Discussion】:

  • Do you have a column to order your DataFrame by?
  • Yes, the combination of Call_id and WordNum

Tags: python dataframe pyspark apache-spark-sql window-functions


【Solution 1】:

You can create a sentence id, then group by call id, speaker, and sentence id and collect the words. To create the sentence id, detect when the speaker changes by comparing the current value with the previous one. Build an indicator column that holds the word number when the speaker changes and 0 otherwise. The cumulative sum of this indicator then serves as the sentence id.
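Before touching Spark, the indicator/cumulative-sum mechanics can be sanity-checked in plain Python (a sketch using the call-1 rows from the question, not part of the Spark solution itself):

```python
# Speakers of call 1, in WordNum order (from the question's input).
speakers = ["Speaker_1"] * 4 + ["Speaker_2"] + ["Speaker_1"] * 3
word_nums = range(1, len(speakers) + 1)

indicator, sentence_id = [], []
prev, running = None, 0
for spk, num in zip(speakers, word_nums):
    # lag + coalesce: the first row compares against itself, so it yields 0.
    ind = 0 if spk == (prev if prev is not None else spk) else num
    running += ind            # cumulative sum = sentence id
    indicator.append(ind)
    sentence_id.append(running)
    prev = spk

print(indicator)    # [0, 0, 0, 0, 5, 6, 0, 0]
print(sentence_id)  # [0, 0, 0, 0, 5, 11, 11, 11]
```

Rows with the same sentence id (0, 5, 11) are exactly the three sentences of call 1.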

import pyspark.sql.functions as F
from pyspark.sql import Window

# 1  create speaker lag column and fill in NAs with current speaker info
w = Window.partitionBy("Call_id").orderBy("WordNum")
df1 = (df
       .withColumn("speaker_lag", F.lag("Speaker").over(w))
       .withColumn("speaker_lag1", F.coalesce("speaker_lag", "Speaker")))

# 2 create sentence indicator
df2 = (df1
       .withColumn("session", 
                   F.when(F.col("Speaker")==F.col("speaker_lag1"), 0)
                    .otherwise(F.col("WordNum"))))

# 3 create sentence id
w = (Window
      .partitionBy("Call_id")
      .orderBy("WordNum")
      .rangeBetween(Window.unboundedPreceding, 0))
df3 = df2.withColumn("Sentence_id", F.sum("session").over(w))

# 4 group by and collect
df4 = (df3
       .groupBy("Call_id", "Speaker", "Sentence_id")
       .agg(F.sort_array(F.collect_list(F.struct("WordNum", "Word")))
            .alias("collect"))
       .withColumn("Sentence", F.col("collect")["Word"])
       .select("Call_id", "Speaker", "Sentence_id", "Sentence")
      )
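As a cross-check of the expected grouping (not part of the Spark solution), the same "new group whenever the speaker changes within a call" logic can be expressed with `itertools.groupby` on the already-sorted rows:

```python
from itertools import groupby

# Input rows from the question, already sorted by Call_id, WordNum.
rows = [
    (1, "Speaker_1", 1, "Hi"),    (1, "Speaker_1", 2, "I"),
    (1, "Speaker_1", 3, "am"),    (1, "Speaker_1", 4, "Pete"),
    (1, "Speaker_2", 5, "Hello"), (1, "Speaker_1", 6, "Sorry"),
    (1, "Speaker_1", 7, "Gotta"), (1, "Speaker_1", 8, "Leave"),
    (2, "Speaker_2", 1, "Hello"), (2, "Speaker_2", 2, "Luis"),
    (2, "Speaker_1", 3, "Hey"),
]

# groupby only merges *consecutive* equal keys, which matches the
# "until the other person starts speaking" semantics exactly.
sentences = [
    (call_id, speaker, [word for *_, word in grp])
    for (call_id, speaker), grp in groupby(rows, key=lambda r: (r[0], r[1]))
]
for s in sentences:
    print(s)
```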

For your input data:

cols = ["Call_id", "Speaker", "WordNum", "Word"]
data = [
( 1      , "Speaker_1", 1      ,"Hi"    ),
( 1      , "Speaker_1", 2      ,"I"     ),
( 1      , "Speaker_1", 3      ,"am"    ),
( 1      , "Speaker_1", 4      ,"Pete"  ),
( 1      , "Speaker_2", 5      ,"Hello" ),
( 1      , "Speaker_1", 6      ,"Sorry" ),
( 1      , "Speaker_1", 7      ,"Gotta" ),
( 1      , "Speaker_1", 8      ,"Leave" ),
( 2      , "Speaker_2", 1      ,"Hello" ),
( 2      , "Speaker_2", 2      ,"Luis" ),
( 2      , "Speaker_1", 3      ,"Hey"  )]

df = spark.createDataFrame(data, schema=cols)

The output df4 will be:

| Call_id| Speaker  | Sentence_id| Sentence                    |
|------- |----------|------------|-----------------------------|
| 1      | Speaker_1| 0          | [Hi, I, am, Pete]           |
| 1      | Speaker_2| 5          | [Hello]                     |
| 1      | Speaker_1| 11         | [Sorry, Gotta, Leave]       |
| 2      | Speaker_2| 0          | [Hello, Luis]               |
| 2      | Speaker_1| 3          | [Hey]                       |

【Discussion】:

  • Thanks, works perfectly!