【Title】:Concatenate PySpark rows using windows
【Posted on】:2020-03-31 03:05:53
【Question】:

I have the following PySpark dataframe.

+-----+---------------------+-------------------+
| name|           someString|          timestamp|
+-----+---------------------+-------------------+
|name1|            A-aa1-aa2|2012-01-10 00:00:00|
|name1|                 B-bb|2012-01-11 00:00:00|
|name1|            C-cc1-cc2|2012-01-13 00:00:00|
|name1|        D-dd1-dd2-dd3|2012-01-14 00:00:00|
|name1|                 E-ee|2012-01-15 00:00:00|
|name2|                 F-ff|2012-01-10 00:00:00|
|name2|        G-gg1-gg2-gg3|2012-01-11 00:00:00|
|name2|        H-hh1-hh2-hh3|2012-01-12 00:00:00|
|name2|I-ii1-ii2-ii3-ii4-ii5|2012-01-14 00:00:00|
+-----+---------------------+-------------------+

From this dataframe, I want to create a new dataframe (say df2) that has a column (named "concatStrings") which concatenates all the elements of the someString column over a rolling 3-day time window for each unique name (along with all the columns of df1).

In the example above, I would like df2 to look like the following:

+-----+---------------------+-------------------+--------------------------------------+
| name|           someString|          timestamp|                         concatStrings|
+-----+---------------------+-------------------+--------------------------------------+
|name1|            A-aa1-aa2|2012-01-10 00:00:00|   A-aa1-aa2                          | 
|name1|                 B-bb|2012-01-11 00:00:00|   A-aa1-aa2-B-bb                     | 
|name1|            C-cc1-cc2|2012-01-13 00:00:00|   B-bb-C-cc1-cc2                     |
|name1|        D-dd1-dd2-dd3|2012-01-14 00:00:00|   C-cc1-cc2-D-dd1-dd2-dd3            | 
|name1|                 E-ee|2012-01-15 00:00:00|   C-cc1-cc2-D-dd1-dd2-dd3-E-ee       |
|name2|                 F-ff|2012-01-10 00:00:00|   F-ff                               |
|name2|        G-gg1-gg2-gg3|2012-01-11 00:00:00|   F-ff-G-gg1-gg2-gg3                 |
|name2|        H-hh1-hh2-hh3|2012-01-12 00:00:00|   F-ff-G-gg1-gg2-gg3-H-hh1-hh2-hh3   |
|name2|I-ii1-ii2-ii3-ii4-ii5|2012-01-14 00:00:00|   H-hh1-hh2-hh3-I-ii1-ii2-ii3-ii4-ii5|
+-----+---------------------+-------------------+--------------------------------------+

How can I do this?

Below is the code I have tried so far:

# Rolling 3-day window per name: the current row's day plus the two
# previous days, expressed in seconds over the epoch-cast timestamp.
win_ts = (
    Window.partitionBy("name")
    .orderBy(F.col("timestamp").cast("long"))
    .rangeBetween(-(3 - 1) * 86400, 0)
)

df2 = df1.withColumn("concatStrings", F.concat_ws("-", F.col("someString").over(win_ts)))

However, when I try the above code snippet, I get the error mentioned below:

Py4JJavaError: An error occurred while calling o235.withColumn.
: org.apache.spark.sql.AnalysisException: Expression 'someString#72' not supported within a window function.;;
Project [name#70, someString#72, timestamp#76, concatStrings#124]
+- Project [name#70, someString#72, timestamp#76, _w0#125L, _we0#126, concat_ws(-, _we0#126) AS concatStrings#124]
   +- Window [someString#72 windowspecdefinition(name#70, _w0#125L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, -172800, currentrow$())) AS _we0#126], [name#70], [_w0#125L ASC NULLS FIRST]
      +- Project [name#70, someString#72, timestamp#76, cast(timestamp#76 as bigint) AS _w0#125L]
         +- Project [name#70, someString#72, timestamp#76]

You can use the code mentioned below to generate the dataframe df1:

from pyspark.sql import Row, SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import TimestampType
from pyspark.sql.window import Window

# Create (or reuse) a SparkSession so that `spark` is defined below.
spark = SparkSession.builder.getOrCreate()

df1_Stats = Row("name", "timestamp1", "someString")

df1_stat1 = df1_Stats("name1", "2012-01-10 00:00:00", "A-aa1-aa2")
df1_stat2 = df1_Stats("name1", "2012-01-11 00:00:00", "B-bb")
df1_stat3 = df1_Stats("name1", "2012-01-13 00:00:00", "C-cc1-cc2")
df1_stat4 = df1_Stats("name1", "2012-01-14 00:00:00", "D-dd1-dd2-dd3")
df1_stat5 = df1_Stats("name1", "2012-01-15 00:00:00", "E-ee")
df1_stat6 = df1_Stats("name2", "2012-01-10 00:00:00", "F-ff")
df1_stat7 = df1_Stats("name2", "2012-01-11 00:00:00", "G-gg1-gg2-gg3")
df1_stat8 = df1_Stats("name2", "2012-01-12 00:00:00", "H-hh1-hh2-hh3")
df1_stat9 = df1_Stats("name2", "2012-01-14 00:00:00", "I-ii1-ii2-ii3-ii4-ii5")

df1_stat_lst = [
    df1_stat1,
    df1_stat2,
    df1_stat3,
    df1_stat4,
    df1_stat5,
    df1_stat6,
    df1_stat7,
    df1_stat8,
    df1_stat9,
]

df1 = spark.createDataFrame(df1_stat_lst)
df1 = df1.withColumn("timestamp", df1["timestamp1"].cast(TimestampType())).drop("timestamp1")
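
As a quick sanity check, displaying df1 should reproduce the dataframe shown at the top of the question:

# Should print the same rows as the first table in the question.
df1.show(truncate=False)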

【Comments】:

  • I think you should take a look at this. You could shift the someString column by one row and then append those values to your actual column
  • Change F.col(..).over(win_ts) to F.collect_list(..).over(win_ts): df2 = df1.withColumn("concatStrings", F.concat_ws("-", F.collect_list("someString").over(win_ts)))

Tags: python sql pyspark pyspark-sql pyspark-dataframes


【Solution 1】:

A plain column reference such as F.col("someString") is not a valid window function, which is why Spark raises the AnalysisException above. You first need to collect the values into a list with collect_list and then use array_join to convert the resulting array into a string. This should work:

df2 = df1.withColumn("concatStrings" , F.collect_list("someString").over(win_ts)) \
         .withColumn("concatStrings", F.array_join("concatStrings", '-'))
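
For completeness, here is a minimal end-to-end sketch combining this with the win_ts window from the question (assuming df1 has been built as shown in the question):

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Rolling 3-day window per name, as defined in the question.
win_ts = (
    Window.partitionBy("name")
    .orderBy(F.col("timestamp").cast("long"))
    .rangeBetween(-(3 - 1) * 86400, 0)
)

# collect_list gathers the someString values that fall inside the frame into
# an array; array_join then flattens that array into one "-"-separated string.
df2 = (
    df1.withColumn("concatStrings", F.collect_list("someString").over(win_ts))
       .withColumn("concatStrings", F.array_join("concatStrings", "-"))
)
df2.show(truncate=False)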

【Discussion】:
