【问题标题】:union two RDDs Spark scala, keeping the right side联合两个 RDD Spark scala,保持右侧
【发布时间】:2015-10-22 15:15:50
【问题描述】:

我有两个 spark 数据框,结构如下。在使用 sqlContext 之前阅读。

 itens.columns (scala command) 
 Array[String] = Array(id_location,id_item, name, price)

 rdd1 
 [1,1,item A,10]
 [1,2,item b,12]
 [1,3,item c,12]

 rdd2
 [1,2,item b,50]
 [1,4,item c,12]
 [1,5,item c,12]

我想要基于复合键 (id_location,id_item) 的以下结果

 [1,1,item A,10]
 [1,2,item b,50]
 [1,3,item c,12]
 [1,4,item c,12]
 [1,5,item c,12]

所以,我想要一个具有不同项的结果(关于复合键),但是当我在两个 rdds 中找到具有相同键的记录时,我只想保留来自 rdd2 的记录。

有人有这种要求吗?

我正在使用 spark 和 scala。

最好的问候 拉斐尔。

【问题讨论】:

    标签: scala apache-spark apache-spark-sql spark-dataframe


    【解决方案1】:

    我对 Spark 很陌生,所以可能有更好的方法来做到这一点,但是您是否可以映射到一对 RDD(基于您的复合键),然后执行 fullOuterJoin,仅使用“正确”结果数据中的元素,其中“左侧”和“右侧”都有数据?

    粗略的伪代码:

    val pairRdd1 = rdd1 map {
      line => 
        (line(0)+line(1), line)
    }
    
    val pairRdd2 = rdd2 map {
      line => 
        (line(0)+line(1), line)
    }
    
    val joined = pairRdd1.fullOuterJoin(pairRdd2)
    
    joined map {
      (id, left, right) =>
        right.getOrElse(left.get)
    }
    

    如果我早上有时间,我会尝试拼凑一个可行的示例。希望有帮助!

    【讨论】:

    【解决方案2】:

    @Steven 有正确的想法。您需要将数据集映射到键值对,然后执行outerjoin

    val rdd1 = sc.parallelize(List((1,1,"item A",10),(1,2,"item b",12),(1,3,"item c",12)))
    val rdd2 = sc.parallelize(List((1,2,"item b",50),(1,4,"item c",12),(1,5,"item c",12)))
    
    val rdd1KV = rdd1.map{case(id_location,id_item, name, price) => ((id_location, id_item), (name, price))}
    val rdd2KV = rdd2.map{case(id_location,id_item, name, price) => ((id_location, id_item), (name, price))}
    
    val joined = rdd1KV.fullOuterJoin(rdd2KV)
    
    val res = joined.map{case((id_location, id_item),(leftOption, rightOption)) =>
        val values = rightOption.getOrElse(leftOption.get)
        (id_location, id_item, values._1, values._2)
    }
    

    这将为您提供所需的结果。

    【讨论】:

    【解决方案3】:

    看起来@Steven 的答案在逻辑上很好,但如果您的数据没有很多相交元素(即完全外连接将产生巨大的数据集),则可能会遇到问题。您也在使用 DataFrames,因此转换为 RDDs 然后再转换回 DataFrames 对于可以使用 DataFrames API 完成的任务来说似乎是多余的。我将在下面描述如何做到这一点。

    让我们从一些示例数据开始(取自您的示例):

    val rdd1 = sc.parallelize(Array((1,1,"item A",10), (1,2,"item b",12), (1,3,"item c",12)))
    val rdd2 = sc.parallelize(Array((1,2,"item b",50), (1,4,"item c",12), (1,5,"item c",12)))
    

    接下来,我们可以将它们转换为不同列别名下的 DataFrame。我们在df1df2这里使用了不同的别名,因为当我们最终加入这两个DataFrame时,后续的选择可以更容易编写(如果有办法在加入后识别列的来源,这是不必要的) .请注意,两个 DataFrame 的并集包含您要过滤的行。

    val df1 = rdd1.toDF("id_location", "id_item", "name", "price")
    val df2 = rdd2.toDF("id_location_2", "id_item_2", "name_2", "price_2")
    
    // df1.unionAll(df2).show()
    // +-----------+-------+------+-----+
    // |id_location|id_item|  name|price|
    // +-----------+-------+------+-----+
    // |          1|      1|item A|   10|
    // |          1|      2|item b|   12|
    // |          1|      3|item c|   12|
    // |          1|      2|item b|   50|
    // |          1|      4|item c|   12|
    // |          1|      5|item c|   12|
    // +-----------+-------+------+-----+
    

    在这里,我们首先在键上连接两个 DataFrame,即 df1df2 的前两个元素。然后,我们通过选择行(基本上来自df1)创建另一个DataFrame,其中存在来自df2 的行具有相同的连接键。之后,我们在df1 上运行一个except 来删除之前创建的DataFrame 中的所有行。这可以看作是一种补充,因为我们基本上所做的是从df1 中删除所有行,其中df2 中存在相同的("id_location", "id_item")。最后,我们将补码与df2 结合在一起以生成输出数据帧。

    val df_joined = df1.join(df2, (df1("id_location") === df2("id_location_2")) && (df1("id_item") === df2("id_item_2")))
    val df1_common_keyed = df_joined.select($"id_location", $"id_item", $"name", $"price")
    val df1_complement = df1.except(df1_common_keyed)
    val df_union = df1_complement.unionAll(df2)
    
    // df_union.show()
    // +-----------+-------+------+-----+
    // |id_location|id_item|  name|price|
    // +-----------+-------+------+-----+
    // |          1|      3|item c|   12|
    // |          1|      1|item A|   10|
    // |          1|      2|item b|   50|
    // |          1|      4|item c|   12|
    // |          1|      5|item c|   12|
    // +-----------+-------+------+-----+
    

    再次,就像@Steven 建议的那样,您可以通过将 DataFrames 转换为 RDD 并使用它运行来使用 RDD API。如果这是您想要做的,以下是使用subtractByKey() 和上面的输入 RDD 完成您想要的另一种方法:

    val keyed1 = rdd1.keyBy { case (id_location, id_item, _, _) => (id_location, id_item) }
    val keyed2 = rdd2.keyBy { case (id_location, id_item, _, _) => (id_location, id_item) }
    val unionRDD = keyed1.subtractByKey(keyed2).values.union(rdd2)
    
    // unionRDD.collect().foreach(println)
    // (1,1,item A,10)
    // (1,3,item c,12)
    // (1,2,item b,50)
    // (1,4,item c,12)
    // (1,5,item c,12)
    

    【讨论】:

    • 谢谢@Rohan,我也会试试我们的解决方案。
    猜你喜欢
    • 2015-10-18
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2016-09-07
    • 1970-01-01
    • 2016-08-17
    • 2018-12-31
    • 2015-10-31
    相关资源
    最近更新 更多