[Question title]: Split dataset based on column values in Spark
[Posted]: 2017-07-27 12:22:51
[Question description]:

I am trying to split a dataset into separate datasets based on the contents of the Manufacturer column. It is very slow.
Please suggest ways to improve the code so that it runs faster and uses less Java code.

    List<Row> lsts = countsByAge.collectAsList();

    for (Row lst : lsts) {
        String man = lst.toString();
        man = man.replaceAll("[\\p{Ps}\\p{Pe}]", "");
        Dataset<Row> DF = src.filter("Manufacturer='" + man + "'");
        DF.show();
    }

The code, input, and output datasets are shown below.

    package org.sparkexample;

    import java.util.List;

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SQLContext;
    import org.apache.spark.sql.SparkSession;

    public class GroupBy {

        public static void main(String[] args) {
            System.setProperty("hadoop.home.dir", "C:\\winutils");
            JavaSparkContext sc = new JavaSparkContext(
                    new SparkConf().setAppName("SparkJdbcDs").setMaster("local[*]"));
            SQLContext sqlContext = new SQLContext(sc);
            SparkSession spark = SparkSession.builder().appName("split datasets").getOrCreate();
            sc.setLogLevel("ERROR");

            Dataset<Row> src = sqlContext.read()
                    .format("com.databricks.spark.csv")
                    .option("header", "true")
                    .load("sample.csv");

            Dataset<Row> unq_manf = src.select("Manufacturer").distinct();
            List<Row> lsts = unq_manf.collectAsList();

            for (Row lst : lsts) {
                String man = lst.toString();
                man = man.replaceAll("[\\p{Ps}\\p{Pe}]", "");
                Dataset<Row> DF = src.filter("Manufacturer='" + man + "'");
                DF.show();
            }
        }
    }

        INPUT TABLE-
        +------+------------+--------------------+---+
        |ItemID|Manufacturer|       Category name|UPC|
        +------+------------+--------------------+---+
        |   804|         ael|Brush & Broom Han...|123|
        |   805|         ael|Wheel Brush Parts...|124|
        |   813|         ael|      Drivers Gloves|125|
        |   632|        west|       Pipe Wrenches|126|
        |   804|         bil|     Masonry Brushes|127|
        |   497|        west|   Power Tools Other|128|
        |   496|        west|   Power Tools Other|129|
        |   495|         bil|           Hole Saws|130|
        |   499|         bil|    Battery Chargers|131|
        |   497|        west|   Power Tools Other|132|
        +------+------------+--------------------+---+

        OUTPUT-
        +------------+
        |Manufacturer|
        +------------+
        |         ael|
        |        west|
        |         bil|
        +------------+

        +------+------------+--------------------+---+
        |ItemID|Manufacturer|       Category name|UPC|
        +------+------------+--------------------+---+
        |   804|         ael|Brush & Broom Han...|123|
        |   805|         ael|Wheel Brush Parts...|124|
        |   813|         ael|      Drivers Gloves|125|
        +------+------------+--------------------+---+

        +------+------------+-----------------+---+
        |ItemID|Manufacturer|    Category name|UPC|
        +------+------------+-----------------+---+
        |   632|        west|    Pipe Wrenches|126|
        |   497|        west|Power Tools Other|128|
        |   496|        west|Power Tools Other|129|
        |   497|        west|Power Tools Other|132|
        +------+------------+-----------------+---+

        +------+------------+----------------+---+
        |ItemID|Manufacturer|   Category name|UPC|
        +------+------------+----------------+---+
        |   804|         bil| Masonry Brushes|127|
        |   495|         bil|       Hole Saws|130|
        |   499|         bil|Battery Chargers|131|
        +------+------------+----------------+---+
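The split the loop above performs can be sketched locally with plain Java streams, without Spark: a single `groupingBy` pass buckets every row by its Manufacturer value, instead of re-scanning the whole dataset once per distinct manufacturer. The `Item` record below is a hypothetical stand-in for Spark's `Row`:

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class SplitByColumn {

    // Hypothetical row type standing in for Spark's Row
    record Item(int itemId, String manufacturer, String category, int upc) {}

    // One pass over the data groups every row by its Manufacturer value,
    // instead of one full filter scan per distinct manufacturer.
    static Map<String, List<Item>> splitBy(List<Item> rows) {
        return rows.stream().collect(Collectors.groupingBy(Item::manufacturer));
    }

    public static void main(String[] args) {
        List<Item> src = List.of(
                new Item(804, "ael", "Brush & Broom Handles", 123),
                new Item(632, "west", "Pipe Wrenches", 126),
                new Item(804, "bil", "Masonry Brushes", 127),
                new Item(497, "west", "Power Tools Other", 128));

        splitBy(src).forEach((man, rows) ->
                System.out.println(man + " -> " + rows.size() + " row(s)"));
    }
}
```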

Thanks

[Question discussion]:

    Tags: apache-spark apache-spark-sql apache-spark-2.0 apache-spark-dataset


    [Solution 1]:

    In this case you have two options:

    1. First you have to collect the distinct manufacturer values, then map over the resulting array:

      val df = Seq(("HP", 1), ("Brother", 2), ("Canon", 3), ("HP", 5)).toDF("k", "v")
      val brands = df.select("k").distinct.collect.flatMap(_.toSeq)
      val brandArray = brands.map(brand => df.where($"k" <=> brand))
      brandArray.foreach { x =>
        x.show()
        println("---------------------------------------")
      }
      
    2. You can also save the DataFrame partitioned by the manufacturer column.

      df.write.partitionBy("k").format("parquet").saveAsTable("brands")
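For reference, `partitionBy` lays the output out as one subdirectory per distinct key value (`k=HP/`, `k=Brother/`, ...), each holding only that key's rows. A minimal local sketch of that layout in plain Java, assuming plain text files instead of parquet (the method and file names here are illustrative, not Spark's):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class PartitionedWrite {

    // Mimic Spark's partitionBy("k"): one "k=<value>" directory per distinct
    // key, each holding the rows that share that key (text here, not parquet).
    static Path writePartitioned(Path base, List<String[]> rows) throws IOException {
        Map<String, List<String[]>> byKey =
                rows.stream().collect(Collectors.groupingBy(r -> r[0]));
        for (Map.Entry<String, List<String[]>> e : byKey.entrySet()) {
            Path dir = base.resolve("k=" + e.getKey());
            Files.createDirectories(dir);
            Files.write(dir.resolve("part-00000.txt"),
                    e.getValue().stream().map(r -> r[1]).collect(Collectors.toList()));
        }
        return base;
    }

    public static void main(String[] args) throws IOException {
        List<String[]> rows = List.of(
                new String[]{"HP", "1"}, new String[]{"Brother", "2"},
                new String[]{"Canon", "3"}, new String[]{"HP", "5"});
        Path base = writePartitioned(Files.createTempDirectory("brands"), rows);
        System.out.println(Files.exists(base.resolve("k=HP")));
    }
}
```

A later query for one manufacturer then only has to read that manufacturer's directory, which is why partitioning usually beats materializing one dataset per key.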

    [Discussion]:

    • @Souvik How do you reach `df` from inside the map function on the third line? I don't think that is possible in Java.
    [Solution 2]:

    If you need to query by manufacturer frequently, it is better to write the DataFrame with manufacturer as the partition key rather than splitting the dataset/DataFrame by manufacturer.

    If you still want separate DataFrames based on one of the column values, one approach using PySpark and Spark 2.0+ could be:

    from pyspark.sql import functions as F

    df = spark.read.csv("sample.csv", header=True)

    # collect the list of distinct manufacturers
    manufacturers = df.select('Manufacturer').distinct().collect()

    # loop through the manufacturers to filter df by manufacturer and write each subset separately
    # (the bracketed parts and <write_path> are optional placeholders)
    for m in manufacturers:
        df1 = df.where(F.col('Manufacturer') == m[0])
        df1[.repartition(repartition_col)].write.parquet(<write_path>, [write_mode])
    
    

    [Discussion]:
