【问题标题】:pyspark 加入多个数据框
【发布时间】:2022-01-23 12:38:06
【问题描述】:

我是 spark 和大数据领域的新手。 我使用一些气流 DAG 将我的 MySQL 数据库转移到 HDFS,现在每个表都是 HDFS 中的拼花文件,现在我需要通过数据帧将 blew 查询转换为 pyspark。

SELECT PV.id product_id,
       ZP.vendor_id vendor_id,
       V.title vendor_name,
       PV.barcode barcode,
       PV.title product_title,
       ZP.active product_active,
       ZP.price product_price,
       ZP.capacity product_capacity,
       ZP.stock product_stock,
       MC1.title subcat_title,
       MC2.title parent_category_title,
       ZB.title brand_name
FROM xpediaProductVariationVendorInfo ZP
JOIN ProductVariations PV ON PV.id = ZP.xpediaProductVariation_id
JOIN Vendors V ON ZP.vendor_id = V.id
JOIN VendorTypes vt ON V.vendor_type_id = vt.id
JOIN xpediaProductVariation ZPV ON ZPV.id = PV.id
JOIN MenuCategories MC1 ON PV.menu_category_id = MC1.id
LEFT JOIN MenuCategories MC2 ON MC1.parent_id = MC2.id
LEFT JOIN xpedia_brand ZB ON ZB.id = ZPV.brand_id
WHERE ZP.vendor_id={}
  AND V.status not in ('Suspend')
GROUP BY PV.id,
         ZP.vendor_id;      

【问题讨论】:

    标签: sql pyspark apache-spark-sql bigdata


    【解决方案1】:

    那里有很多逻辑,您的分组不符合您选择的列。 (选择了 12 列,只有 2 列分组) 在 pyspark 中,您将不得不重写如下内容 -

    
    import pyspark.sql.functions as F
    
    df_output = (df_xpediaProductVariationVendorInfo.alias("ZP")
                 .join (df_ProductVariations.alias("PV"), F.col("PV.id") == F.col("ZP.xpediaProductVariation_id"))
                 .join (df_Vendors.alias("V"), F.col("ZP.vendor_id") == F.col("V.id"))
                 .join (df_VendorTypes.alias("vt"), F.col("V.vendor_type_id") == F.col("vt.id"))
                 .join (df_xpediaProductVariation.alias("ZPV"), F.col("ZPV.id") == F.col("PV.id"))
                 .join (df_MenuCategories.alias("MC1"), F.col("PV.menu_category_id") = F.col("MC1.id"))
                 .join (df_MenuCategories.alias("MC2"), F.col("MC1.parent_id") = F.col("MC2.id"), "left")
                 .join (df_xpedia_brand.alias("ZB"), F.col("ZB.id") = F.col("ZPV.brand_id"), "left")
                 .where ((F.col("ZP.vendor_id") == {}) 
                         & ~(F.col("V.status").isin('Suspend')))
                 .select(F.col("PV.id"),
                         F.col("ZP.vendor_id"),
                         F.col("V.title"),
                         F.col("PV.barcode"),
                         F.col("PV.title"),
                         F.col("ZP.active"),
                         F.col("ZP.price"),
                         F.col("ZP.capacity"),
                         F.col("ZP.stock"),
                         F.col("MC1.title"),
                         F.col("MC2.title"),
                         F.col("ZB.title"))
                )
    
    df_output.show()
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2022-01-26
      • 1970-01-01
      • 1970-01-01
      • 2023-03-12
      • 2020-09-26
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多