【问题标题】:Build a hierarchy from a relational data-set using Pyspark使用 Pyspark 从关系数据集构建层次结构
【发布时间】:2021-01-02 12:36:09
【问题描述】:

我是 Python 新手,一直坚持从关系数据集构建层次结构。
如果有人知道如何进行此操作,那将是非常有帮助的。

我有一个关系数据集,其中包含类似的数据

_currentnode,  childnode_  
 root,         child1  
 child1,       leaf2  
 child1,       child3  
 child1,       leaf4  
 child3,       leaf5  
 child3,       leaf6  

等等。我正在寻找一些 python 或 pyspark 代码来
构建一个如下所示的层次结构数据框

_level1, level2,  level3,  level4_  
root,    child1,  leaf2,   null  
root,    child1,  child3,  leaf5  
root,    child1,  child3,  leaf6  
root,    child1,  leaf4,   null  

数据是字母数字,是一个庞大的数据集 [~5000 万条记录]。
此外,层次结构的根是已知的,并且可以在代码中硬连线。
所以在上面的例子中,层次结构的根是'root'。

【问题讨论】:

    标签: python apache-spark pyspark hierarchy graphframes


    【解决方案1】:

    Pyspark 的最短路径

    输入数据可以解释为currentnodechildnode 之间的连接图。那么问题是根节点和所有叶子节点之间的最短路径是什么,称为single source shortest path

    Spark 有Graphx 来处理图形的并行计算。不幸的是,GraphX 没有提供 Python API(更多细节可以在here 找到)。支持 Python 的图形库是GraphFrames。 GraphFrames 使用了 GraphX 的一部分。

    GraphX 和 GraphFrames 都为 sssp 提供了解决方案。不幸的是,这两种实现都只返回最短路径的长度,而不是路径本身(GraphXGraphFrames)。但是this answer 为 GraphX 和 Scala 提供了算法的实现,它也返回了路径。这三个解决方案都使用Pregel

    将上述答案翻译成 GraphFrames/Python:

    1。数据准备

    为所有节点提供唯一的 ID 并更改列名,以使其符合 here 描述的名称

    import pyspark.sql.functions as F
    
    df = ...
    
    vertices = df.select("currentnode").withColumnRenamed("currentnode", "node").union(df.select("childnode")).distinct().withColumn("id", F.monotonically_increasing_id()).cache()
    
    edges = df.join(vertices, df.currentnode == vertices.node).drop(F.col("node")).withColumnRenamed("id", "src")\
            .join(vertices, df.childnode== vertices.node).drop(F.col("node")).withColumnRenamed("id", "dst").cache() 
    
    Nodes                   Edges
    +------+------------+   +-----------+---------+------------+------------+
    |  node|          id|   |currentnode|childnode|         src|         dst|
    +------+------------+   +-----------+---------+------------+------------+
    | leaf2| 17179869184|   |     child1|    leaf4| 25769803776|249108103168|
    |child1| 25769803776|   |     child1|   child3| 25769803776| 68719476736|
    |child3| 68719476736|   |     child1|    leaf2| 25769803776| 17179869184|
    | leaf6|103079215104|   |     child3|    leaf6| 68719476736|103079215104|
    |  root|171798691840|   |     child3|    leaf5| 68719476736|214748364800|
    | leaf5|214748364800|   |       root|   child1|171798691840| 25769803776|
    | leaf4|249108103168|   +-----------+---------+------------+------------+
    +------+------------+   
    

    2。创建 GraphFrame

    from graphframes import GraphFrame
    graph = GraphFrame(vertices, edges)
    

    3。创建将构成 Pregel 算法的单个部分的 UDF

    消息类型:
    from pyspark.sql.types import *
    vertColSchema = StructType()\
          .add("dist", DoubleType())\
          .add("node", StringType())\
          .add("path", ArrayType(StringType(), True))
    

    顶点程序:

    def vertexProgram(vd, msg):
        if msg == None or vd.__getitem__(0) < msg.__getitem__(0):
            return (vd.__getitem__(0), vd.__getitem__(1), vd.__getitem__(2))
        else:
            return (msg.__getitem__(0), vd.__getitem__(1), msg.__getitem__(2))
    vertexProgramUdf = F.udf(vertexProgram, vertColSchema)
    

    外发消息:

    def sendMsgToDst(src, dst):
        srcDist = src.__getitem__(0)
        dstDist = dst.__getitem__(0)
        if srcDist < (dstDist - 1):
            return (srcDist + 1, src.__getitem__(1), src.__getitem__(2) + [dst.__getitem__(1)])
        else:
            return None
    sendMsgToDstUdf = F.udf(sendMsgToDst, vertColSchema)
    

    消息聚合:

    def aggMsgs(agg):
        shortest_dist = sorted(agg, key=lambda tup: tup[1])[0]
        return (shortest_dist.__getitem__(0), shortest_dist.__getitem__(1), shortest_dist.__getitem__(2))
    aggMsgsUdf = F.udf(aggMsgs, vertColSchema)
    

    4。组合部件

    from graphframes.lib import Pregel
    result = graph.pregel.withVertexColumn(colName = "vertCol", \
        initialExpr = F.when(F.col("node")==(F.lit("root")), F.struct(F.lit(0.0), F.col("node"), F.array(F.col("node")))) \
        .otherwise(F.struct(F.lit(float("inf")), F.col("node"), F.array(F.lit("")))).cast(vertColSchema), \
        updateAfterAggMsgsExpr = vertexProgramUdf(F.col("vertCol"), Pregel.msg())) \
        .sendMsgToDst(sendMsgToDstUdf(F.col("src.vertCol"), Pregel.dst("vertCol"))) \
        .aggMsgs(aggMsgsUdf(F.collect_list(Pregel.msg()))) \
        .setMaxIter(10) \
        .setCheckpointInterval(2) \
        .run()
    result.select("vertCol.path").show(truncate=False)   
    

    备注:

    • maxIter 应设置为至少与最长路径一样大的值。如果该值较高,则结果将保持不变,但计算时间会变长。如果该值太小,则结果中将缺少较长的路径。当前版本的 GraphFrames (0.8.0) 不支持在不再发送新消息时停止循环。
    • checkpointInterval 应设置为小于maxIter 的值。实际值取决于数据和可用硬件。当发生 OutOfMemory 异常或 Spark 会话挂起一段时间时,该值可能会减小。

    最终结果是一个带有内容的常规数据框

    +-----------------------------+
    |path                         |
    +-----------------------------+
    |[root, child1]               |
    |[root, child1, leaf4]        |
    |[root, child1, child3]       |
    |[root]                       |
    |[root, child1, child3, leaf6]|
    |[root, child1, child3, leaf5]|
    |[root, child1, leaf2]        |
    +-----------------------------+
    

    如果需要,可以在这里过滤掉非叶子节点。

    【讨论】:

    • 非常简洁干净的实现。只是一个疑问——这个检查点间隔在这个算法中是如何工作的?
    • 检查点参数控制创建检查点的频率。此处检查点的作用是减少了所需的内存量但增加了 IO。
    • @werner 我也使用它来获得层次结构,但是我想为每个节点获取子列表的路径而不是路径,我应该如何修改消息函数,有什么建议吗?
    • @Matioski 你的输出到底是什么?我是否正确理解您正在寻找从某个节点开始的所有路径的下一个元素(例如 root -> [child1]、child1 -> [leaf4、child3、leaf2])?
    • @werner 是的,我有一个连接的有向树,对于每个节点,我们想要获取可访问节点的列表(例如 child1[leaf4,child3,leaf2,leaf6,leaf5], child3[叶6,叶5])。目前我在运行上述预凝胶后所做的如下:
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2014-12-08
    • 2018-08-26
    • 2011-10-15
    • 2019-12-28
    • 1970-01-01
    相关资源
    最近更新 更多