【问题标题】:Randomly join two dataframes随机加入两个数据帧
【发布时间】:2017-09-23 11:07:06
【问题描述】:

我有两张表,一张名为 Reasons 有 9 条记录,另一张包含 40k 条记录的 ID。

ID:

+------+------+
|pc_pid|pc_aid|
+------+------+
|  4569|  1101|
| 63961|  1101|
|140677|  4364|
|127113|     7|
| 96097|   480|
|  8309|  3129|
| 45218|    89|
|147036|  3289|
| 88493|  3669|
| 29973|  3129|
|127444|  3129|
| 36095|    89|
|131001|  1634|
|104731|   781|
| 79219|   244|
+-------------+

原因:

+-----------------+
|          reasons|
+-----------------+
|        follow up|
|         skin chk|
|      annual meet|
|review lab result|
|        REF BY DR|
|       sick visit|
|        body pain|
|             test|
|            other|
+-----------------+

我想要这样的输出

|pc_pid|pc_aid| reason 
+------+------+-------------------
|  4569|  1101| body pain
| 63961|  1101| review lab result
|140677|  4364| body pain
|127113|     7| sick visit
| 96097|   480| test
|  8309|  3129| other
| 45218|    89| follow up
|147036|  3289| annual meet
| 88493|  3669| review lab result
| 29973|  3129| REF BY DR
|127444|  3129| skin chk
| 36095|    89|  other

由于我只有 9 条记录,而在 ID 数据框中我有 40k 条记录,我想为每个 id 随机分配原因。

【问题讨论】:

    标签: scala apache-spark random dataframe


    【解决方案1】:

    以下解决方案试图对原因的数量更加稳健(即,您可以有尽可能多的原因,因为您可以合理地适应您的集群)。如果您只有几个原因(就像 OP 要求的那样),您可能可以广播它们或将它们嵌入到 udf 中并轻松解决此问题。


    一般的想法是为原因创建一个索引(顺序),然​​后在 IDs 数据集上创建从 0 到 N(其中 N 是原因的数量)的随机值,然后使用这两个新列连接两个表。您可以这样做:

    case class Reasons(s: String)
    defined class Reasons
    
    case class Data(id: Long)
    defined class Data
    

    Data 将保存 ID(OP 的简化版本), Reasons 将保存一些简化的原因。

    val d1 = spark.createDataFrame( Data(1) :: Data(2) :: Data(10) :: Nil)
    d1: org.apache.spark.sql.DataFrame = [id: bigint]
    
    d1.show()
    
    +---+
    | id|
    +---+
    |  1|
    |  2|
    | 10|
    +---+
    
    val d2 = spark.createDataFrame( Reasons("a") :: Reasons("b") :: Reasons("c") :: Nil)
    
    +---+
    |  s|
    +---+
    |  a|
    |  b|
    |  c|
    +---+
    

    我们稍后将需要原因的数量,因此我们首先计算它。

    val numerOfReasons = d2.count()
    
    val d2Indexed = spark.createDataFrame(d2.rdd.map(_.getString(0)).zipWithIndex)
    
    d2Indexed.show()
    +---+---+
    | _1| _2|
    +---+---+
    |  a|  0|
    |  b|  1|
    |  c|  2|
    +---+---+
    
    val d1WithRand = d1.select($"id", (rand * numerOfReasons).cast("int").as("rnd"))
    

    最后一步是加入新列并删除它们。

    val res = d1WithRand.join(d2Indexed, d1WithRand("rnd") === d2Indexed("_2")).drop("_2").drop("rnd")
    
    res.show()
    
    +---+---+
    | id| _1|
    +---+---+
    |  2|  a|
    | 10|  b|
    |  1|  c|
    +---+---+
    

    【讨论】:

      【解决方案2】:

      pyspark 随机加入自身

      data_neg = data_pos.sortBy(lambda x: uniform(1, 10000))
      data_neg = data_neg.coalesce(1, False).zip(data_pos.coalesce(1, True))
      

      【讨论】:

        【解决方案3】:

        随机加入dataA(大数据框)和dataB(小数据框,按任意列排序)的最快方法:

        dfB = dataB.withColumn(
            "index", F.row_number().over(Window.orderBy("col")) - 1
        )
        dfA = dataA.withColumn("index", (F.rand() * dfB.count()).cast("bigint"))
        df = dfA.join(dfB, on="index", how="left").drop("index")
        

        由于 dataB 已经排序,行号可以在排序后的窗口上分配,具有高度的并行性。 F.rand() 是另一个高度并行的函数,因此向 dataA 添加索引也会非常快。

        如果 dataB 足够小,您可能会从广播中受益。

        这种方法比使用更好:

        • zipWithIndex:将数据帧转换为 rdd、zipWithIndex 和 df 可能非常昂贵。
        • monotonically_increasing_id:需要与 row_number 一起使用,它将所有分区收集到单个执行程序中。

        参考:https://towardsdatascience.com/adding-sequential-ids-to-a-spark-dataframe-fa0df5566ff6

        【讨论】:

          猜你喜欢
          • 2021-06-06
          • 1970-01-01
          • 1970-01-01
          • 2022-07-07
          • 2022-01-22
          • 1970-01-01
          • 1970-01-01
          • 2012-07-23
          • 2020-03-27
          相关资源
          最近更新 更多