【问题标题】:Unexplode in pyspark with sequence conditional在具有条件序列的 pyspark 中展开
【发布时间】:2021-07-12 04:43:05
【问题描述】:

我需要解开数据框 pyspark 中的一列有条件的序列号。例如

输入数据帧

期望输出数据帧

你可以看到当c1 = 1在一行中,该行将将c4列的内容分成新行(因为长度超过限制)。否则如果当 c1 = 0 时 c4 包含完整内容,则无需换行。 c4列可以将其分成多行下一个

pyspark 中的 pyspark.sql.functions.explode(col) 相同,我需要取消爆炸,但我有一个条件是 c1 列(这并不简单,例如 group by then collect list df.groupby().agg(F.collect_list()),因为c1是有条件的)

我尝试通过这个主题PySpark - Append previous and next row to current row使用窗口函数流。但是当 c4 col 下一个中断多行时我该如何解决

示例代码

from pyspark.sql import SparkSession
spark_session = SparkSession.builder.getOrCreate()

df_in = spark_session.createDataFrame(
    [
      (1, 'a', 'b', 'c1', 'd'),
      (0, 'a', 'b', 'c2', 'd'),
      (0, 'e', 'f', 'g', 'h'),
      (0, '1', '2', '3', '4'),
      (1, 'x', 'y', 'z1', 'k'),
      (1, 'x', 'y', 'z2', 'k'),
      (1, 'x', 'y', 'z3', 'k'),
      (0, 'x', 'y', 'z4', 'k'),
      (1, '6', '7', '81', '9'),
      (0, '6', '7', '82', '9'),
    ],
    ['c1', 'c2', 'c3', 'c4', 'c5']
)

df_out = spark_session.createDataFrame(
    [
      ('a', 'b', 'c1-c2', 'd'),
      ('e', 'f', 'g', 'h'),
      ('1', '2', '3', '4'),
      ('x', 'y', 'z1-z2-z3-z4', 'k'), 
      ('6', '7', '81-82', '9')
    ],
    ['c2', 'c3', 'c4', 'c5']
)

df_in.show()
df_out.show()

我该如何解决。谢谢


更新 输入

df_in = spark_session.createDataFrame(
    [
      ('0', 1, 'a', 'b', 'c1', 'd'),
      ('0', 0, 'a', 'b', 'c2', 'd'),
      ('0', 0, 'e', 'f', 'g', 'h'),
      ('0', 0, '1', '2', '3', '4'),
      ('0', 1, 'x', 'y', 'sele', 'k'),
      ('0', 1, 'x', 'y', 'ct ', 'k'),
      ('0', 1, 'x', 'y', 'from', 'k'),
      ('0', 0, 'x', 'y', 'a', 'k'),
      ('0', 1, '6', '7', '81', '9'),
      ('0', 0, '6', '7', '82', '9'),
    ],
    ['c0', 'c1', 'c2', 'c3', 'c4', 'c5']
)

输出

期待输出

x| y|从-a中选择| k

【问题讨论】:

  • 我不太明白c1这个专栏的效果。例如,如果 x-y- 行包含 0 而不是 c1 中的 1s,那么预期的输出将如何变化?
  • @werner c1 始终为 1 或 0,c1 是一个标志,让您知道 c4 col 的当前行已完全或切入下一行(因为 c4 col 的长度超出限制),例如:c4是 nvarchar2(4000),如果 c4 的内容 = 40001 那么最后一个字母将存储在下一行
  • 那么最后一个中断是有 0 而不是 1?
  • @anky 是的,对
  • @anky 或者开头,不需要中断,c1 也 = 0

标签: python sql dataframe apache-spark pyspark


【解决方案1】:

即使您的数据集位于多个分区中且未排序,此解决方案也有效。

from pyspark.sql.window import Window
from pyspark.sql import functions as F
orderByColumns = [F.col('c4'),F.col('c1').cast('int').desc()]
partitionColumns =[ F.col(column) for column in ['c2','c3','c5']]
df_in.orderBy(orderByColumns)\
     .withColumn('ranked',F.dense_rank().over(Window.partitionBy(partitionColumns).orderBy(orderByColumns)))\
     .withColumn('c4-ranked',F.concat(F.col('ranked'),F.lit('='),F.col('c4')))\
     .groupBy(partitionColumns)\
     .agg(F.collect_list('c4-ranked').alias('c4'))\
     .select(
         F.col('c2'),
         F.col('c3'),
         F.regexp_replace(F.array_join(F.col('c4'),"-"),"\d+=","").alias('c4'),
         F.col('c5')
     )\
     .show()

+---+---+-----------+---+
| c2| c3|         c4| c5|
+---+---+-----------+---+
|  1|  2|          3|  4|
|  x|  y|z1-z2-z3-z4|  k|
|  e|  f|          g|  h|
|  6|  7|      81-82|  9|
|  a|  b|      c1-c2|  d|
+---+---+-----------+---+

设置

df_in = sparkSession.createDataFrame(
    [
      (1, 'a', 'b', 'c1', 'd'),
      (0, 'a', 'b', 'c2', 'd'),
      (0, 'e', 'f', 'g', 'h'),
      (0, '1', '2', '3', '4'),
      (1, 'x', 'y', 'z1', 'k'),
      (1, 'x', 'y', 'z2', 'k'),
      (1, 'x', 'y', 'z3', 'k'),
      (0, 'x', 'y', 'z4', 'k'),
      (1, '6', '7', '81', '9'),
      (0, '6', '7', '82', '9'),
    ],
    ['c1', 'c2', 'c3', 'c4', 'c5']
).repartition(5) 

df_in.show()

在我的跑步中提供(可能非常每次跑步)

+---+---+---+---+---+
| c1| c2| c3| c4| c5|
+---+---+---+---+---+
|  1|  x|  y| z2|  k|
|  0|  x|  y| z4|  k|
|  1|  a|  b| c1|  d|
|  0|  1|  2|  3|  4|
|  0|  6|  7| 82|  9|
|  0|  a|  b| c2|  d|
|  0|  e|  f|  g|  h|
|  1|  6|  7| 81|  9|
|  1|  x|  y| z3|  k|
|  1|  x|  y| z1|  k|
+---+---+---+---+---+

【讨论】:

  • 我想知道,一开始真的需要df_in.orderBy(orderByColumns),我不知道这是什么意思
  • 某些情况下没有通过,您的解决方案是通用的,但它不保持顺序条件,例如我更新了问题,请查看上面的内容
  • 我删除了列表 orderBy col 中的 c4,这是可行的,但我必须在原始数据中保持 c4 中的顺序
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2021-12-29
  • 2021-04-01
  • 1970-01-01
  • 1970-01-01
  • 2022-10-14
  • 2018-09-12
相关资源
最近更新 更多