【Question Title】: Create a generic function to join multiple datasets in pyspark
【Posted】: 2022-11-28 11:30:06
【Question Description】:

Hi, I am trying to create a generic function (or class) that joins n datasets, but I cannot work out the right logic for it. My full code is below, and I have repeated the part I need help with at the end. Please let me know if anything in my code is unclear.

import pyspark
  
# import SparkSession from the pyspark.sql module
from pyspark.sql import SparkSession
  
# creating sparksession and giving an app name
spark = SparkSession.builder.appName('sparkdf').getOrCreate()

data_fact = [["1", "sravan", "company 1", "100"],
             ["2", "ojaswi", "company 1", "200"],
             ["3", "rohith", "company 2", "300"],
             ["4", "sridevi", "company 1", "400"],
             ["5", "bobby", "company 1", "500"]]
  
# specify column names
columns = ['ID', 'NAME', 'Company','Amount']
  
# creating a dataframe from the lists of data
df_fact = spark.createDataFrame(data_fact, columns)

Department_table = [["1", "45000", "IT"],
                    ["2", "145000", "Manager"],
                    ["6", "45000", "HR"],
                    ["5", "34000", "Sales"]]
  
# specify column names
columns1 = ['ID', 'salary', 'department']
df_Department = spark.createDataFrame(Department_table, columns1)

Leave_Table = [["1", "Sick Leave"],
               ["2", "Casual leave"],
               ["3", "Casual leave"],
               ["4", "Earned Leave"],
               ["4", "Sick Leave"]]
  
# specify column names
columns2 = ['ID', 'Leave_type']
df_Leave = spark.createDataFrame(Leave_Table, columns2)

Phone_Table = [["1", "Apple"],
               ["2", "Samsung"],
               ["3", "MI"],
               ["4", "Vivo"],
               ["4", "Apple"]]
  
# specify column names
columns3 = ['ID', 'Phone_type']
 
df_Phone = spark.createDataFrame(Phone_Table, columns3)

Df_join = df_fact.join(df_Department, df_fact.ID == df_Department.ID, "inner") \
    .join(df_Phone, df_fact.ID == df_Phone.ID, "inner") \
    .join(df_Leave, df_fact.ID == df_Leave.ID, "inner") \
    .select(df_fact.Amount, df_Department.ID, df_Department.salary,
            df_Department.department, df_Phone.Phone_type, df_Leave.Leave_type)

display(Df_join)


Basically, I want to generalize this join logic to n datasets:

Df_join = df_fact.join(df_Department, df_fact.ID == df_Department.ID, "inner") \
    .join(df_Phone, df_fact.ID == df_Phone.ID, "inner") \
    .join(df_Leave, df_fact.ID == df_Leave.ID, "inner") \
    .select(df_fact.Amount, df_Department.ID, df_Department.salary,
            df_Department.department, df_Phone.Phone_type, df_Leave.Leave_type)

【Question Discussion】:

    Tags: python dataframe apache-spark pyspark


    【Solution 1】:

    Since you are using an inner join across all the dataframes, you can avoid the repetitive code by performing the joins with functools.reduce() and then selecting the columns you want:

    from functools import reduce

    df = reduce(lambda x, y: x.join(y, on='ID', how='inner'), [df_fact, df_Department, df_Leave, df_Phone])
    df.show(10, False)
    +---+------+---------+------+------+----------+------------+----------+
    |ID |NAME  |Company  |Amount|salary|department|Leave_type  |Phone_type|
    +---+------+---------+------+------+----------+------------+----------+
    |1  |sravan|company 1|100   |45000 |IT        |Sick Leave  |Apple     |
    |2  |ojaswi|company 1|200   |145000|Manager   |Casual leave|Samsung   |
    +---+------+---------+------+------+----------+------------+----------+
    

    https://docs.python.org/3/library/functools.html#functools.reduce
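
    If you want this as a reusable helper, here is a minimal sketch built on the same reduce() idea; the function name join_all and its select_cols parameter are illustrative, not part of the original answer:

    from functools import reduce

    # join_all is a hypothetical helper name, shown only as one way to
    # package the reduce-based join into a generic function.
    def join_all(dfs, on='ID', how='inner', select_cols=None):
        # Fold the list of dataframes into one, joining each dataframe
        # to the running result on the shared key column.
        joined = reduce(lambda left, right: left.join(right, on=on, how=how), dfs)
        # Optionally keep only the requested columns.
        return joined.select(*select_cols) if select_cols else joined

    result = join_all([df_fact, df_Department, df_Leave, df_Phone],
                      on='ID', how='inner',
                      select_cols=['Amount', 'ID', 'salary', 'department',
                                   'Phone_type', 'Leave_type'])
    result.show(10, False)

    One design note: passing the key name as a string to join() (rather than an equality expression like df_fact.ID == df_Department.ID) keeps a single ID column in the result, which avoids the ambiguous-column errors the chained version can hit at select time.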

    【Discussion】:
