【Title】: Join, Aggregate, Then Select Specific Columns in Apache Spark
【Posted】: 2019-05-20 00:03:17
【Description】:

Products and the corresponding sale details are loaded correctly from CSV files like this:

    Dataset<Row> dfProducts = sparkSession.read()
            .option("mode", "DROPMALFORMED")
            .option("header", "true")
            .option("inferSchema", "true")
            .option("charset", "UTF-8")
            .csv(new ClassPathResource("products.csv").getURL().getPath());
    Dataset<Row> dfSaledetails = sparkSession.read()
            .option("mode", "DROPMALFORMED")
            .option("header", "true")
            .option("inferSchema", "true")
            .option("charset", "UTF-8")
            .csv(new ClassPathResource("saledetails.csv").getURL().getPath());

Products has columns (product_id, product_name, ...). Sale details has columns (product_id, amount, ...).

What I need to achieve is to join the two datasets on their common column (product_id), group by product_id, sum the amount column, and then select/show only specific columns (the product name and the summed total).

Below is my attempt:

    Dataset<Row> dfSalesTotals = dfSaledetails
            .join(dfProducts, dfSaledetails.col("product_id").equalTo(dfProducts.col("product_id")))
            .groupBy(dfSaledetails.col("product_id"))
            .agg(sum(dfSaledetails.col("amount")).alias("total_amount"))
            .select(dfProducts.col("product_name"), col("total_amount"));
    dfSalesTotals.show();

This throws the following error:

Caused by: org.apache.spark.sql.AnalysisException: Resolved attribute(s) product_name#215 missing from product_id#272,total_amount#499 in operator 
!Project [product_name#215, total_amount#499].;;
!Project [product_name#215, total_amount#499]
+- Aggregate [product_id#272], [product_id#272, sum(amount#277) AS total_amount#499]
   +- Join Inner, (product_id#272 = product_id#212)
      :- Relation[sale_detail_auto_id#266,sale_auto_id#267,sale_id#268,agent_id#269,sale_detail_id#270,inventory_id#271,product_id#272,unit_cost#273,unit_price#274,vat#275,quantity#276,amount#277,promotion_id#278,discount#279] csv
      +- Relation[product_id#212,user_group_id_super_owner#213,product_category#214,product_name#215,product_type#216,product_code#217,distributor_code#218,product_units#219,product_unitCost#220,product_manufacturer#221,product_distributor#222,create_date#223,update_date#224,vat#225,product_weight#226,carton_size#227,product_listStatus#228,active_status#229,distributor_type#230,bundle_type#231,barcode_type#232,product_family_id#233] csv
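Reading the plan explains the failure: the Aggregate node only outputs its grouping key (product_id#272) and the aggregate (total_amount#499), so the attribute product_name#215 no longer exists by the time the final select runs. One independent way around this is to aggregate first and join afterwards; a minimal sketch, with tiny inline datasets standing in for the CSV files:

```java
import static org.apache.spark.sql.functions.sum;

import java.util.Arrays;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

SparkSession spark = SparkSession.builder()
        .master("local[1]").appName("agg-then-join").getOrCreate();

// Tiny stand-ins for products.csv and saledetails.csv.
StructType productSchema = DataTypes.createStructType(Arrays.asList(
        DataTypes.createStructField("product_id", DataTypes.IntegerType, false),
        DataTypes.createStructField("product_name", DataTypes.StringType, false)));
Dataset<Row> dfProducts = spark.createDataFrame(
        Arrays.asList(RowFactory.create(1, "Widget")), productSchema);

StructType saleSchema = DataTypes.createStructType(Arrays.asList(
        DataTypes.createStructField("product_id", DataTypes.IntegerType, false),
        DataTypes.createStructField("amount", DataTypes.DoubleType, false)));
Dataset<Row> dfSaledetails = spark.createDataFrame(
        Arrays.asList(RowFactory.create(1, 10.0), RowFactory.create(1, 20.0)), saleSchema);

// Sum per product_id first, then join the (already aggregated) result
// back to products; the name-based join form also avoids carrying two
// product_id columns into the output.
Dataset<Row> dfSalesTotals = dfSaledetails
        .groupBy("product_id")
        .agg(sum("amount").alias("total_amount"))
        .join(dfProducts, "product_id")
        .select("product_name", "total_amount");
```

Because the join happens after the aggregation, every column the select needs is still resolvable, and the join itself is cheaper since it runs on the already-reduced dataset.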

【Comments】:

Tags: java apache-spark apache-spark-sql


【Solution 1】:

If you want to keep product_name, it should go either in the groupBy:

    .groupBy(
      dfSaledetails.col("product_id"),
      col("product_name"))
    

or in the agg:

    .agg(
      sum(dfSaledetails.col("amount")).alias("total_amount"), 
      first(col("product_name")).alias("product_name"))
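
Either fix can be dropped into the asker's pipeline. A runnable sketch of the agg/first variant, with tiny inline datasets standing in for the CSV files (note the final select uses the plain column name, since the aliased product_name is a new attribute produced by the aggregate, not the original dfProducts column):

```java
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.first;
import static org.apache.spark.sql.functions.sum;

import java.util.Arrays;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;

SparkSession spark = SparkSession.builder()
        .master("local[1]").appName("join-agg-select").getOrCreate();

// Tiny stand-ins for products.csv and saledetails.csv.
Dataset<Row> dfProducts = spark.createDataFrame(
        Arrays.asList(RowFactory.create(1, "Widget")),
        DataTypes.createStructType(Arrays.asList(
                DataTypes.createStructField("product_id", DataTypes.IntegerType, false),
                DataTypes.createStructField("product_name", DataTypes.StringType, false))));
Dataset<Row> dfSaledetails = spark.createDataFrame(
        Arrays.asList(RowFactory.create(1, 10.0), RowFactory.create(1, 20.0)),
        DataTypes.createStructType(Arrays.asList(
                DataTypes.createStructField("product_id", DataTypes.IntegerType, false),
                DataTypes.createStructField("amount", DataTypes.DoubleType, false))));

// Same shape as the question's attempt, but product_name now survives
// the aggregation via first(...), so the final select can resolve it.
Dataset<Row> dfSalesTotals = dfSaledetails
        .join(dfProducts, dfSaledetails.col("product_id")
                .equalTo(dfProducts.col("product_id")))
        .groupBy(dfSaledetails.col("product_id"))
        .agg(sum(dfSaledetails.col("amount")).alias("total_amount"),
             first(dfProducts.col("product_name")).alias("product_name"))
        .select(col("product_name"), col("total_amount"));
```

first(...) is safe here because product_name is functionally determined by the grouping key product_id; if that ever stops holding, grouping by both columns states the intent more explicitly.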
    

【Comments】:
