【问题标题】:pyspark dataframe parent child hierarchy issuepyspark数据框父子层次结构问题
【发布时间】:2020-07-10 16:12:42
【问题描述】:

继续问题:pyspark dataframe withColumn command not working

我有一个输入数据框:df_input(更新的 df_input)

|comment|inp_col|inp_val|
|11     |a      |a1     |
|12     |a      |a2     |
|12     |f      |&a     |
|12     |a      |f9     |
|15     |b      |b3     |
|16     |b      |b4     |
|17     |c      |&b     |
|17     |c      |c5     |
|17     |d      |&c     |
|17     |d      |d6     |
|17     |e      |&d     |
|17     |e      |e7     |

如果您看到 inp_col 和 inp_val 具有层次结构,并且它可以是具有根值的 n 数。这里的父值是 "b""a"

现在,根据我的要求,我必须将以 "&" 开头的子值替换为其对应的值。 我尝试迭代以 inp_val 列中的“&”值开头的值列表,并在每次迭代中用值列表替换。 但是,它没有奏效。我面临如何获取包含父子列表值的列表的问题。

试过的代码:

list_1 = [row['inp_val'] for row in tst.select(tst.inp_val).where(tst.inp_val.substr(0, 1) == '&').collect()]
# removing the '&' at every starting of the list values
list_2 = [list_val[1:] for list_val in list_1]
tst_1 = tst.withColumn("val_extract", when(tst.inp_val.substr(0, 1) == '&', regexp(tst.inp_val, "&", "")).otherwise(tst.inp_val))
for val in list_2:
   df_leaf = tst_1.select(tst_1.val_extract).where(tst_1.inp_col == val)
   list_3 = [row['val_extract'] for row in df_leaf.collect()]

   tst_1 = tst_1.withColumn('bool', when(tst_1.val_extract == val, 'True').otherwise('False'))
   tst_1 = tst_1.withColumn('val_extract', when(tst_1.bool == 'True', str(list_3)).otherwise(tst_1.val_extract)).drop('bool')

更新的预期输出:

