【问题标题】:Merge two dataframes in PySpark在 PySpark 中合并两个数据框
【发布时间】:2020-04-29 13:46:42
【问题描述】:

我有两个数据帧,DF1 和 DF2,DF1 是存储来自 DF2 的任何附加信息的主数据。

假设 DF1 的格式如下,

Item Id | item      | count
---------------------------
1       | item 1    | 2
2       | item 2    | 3
1       | item 3    | 2
3       | item 4    | 5

DF2 包含 DF1 中已经存在的 2 个项目和两个新条目。 (itemId和item被认为是一个单独的组,可以作为join的key)

Item Id | item      | count
---------------------------
1       | item 1    | 2
3       | item 4    | 2
4       | item 4    | 4
5       | item 5    | 2

我需要合并两个数据框,以便增加现有项目计数并插入新项目。

结果应该是这样的:

Item Id | item      | count
---------------------------
1       | item 1    | 4
2       | item 2    | 3
1       | item 3    | 2
3       | item 4    | 7
4       | item 4    | 4
5       | item 5    | 2

我有一种方法可以做到这一点,不确定它是否有效或正确的方法

temp1 = df1.join(temp,['item_id','item'],'full_outer') \
    .na.fill(0)

temp1\
    .groupby("item_id", "item")\
    .agg(F.sum(temp1["count"] + temp1["newcount"]))\
    .show()

【问题讨论】:

  • 所以您的“项目 ID”不是唯一的?
  • 是的,它不是唯一的
  • @Ramesh Maharjan,我认为你们俩都提出了相同的答案。你的回答没有错。
  • 我知道它们是一样的。这就是为什么我已经问了你几千次了。顺便说一句,我先回答了,但因为你忽略了我。我刚刚删除了我的帖子。谢谢:)

标签: python apache-spark pyspark pyspark-sql


【解决方案1】:

由于两个数据帧的架构相同,您可以执行 union,然后执行 groupby id 和 aggregate 计数。

step1: df3 = df1.union(df2);
step2: df3.groupBy("Item Id", "item").agg(sum("count").as("count"));

【讨论】:

  • 应该如何,如果一次没有。的列不同?我认为,union 仅在两者具有相同编号时才有效。列数。
【解决方案2】:

有几种方法可以做到这一点。

根据您的描述,最直接的解决方案是使用 RDD - SparkContext.union

rdd1 = sc.parallelize(DF1)
rdd2 = sc.parallelize(DF2)

union_rdd = sc.union([rdd1, rdd2])

替代解决方案是使用来自pyspark.sqlDataFrame.union

注意:我之前建议过unionAll,但它在 Spark 2.0 中已被弃用

【讨论】:

  • 联合只是合并数据框或rdd。我想合并数据。如果您在结果数据集中看到,它将更新以下项目。 (1 | 项目 1 | 4) (3 | 项目 4 | 7)
【解决方案3】:

推荐@wandermonk 的解决方案,因为它不使用连接。尽可能避免连接,因为这会触发洗牌(也称为宽转换,会导致通过网络传输数据,而且成本高且速度慢)

您还必须查看您的数据大小(两个表都很大或一个小一个大等),因此您可以调整它的性能方面。

我尝试通过使用 SparkSQL 的解决方案来展示该组,因为他们做同样的事情,但更容易理解和操作。

from pyspark.sql.types import StructType, StructField, IntegerType, StringType

list_1 = [[1,"item 1" , 2],[2 ,"item 2", 3],[1 ,"item 3" ,2],[3 ,"item 4" , 5]]
list_2 = [[1,"item 1",2],[3 ,"item 4",2],[4 ,"item 4",4],[5 ,"item 5",2]]

my_schema = StructType([StructField("Item_ID",IntegerType(), True),StructField("Item_Name",StringType(), True ),StructField("Quantity",IntegerType(), True)])
df1 = spark.createDataFrame(list_1, my_schema)
df2 = spark.createDataFrame(list_2, my_schema)

df1.createOrReplaceTempView("df1")
df1.createOrReplaceTempView("df2")

df3 = df2.union(df1)
df3.createOrReplaceTempView("df3")
df4 = spark.sql("select Item_ID, Item_Name, sum(Quantity) as Quantity from df3 group by Item_ID, Item_Name")
df4.show(10)

现在,如果您查看 SparkUI,您可以看到如此小的数据集、随机播放操作和阶段数。

这么小的工作的阶段数

通过命令为该组的 shuffle 操作编号

我还建议查看 SQL 计划并了解成本。 Exchange 代表了这里的洗牌。

== Physical Plan ==
*(2) HashAggregate(keys=[Item_ID#6, Item_Name#7], functions=[sum(cast(Quantity#8 as bigint))], output=[Item_ID#6, Item_Name#7, Quantity#32L])
+- Exchange hashpartitioning(Item_ID#6, Item_Name#7, 200)
   +- *(1) HashAggregate(keys=[Item_ID#6, Item_Name#7], functions=[partial_sum(cast(Quantity#8 as bigint))], output=[Item_ID#6, Item_Name#7, sum#38L])
      +- Union
         :- Scan ExistingRDD[Item_ID#6,Item_Name#7,Quantity#8]
         +- Scan ExistingRDD[Item_ID#0,Item_Name#1,Quantity#2]

【讨论】:

    猜你喜欢
    • 2021-11-17
    • 1970-01-01
    • 2022-01-18
    • 1970-01-01
    • 1970-01-01
    • 2022-01-12
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多