【发布时间】:2021-07-12 04:43:05
【问题描述】:
我需要解开数据框 pyspark 中的一列有条件的序列号。例如
输入数据帧
期望输出数据帧
你可以看到当c1 = 1在一行中,该行将将c4列的内容分成新行(因为长度超过限制)。否则如果当 c1 = 0 时 c4 包含完整内容,则无需换行。 c4列可以将其分成多行下一个
pyspark 中的 pyspark.sql.functions.explode(col) 相同,我需要取消爆炸,但我有一个条件是 c1 列(这并不简单,例如 group by then collect list df.groupby().agg(F.collect_list()),因为c1是有条件的)
我尝试通过这个主题PySpark - Append previous and next row to current row使用窗口函数流。但是当 c4 col 下一个中断多行时我该如何解决
示例代码
from pyspark.sql import SparkSession
spark_session = SparkSession.builder.getOrCreate()
df_in = spark_session.createDataFrame(
[
(1, 'a', 'b', 'c1', 'd'),
(0, 'a', 'b', 'c2', 'd'),
(0, 'e', 'f', 'g', 'h'),
(0, '1', '2', '3', '4'),
(1, 'x', 'y', 'z1', 'k'),
(1, 'x', 'y', 'z2', 'k'),
(1, 'x', 'y', 'z3', 'k'),
(0, 'x', 'y', 'z4', 'k'),
(1, '6', '7', '81', '9'),
(0, '6', '7', '82', '9'),
],
['c1', 'c2', 'c3', 'c4', 'c5']
)
df_out = spark_session.createDataFrame(
[
('a', 'b', 'c1-c2', 'd'),
('e', 'f', 'g', 'h'),
('1', '2', '3', '4'),
('x', 'y', 'z1-z2-z3-z4', 'k'),
('6', '7', '81-82', '9')
],
['c2', 'c3', 'c4', 'c5']
)
df_in.show()
df_out.show()
我该如何解决。谢谢
更新 输入
df_in = spark_session.createDataFrame(
[
('0', 1, 'a', 'b', 'c1', 'd'),
('0', 0, 'a', 'b', 'c2', 'd'),
('0', 0, 'e', 'f', 'g', 'h'),
('0', 0, '1', '2', '3', '4'),
('0', 1, 'x', 'y', 'sele', 'k'),
('0', 1, 'x', 'y', 'ct ', 'k'),
('0', 1, 'x', 'y', 'from', 'k'),
('0', 0, 'x', 'y', 'a', 'k'),
('0', 1, '6', '7', '81', '9'),
('0', 0, '6', '7', '82', '9'),
],
['c0', 'c1', 'c2', 'c3', 'c4', 'c5']
)
输出
期待输出
x| y|从-a中选择| k
【问题讨论】:
-
我不太明白
c1这个专栏的效果。例如,如果 x-y- 行包含0而不是c1中的1s,那么预期的输出将如何变化? -
@werner c1 始终为 1 或 0,c1 是一个标志,让您知道 c4 col 的当前行已完全或切入下一行(因为 c4 col 的长度超出限制),例如:c4是 nvarchar2(4000),如果 c4 的内容 = 40001 那么最后一个字母将存储在下一行
-
那么最后一个中断是有 0 而不是 1?
-
@anky 是的,对
-
@anky 或者开头,不需要中断,c1 也 = 0
标签: python sql dataframe apache-spark pyspark