|comment|inp_col|inp_val|inp_extract                  |
|11     |a      |a1     |['a1']                       |
|12     |a      |a2     |['a2']                       |
|12     |f      |&a     |['a1, 'a2']                  |
|12     |f      |f9     |['f9']                       |
|15     |b      |b3     |['b3']                       |
|16     |b      |b4     |['b4']                       |
|17     |c      |&b     |['b3', 'b4']                 |
|18     |c      |c5     |['c5']                       |
|19     |d      |&c     |['b3', 'b4', 'c5']           |
|20     |d      |d6     |['d6']                       |
|21     |e      |&d     |['b3', 'b4', 'c5', 'd6']     |
|22     |e      |e7     |['e7']                       |

之后我可以尝试做爆炸来获得多行。但是,aove 输出是我们需要的,无法得到一定的百分比结果。

【问题讨论】:

  • 您的案例似乎比仅 1 个自联接更复杂(与上一个问题相比),它可能需要 n 次自联接,具体取决于数据的复杂性。 spark graphframes r 是通往这里的道路,因为它们是真正为 spark 数据帧上的图形操作而构建的 https://docs.databricks.com/spark/latest/graph-analysis/graphframes/user-guide-python.html

标签: python pyspark hierarchical-data pyspark-dataframes tarjans-algorithm


【解决方案1】:

如果您确实想避免使用图表,并且您的案例并不比上面显示的更复杂,请试试这个。

from pyspark.sql import functions as F

df.show() #sampledataframe

#+-------+---------+---------+
#|comment|input_col|input_val|
#+-------+---------+---------+
#|     11|        a|       a1|
#|     12|        a|       a2|
#|     12|        f|       &a|
#|     12|        f|       f9|
#|     15|        b|       b3|
#|     16|        b|       b4|
#|     17|        c|       &b|
#|     17|        c|       c5|
#|     17|        d|       &c|
#|     17|        d|       d6|
#|     17|        e|       &d|
#|     17|        e|       e7|
#+-------+---------+---------+

df1=df.join(df.groupBy("input_col").agg(F.collect_list("input_val").alias("y1"))\
          .withColumnRenamed("input_col","x1"),F.expr("""input_val rlike x1"""),'left')\
  .withColumn("new_col", F.when(F.expr("""substring(input_val,0,1)!""")!=F.lit('&'), F.array("input_val"))\
                    .otherwise(F.col("y1"))).drop("x1","y1")

df2=df1.join(df1.selectExpr("input_val as input_val1","new_col as new_col1"), F.expr("""array_contains(new_col,input_val1) and\
           substring(input_val1,0,1)=='&'"""),'left')


df2.join(df2.selectExpr("input_val1 as val2","new_col1 as col2")\
         .dropna(),F.expr("""array_contains(new_col1,val2)"""),'left')\
  .withColumn("inp_extract", F.when(F.expr("""substring(input_val,0,1)!='&'"""), F.col("new_col"))\
                        .otherwise(F.expr("""filter(concat(\
                        coalesce(new_col,array()),\
                        coalesce(new_col1,array()),\
                        coalesce(col2, array()))\
                        ,x-> x is not null and substring(x,0,1)!='&')""")))\

  .select("comment","input_col","input_val",F.array_sort("inp_extract").alias("inp_extract")).show()

#+-------+---------+---------+----------------+
#|comment|input_col|input_val|     inp_extract|
#+-------+---------+---------+----------------+
#|     11|        a|       a1|            [a1]|
#|     12|        a|       a2|            [a2]|
#|     12|        f|       &a|        [a1, a2]|
#|     12|        f|       f9|            [f9]|
#|     15|        b|       b3|            [b3]|
#|     16|        b|       b4|            [b4]|
#|     17|        c|       &b|        [b3, b4]|
#|     17|        c|       c5|            [c5]|
#|     17|        d|       &c|    [b3, b4, c5]|
#|     17|        d|       d6|            [d6]|
#|     17|        e|       &d|[b3, b4, c5, d6]|
#|     17|        e|       e7|            [e7]|
#+-------+---------+---------+----------------+

【讨论】:

  • 非常感谢@Murtihash。但我无法完全理解最后一个 join 它是如何工作的。你能解释一下你口头上做了什么,这样我就可以去检查每一行是如何进入 join 并逐行获取 inp_extract 的。这真的很有帮助。再次感谢。
  • 谢谢。只需要了解发送和第三次加入是如何工作的。
  • 最后一个连接条件基于 array_contains 函数为真,并且使用与 arraytype 的连接允许我们巩固这些关系。我建议您分解解决方案并打印中间结果,以便充分了解该过程
  • 好的,谢谢。我一点一点地经历了如何分析的完整流程。但是,只有一件事是F.when(F.expr("""substring(input_val,0,1)!""")!=F.lit('&'),为什么@ 987654322@ 被使用???(我的意思是关于!-在 expr 子字符串中??)
【解决方案2】:

您可以将数据框连接到自身以获取此信息。

input : 
df.show()

+-------+-------+---------+
|comment|inp_col|input_val|
+-------+-------+---------+
|     11|      a|       a1|
|     12|      a|       a2|
|     13|      f|       &a|
|     14|      b|       b3|
|     15|      b|       b4|
|     16|      d|       &b|
+-------+-------+---------+

import pyspark.sql.functions as F


df.createOrReplaceTempView("df1")
df.withColumn("input_val", F.regexp_replace(F.col("input_val"), "&", "")).createOrReplaceTempView("df2")

spark.sql("""select * from (select coalesce(df2.comment,df1.comment) as comment , 
coalesce(df2.inp_col,df1.inp_col) as inp_col,
 coalesce(df2.input_val,df2.input_val) as input_val ,
 case when df1.input_val is not null then df1.input_val else df2.input_val end  as output
 from df1  full outer join df2 on df2.input_val = df1.inp_col) where input_val is not null order by comment  """).show()
Output
+-------+-------+---------+------+
|comment|inp_col|input_val|output|
+-------+-------+---------+------+
|     11|      a|       a1|    a1|
|     12|      a|       a2|    a2|
|     13|      f|        a|    a1|
|     13|      f|        a|    a2|
|     14|      b|       b3|    b3|
|     15|      b|       b4|    b4|
|     16|      d|        b|    b3|
|     16|      d|        b|    b4|
+-------+-------+---------+------+

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2011-04-23
    • 2016-07-01
    • 2020-05-29
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多