【问题标题】:Spark Dataframe: Select distinct rowsSpark Dataframe:选择不同的行
【发布时间】:2019-07-27 08:53:41
【问题描述】:

我尝试了两种方法从 parquet 中找到不同的行,但它似乎不起作用。
尝试 1: Dataset<Row> df = sqlContext.read().parquet("location.parquet").distinct();
但抛出

Cannot have map type columns in DataFrame which calls set operations
(intersect, except, etc.), 
but the type of column canvasHashes is map<string,string>;;

尝试 2: 尝试运行 sql 查询:

Dataset<Row> df = sqlContext.read().parquet("location.parquet");
    rawLandingDS.createOrReplaceTempView("df");
    Dataset<Row> landingDF = sqlContext.sql("SELECT distinct on timestamp * from df");

我得到的错误:

= SQL ==
SELECT distinct on timestamp * from df
-----------------------------^^^

有没有办法在读取 parquet 文件时获取不同的记录?我可以使用的任何读取选项。

【问题讨论】:

    标签: java sql dataframe apache-spark apache-spark-sql


    【解决方案1】:

    您面临的问题已在异常消息中明确说明 - 因为MapType 列既不可散列也不可排序,不能用作分组或分区表达式的一部分。

    您对 SQL 解决方案的看法在逻辑上不等同于 distinct 上的 Dataset。如果你想基于一组兼容的列去重复数据,你应该使用dropDuplicates

    df.dropDuplicates("timestamp")
    

    相当于

    SELECT timestamp, first(c1) AS c1, first(c2) AS c2,  ..., first(cn) AS cn,
           first(canvasHashes) AS canvasHashes
    FROM df GROUP BY timestamp
    

    不幸的是,如果您的目标是实际的DISTINCT,那就不会那么容易了。可能的解决方案是利用 Scala* Map 散列。您可以像这样定义 Scala udf

    spark.udf.register("scalaHash", (x: Map[String, String]) => x.##)
    

    然后在您的 Java 代码中使用它来派生可用于dropDuplicates 的列:

     df
      .selectExpr("*", "scalaHash(canvasHashes) AS hash_of_canvas_hashes")
      .dropDuplicates(
        // All columns excluding canvasHashes / hash_of_canvas_hashes
        "timestamp",  "c1", "c2", ..., "cn" 
        // Hash used as surrogate of canvasHashes
        "hash_of_canvas_hashes"         
      )
    

    与 SQL 等效

    SELECT 
      timestamp, c1, c2, ..., cn,   -- All columns excluding canvasHashes
      first(canvasHashes) AS canvasHashes
    FROM df GROUP BY
      timestamp, c1, c2, ..., cn    -- All columns excluding canvasHashes
    

    * 请注意java.util.Map 和它的hashCode 将不起作用,因为hashCode 不一致。

    【讨论】:

      【解决方案2】:

      1) 如果您想根据列进行区分,可以使用它

      val df = sc.parallelize(Array((1, 2), (3, 4), (1, 6))).toDF("no", "age")
      
      
      scala> df.show
      +---+---+
      | no|age|
      +---+---+
      |  1|  2|
      |  3|  4|
      |  1|  6|
      +---+---+
      
      val distinctValuesDF = df.select(df("no")).distinct
      
      scala> distinctValuesDF.show
      +---+
      | no|
      +---+
      |  1|
      |  3|
      +---+
      

      2) 如果您希望在所有列上都具有唯一性,请使用 dropduplicate

      scala> val df = sc.parallelize(Array((1, 2), (3, 4),(3, 4), (1, 6))).toDF("no", "age")
      
      
      
      scala> df.show
      
      +---+---+
      | no|age|
      +---+---+
      |  1|  2|
      |  3|  4|
      |  3|  4|
      |  1|  6|
      +---+---+
      
      
      scala> df.dropDuplicates().show()
      +---+---+
      | no|age|
      +---+---+
      |  1|  2|
      |  3|  4|
      |  1|  6|
      +---+---+
      

      【讨论】:

        【解决方案3】:

        是的,语法不正确,应该是:

        Dataset<Row> landingDF = sqlContext.sql("SELECT distinct * from df");
        

        【讨论】:

          猜你喜欢
          • 2017-01-13
          • 1970-01-01
          • 1970-01-01
          • 2018-07-11
          • 1970-01-01
          • 2018-12-22
          • 2019-01-12
          • 1970-01-01
          • 2018-01-19
          相关资源
          最近更新 更多