【问题标题】:Concatenate two PySpark dataframes连接两个 PySpark 数据帧
【发布时间】:2016-09-16 21:03:41
【问题描述】:

我正在尝试将两个 PySpark 数据帧与仅在其中一个上的一些列连接起来:

from pyspark.sql.functions import randn, rand

df_1 = sqlContext.range(0, 10)

+--+
|id|
+--+
| 0|
| 1|
| 2|
| 3|
| 4|
| 5|
| 6|
| 7|
| 8|
| 9|
+--+

df_2 = sqlContext.range(11, 20)

+--+
|id|
+--+
| 10|
| 11|
| 12|
| 13|
| 14|
| 15|
| 16|
| 17|
| 18|
| 19|
+--+

df_1 = df_1.select("id", rand(seed=10).alias("uniform"), randn(seed=27).alias("normal"))
df_2 = df_2.select("id", rand(seed=10).alias("uniform"), randn(seed=27).alias("normal_2"))

现在我想生成第三个数据帧。我想要熊猫concat

df_1.show()
+---+--------------------+--------------------+
| id|             uniform|              normal|
+---+--------------------+--------------------+
|  0|  0.8122802274304282|  1.2423430583597714|
|  1|  0.8642043127063618|  0.3900018344856156|
|  2|  0.8292577771850476|  1.8077401259195247|
|  3|   0.198558705368724| -0.4270585782850261|
|  4|0.012661361966674889|   0.702634599720141|
|  5|  0.8535692890157796|-0.42355804115129153|
|  6|  0.3723296190171911|  1.3789648582622995|
|  7|  0.9529794127670571| 0.16238718777444605|
|  8|  0.9746632635918108| 0.02448061333761742|
|  9|   0.513622008243935|  0.7626741803250845|
+---+--------------------+--------------------+

df_2.show()
+---+--------------------+--------------------+
| id|             uniform|            normal_2|
+---+--------------------+--------------------+
| 11|  0.3221262660507942|  1.0269298899109824|
| 12|  0.4030672316912547|   1.285648175568798|
| 13|  0.9690555459609131|-0.22986601831364423|
| 14|0.011913836266515876|  -0.678915153834693|
| 15|  0.9359607054250594|-0.16557488664743034|
| 16| 0.45680471157575453| -0.3885563551710555|
| 17|  0.6411908952297819|  0.9161177183227823|
| 18|  0.5669232696934479|  0.7270125277020573|
| 19|   0.513622008243935|  0.7626741803250845|
+---+--------------------+--------------------+

#do some concatenation here, how?

df_concat.show()

| id|             uniform|              normal| normal_2   |
+---+--------------------+--------------------+------------+
|  0|  0.8122802274304282|  1.2423430583597714| None       |
|  1|  0.8642043127063618|  0.3900018344856156| None       |
|  2|  0.8292577771850476|  1.8077401259195247| None       |
|  3|   0.198558705368724| -0.4270585782850261| None       |
|  4|0.012661361966674889|   0.702634599720141| None       |
|  5|  0.8535692890157796|-0.42355804115129153| None       |
|  6|  0.3723296190171911|  1.3789648582622995| None       |
|  7|  0.9529794127670571| 0.16238718777444605| None       |
|  8|  0.9746632635918108| 0.02448061333761742| None       |
|  9|   0.513622008243935|  0.7626741803250845| None       |
| 11|  0.3221262660507942|  None              | 0.123      |
| 12|  0.4030672316912547|  None              |0.12323     |
| 13|  0.9690555459609131|  None              |0.123       |
| 14|0.011913836266515876|  None              |0.18923     |
| 15|  0.9359607054250594|  None              |0.99123     |
| 16| 0.45680471157575453|  None              |0.123       |
| 17|  0.6411908952297819|  None              |1.123       |
| 18|  0.5669232696934479|  None              |0.10023     |
| 19|   0.513622008243935|  None              |0.916332123 |
+---+--------------------+--------------------+------------+

这可能吗?

【问题讨论】:

    标签: python apache-spark pyspark apache-spark-sql


    【解决方案1】:

    也许您可以尝试创建不存在的列并调用unionunionAll 用于 Spark 1.6 或更低版本):

    from pyspark.sql.functions import lit
    
    cols = ['id', 'uniform', 'normal', 'normal_2']    
    
    df_1_new = df_1.withColumn("normal_2", lit(None)).select(cols)
    df_2_new = df_2.withColumn("normal", lit(None)).select(cols)
    
    result = df_1_new.union(df_2_new)
    

    【讨论】:

    • unionAll() 在 spark 2.0 中已弃用。改用union()
    • 你可以使用withColumnRenamedspark.apache.org/docs/latest/api/python/…重命名
    • 在进行联合时删除重复行的任何方法@Daniel
    • 不错的解决方案!
    • @javan.rajpopat 您可以使用result.dropDuplicates() 删除重复项。
    【解决方案2】:
    df_concat = df_1.union(df_2)
    

    数据框可能需要具有相同的列,在这种情况下,您可以使用withColumn() 创建normal_1normal_2

    【讨论】:

    • 谢谢。正如我上面所说的,问题在于两个数据帧之间的列不相同。
    • 将额外的列添加到数据框中。 df.withColumn('NewColumnName',lit('RequiredValue')
    • 在联合 @David 时删除重复行的任何方法
    • 使用拖放复制功能
    • 两个数据框中的列顺序也应该相同。否则它会给出奇怪的结果。来自文档Also as standard in SQL, this function resolves columns by position (not by name).
    【解决方案3】:

    你可以使用 unionByName 来做这个:

    df = df_1.unionByName(df_2)
    

    unionByName 从 Spark 2.3.0 开始可用。

    【讨论】:

    • 遇到异常:pyspark.sql.utils.AnalysisException: 'Cannot resolve column name "normal" among (id, uniform, normal_2);'
    • 这是因为两个数据框需要相同的列名。
    • 现在我可以避免在附加数据帧之前使用select 重新排序列。谢谢!
    【解决方案4】:

    unionByName 是 spark 中的一个内置选项,可从 spark 2.3.0 获得。

    对于 spark 版本 3.1.0,有默认值设置为 False 的 allowMissingColumns 选项来处理丢失的列。即使两个数据帧没有相同的列集,此函数也可以工作,在结果数据帧中将缺失的列值设置为 null。

    df_1.unionByName(df_2, allowMissingColumns=True).show()
    
    +---+--------------------+--------------------+--------------------+
    | id|             uniform|              normal|            normal_2|
    +---+--------------------+--------------------+--------------------+
    |  0|  0.8122802274304282|  1.2423430583597714|                null|
    |  1|  0.8642043127063618|  0.3900018344856156|                null|
    |  2|  0.8292577771850476|  1.8077401259195247|                null|
    |  3|   0.198558705368724| -0.4270585782850261|                null|
    |  4|0.012661361966674889|   0.702634599720141|                null|
    |  5|  0.8535692890157796|-0.42355804115129153|                null|
    |  6|  0.3723296190171911|  1.3789648582622995|                null|
    |  7|  0.9529794127670571| 0.16238718777444605|                null|
    |  8|  0.9746632635918108| 0.02448061333761742|                null|
    |  9|   0.513622008243935|  0.7626741803250845|                null|
    | 11|  0.3221262660507942|                null|  1.0269298899109824|
    | 12|  0.4030672316912547|                null|   1.285648175568798|
    | 13|  0.9690555459609131|                null|-0.22986601831364423|
    | 14|0.011913836266515876|                null|  -0.678915153834693|
    | 15|  0.9359607054250594|                null|-0.16557488664743034|
    | 16| 0.45680471157575453|                null| -0.3885563551710555|
    | 17|  0.6411908952297819|                null|  0.9161177183227823|
    | 18|  0.5669232696934479|                null|  0.7270125277020573|
    | 19|   0.513622008243935|                null|  0.7626741803250845|
    +---+--------------------+--------------------+--------------------+
    

    【讨论】:

      【解决方案5】:

      为了更通用地将两列保留在df1df2

      import pyspark.sql.functions as F
      
      # Keep all columns in either df1 or df2
      def outter_union(df1, df2):
      
          # Add missing columns to df1
          left_df = df1
          for column in set(df2.columns) - set(df1.columns):
              left_df = left_df.withColumn(column, F.lit(None))
      
          # Add missing columns to df2
          right_df = df2
          for column in set(df1.columns) - set(df2.columns):
              right_df = right_df.withColumn(column, F.lit(None))
      
          # Make sure columns are ordered the same
          return left_df.union(right_df.select(left_df.columns))
      

      【讨论】:

        【解决方案6】:

        这是一种方法,以防它仍然有用:我在 pyspark shell 中运行它,Python 版本 2.7.12,我的 Spark 安装版本是 2.0.1。

        PS:我猜你的意思是为 df_1 df_2 使用不同的种子,下面的代码反映了这一点。

        from pyspark.sql.types import FloatType
        from pyspark.sql.functions import randn, rand
        import pyspark.sql.functions as F
        
        df_1 = sqlContext.range(0, 10)
        df_2 = sqlContext.range(11, 20)
        df_1 = df_1.select("id", rand(seed=10).alias("uniform"), randn(seed=27).alias("normal"))
        df_2 = df_2.select("id", rand(seed=11).alias("uniform"), randn(seed=28).alias("normal_2"))
        
        def get_uniform(df1_uniform, df2_uniform):
            if df1_uniform:
                return df1_uniform
            if df2_uniform:
                return df2_uniform
        
        u_get_uniform = F.udf(get_uniform, FloatType())
        
        df_3 = df_1.join(df_2, on = "id", how = 'outer').select("id", u_get_uniform(df_1["uniform"], df_2["uniform"]).alias("uniform"), "normal", "normal_2").orderBy(F.col("id"))
        

        这是我得到的输出:

        df_1.show()
        +---+-------------------+--------------------+
        | id|            uniform|              normal|
        +---+-------------------+--------------------+
        |  0|0.41371264720975787|  0.5888539012978773|
        |  1| 0.7311719281896606|  0.8645537008427937|
        |  2| 0.1982919638208397| 0.06157382353970104|
        |  3|0.12714181165849525|  0.3623040918178586|
        |  4| 0.7604318153406678|-0.49575204523675975|
        |  5|0.12030715258495939|  1.0854146699817222|
        |  6|0.12131363910425985| -0.5284523629183004|
        |  7|0.44292918521277047| -0.4798519469521663|
        |  8| 0.8898784253886249| -0.8820294772950535|
        |  9|0.03650707717266999| -2.1591956435415334|
        +---+-------------------+--------------------+
        
        df_2.show()
        +---+-------------------+--------------------+
        | id|            uniform|            normal_2|
        +---+-------------------+--------------------+
        | 11| 0.1982919638208397| 0.06157382353970104|
        | 12|0.12714181165849525|  0.3623040918178586|
        | 13|0.12030715258495939|  1.0854146699817222|
        | 14|0.12131363910425985| -0.5284523629183004|
        | 15|0.44292918521277047| -0.4798519469521663|
        | 16| 0.8898784253886249| -0.8820294772950535|
        | 17| 0.2731073068483362|-0.15116027592854422|
        | 18| 0.7784518091224375| -0.3785563841011868|
        | 19|0.43776394586845413| 0.47700719174464357|
        +---+-------------------+--------------------+
        
        df_3.show()
        +---+-----------+--------------------+--------------------+                     
        | id|    uniform|              normal|            normal_2|
        +---+-----------+--------------------+--------------------+
        |  0| 0.41371265|  0.5888539012978773|                null|
        |  1|  0.7311719|  0.8645537008427937|                null|
        |  2| 0.19829196| 0.06157382353970104|                null|
        |  3| 0.12714182|  0.3623040918178586|                null|
        |  4|  0.7604318|-0.49575204523675975|                null|
        |  5|0.120307155|  1.0854146699817222|                null|
        |  6| 0.12131364| -0.5284523629183004|                null|
        |  7| 0.44292918| -0.4798519469521663|                null|
        |  8| 0.88987845| -0.8820294772950535|                null|
        |  9|0.036507078| -2.1591956435415334|                null|
        | 11| 0.19829196|                null| 0.06157382353970104|
        | 12| 0.12714182|                null|  0.3623040918178586|
        | 13|0.120307155|                null|  1.0854146699817222|
        | 14| 0.12131364|                null| -0.5284523629183004|
        | 15| 0.44292918|                null| -0.4798519469521663|
        | 16| 0.88987845|                null| -0.8820294772950535|
        | 17| 0.27310732|                null|-0.15116027592854422|
        | 18|  0.7784518|                null| -0.3785563841011868|
        | 19| 0.43776396|                null| 0.47700719174464357|
        +---+-----------+--------------------+--------------------+
        

        【讨论】:

        • 一种无效的诱人方法是使用pyspark.sql.functions.monotonically_increasing_id()) 向每个df 添加一个索引col,然后对该列进行连接。 monotonically_increasing_id 不保证从 0 开始,也不保证使用连续的整数。
        【解决方案7】:

        将多个 pyspark 数据帧合并为一个:

        from functools import reduce
        
        reduce(lambda x,y:x.union(y), [df_1,df_2])
        

        您可以将 [df_1, df_2] 列表替换为任意长度的列表。

        【讨论】:

          【解决方案8】:

          以上答案非常优雅。我很久以前就写过这个函数,我也在努力将两个具有不同列的数据帧连接起来。

          假设你有数据框 sdf1 和 sdf2

          from pyspark.sql import functions as F
          from pyspark.sql.types import *
          
          def unequal_union_sdf(sdf1, sdf2):
              s_df1_schema = set((x.name, x.dataType) for x in sdf1.schema)
              s_df2_schema = set((x.name, x.dataType) for x in sdf2.schema)
          
              for i,j in s_df2_schema.difference(s_df1_schema):
                  sdf1 = sdf1.withColumn(i,F.lit(None).cast(j))
          
              for i,j in s_df1_schema.difference(s_df2_schema):
                  sdf2 = sdf2.withColumn(i,F.lit(None).cast(j))
          
              common_schema_colnames = sdf1.columns
              sdk = \
                  sdf1.select(common_schema_colnames).union(sdf2.select(common_schema_colnames))
              return sdk 
          
          sdf_concat = unequal_union_sdf(sdf1, sdf2) 
          

          【讨论】:

            【解决方案9】:

            这应该为你做......

            from pyspark.sql.types import FloatType
            from pyspark.sql.functions import randn, rand, lit, coalesce, col
            import pyspark.sql.functions as F
            
            df_1 = sqlContext.range(0, 6)
            df_2 = sqlContext.range(3, 10)
            df_1 = df_1.select("id", lit("old").alias("source"))
            df_2 = df_2.select("id")
            
            df_1.show()
            df_2.show()
            df_3 = df_1.alias("df_1").join(df_2.alias("df_2"), df_1.id == df_2.id, "outer")\
              .select(\
                [coalesce(df_1.id, df_2.id).alias("id")] +\
                [col("df_1." + c) for c in df_1.columns if c != "id"])\
              .sort("id")
            df_3.show()
            

            【讨论】:

              【解决方案10】:

              我试图在 pyspark 中实现 pandas 附加功能,并且我创建了一个自定义函数,我们可以在其中连接 2 个或更多数据帧,即使它们的编号不同。列的唯一条件是如果数据框具有相同的名称,那么它们的数据类型应该相同/匹配。

              我编写了一个自定义函数来合并 2 个数据帧。

              def append_dfs(df1,df2):
                  list1 = df1.columns
                  list2 = df2.columns
                  for col in list2:
                      if(col not in list1):
                          df1 = df1.withColumn(col, F.lit(None))
                  for col in list1:
                      if(col not in list2):
                          df2 = df2.withColumn(col, F.lit(None))
                  return df1.unionByName(df2)
              

              用法:

              1. 连接 2 个数据帧

                final_df = append_dfs(df1,df2)

                1. 连接超过 2(say3) 个数据帧

                final_df = append_dfs(append_dfs(df1,df2),df3)

              示例:

              df1:

              df2:

              结果=append_dfs(df1,df2)

              结果:

              希望这会有用。

              【讨论】:

                【解决方案11】:

                我会这样解决这个问题:

                from pyspark.sql import SparkSession
                df_1.createOrReplaceTempView("tab_1")
                df_2.createOrReplaceTempView("tab_2")
                df_concat=spark.sql("select tab_1.id,tab_1.uniform,tab_1.normal,tab_2.normal_2  from tab_1 tab_1 left join tab_2 tab_2 on tab_1.uniform=tab_2.uniform\
                                union\
                                select tab_2.id,tab_2.uniform,tab_1.normal,tab_2.normal_2  from tab_2 tab_2 left join tab_1 tab_1 on tab_1.uniform=tab_2.uniform")
                df_concat.show()
                

                【讨论】:

                  【解决方案12】:

                  也许,您想连接更多的两个数据框。 我发现了一个使用 pandas Dataframe 转换的问题。

                  假设您有 3 个想要连接的 spark Dataframe。

                  代码如下:

                  list_dfs = []
                  list_dfs_ = []
                  
                  df = spark.read.json('path_to_your_jsonfile.json',multiLine = True)
                  df2 = spark.read.json('path_to_your_jsonfile2.json',multiLine = True)
                  df3 = spark.read.json('path_to_your_jsonfile3.json',multiLine = True)
                  
                  list_dfs.extend([df,df2,df3])
                  
                  for df in list_dfs : 
                  
                      df = df.select([column for column in df.columns]).toPandas()
                      list_dfs_.append(df)
                  
                  list_dfs.clear()
                  
                  df_ = sqlContext.createDataFrame(pd.concat(list_dfs_))
                  

                  【讨论】:

                    猜你喜欢
                    • 2017-11-02
                    • 2016-10-30
                    • 1970-01-01
                    • 2020-02-13
                    • 1970-01-01
                    • 2020-09-02
                    • 1970-01-01
                    • 1970-01-01
                    • 2022-08-14
                    相关资源
                    最近更新 更多