【问题标题】:orderby is not giving correct results in spark SQLorderby 在 spark SQL 中没有给出正确的结果
【发布时间】:2020-10-21 05:08:47
【问题描述】:

我有一个大约 60 列和 3000 行的数据集。 我正在使用 orderby 对数据集中的行进行排序并写入文件 但它没有给出正确的结果。

dataset.orderBy(new Column(col_name).desc())
                .coalesce(4)
                .write()
                .format("com.databricks.spark.csv")
                .option("delimiter", ",")
                .option("header", "false")
                .mode(SaveMode.Overwrite)
                .save("hdfs://" + filePath);

请让我知道我在这里缺少什么

我也找到了以下解决方案,但认为这不是正确的解决方案

        Row[] rows = dataset.take(3000);

        for ( Row row : rows){
            // here i am writing in a file row by row
            System.out.println(row);
        }

【问题讨论】:

  • 你的意图是什么? coalesce(4) 将提供 4 个文件。您只想在每个文件中进行排序吗?或者您想要一个具有正确顺序的文件?
  • 如果不使用 coalesce 它会创建 200 个文件,这就是为什么我使用 coalesce(4) 最后只创建 4 个文件。在此之后,我将 4 个文件写入一个最终文件。像下面``` FSDataInputStream inputStream; FileStatus[] partFiles = hdfs.globStatus(new Path(reportDatePath + "/part*")); for (FileStatus fileStatus : partFiles) { inputStream = hdfs.open(fileStatus.getPath()); IOUtils.copyBytes(inputStream, reportFileOs, hadoopConf, false); IOUtils.closeStream(inputStream); }```
  • 但是你的最终单个文件没有订单保证。
  • 是的..这就是发生的事情:(所以我应该在这里使用 coalesce(1) 吗?当数据比我在问题中提到的大小大 3 4 倍时,这不会引起问题

标签: apache-spark apache-spark-sql


【解决方案1】:

问题是coalesce 会以未排序的方式合并您现有的分区(不,coalesce 不会导致随机播放)。

如果你想要 4 个文件并在文件内排序,你需要在 orderBy 之前更改 spark.sql.suffle.partitions,这将导致你的 shuffle 有 4 个分区。

spark.sql("set spark.sql.shuffle.partitions=4")

dataset.orderBy(new Column(col_name).desc())
            .write()
            .format("com.databricks.spark.csv")
            .option("delimiter", ",")
            .option("header", "false")
            .mode(SaveMode.Overwrite)
            .save("hdfs://" + filePath);

如果你只关心文件内的排序,你也可以使用sortWithinPartitions(new Column(col_name).desc())

【讨论】:

    【解决方案2】:

    因为您的 .coalesce(4) 打乱了您的数据框顺序

    先合并再排序。

    dataset
    .coalesce(4)
    .orderBy(new Column(col_name).desc())                
    .write()
    .format("com.databricks.spark.csv")
    .option("delimiter", ",")
    .option("header", "false")
    .mode(SaveMode.Overwrite)
    .save("hdfs://" + filePath);
    

    您还应该在 spark 上下文中将 spark.sql.suffle.partitions 设置为 4,因为 order by 也是 provoque suffle。

    【讨论】:

    • 这没有任何意义。 orderBy 将再次洗牌到 200 个(默认)分区
    • 编辑后coalesce(4) 不会添加任何值。这是多余的
    【解决方案3】:

    根据您在 cmets 中的说明,您需要将您的 ordered 输出包含在单个文件中。

    只有 spark,这只有在 spark.sql("set spark.sql.shuffle.partitions=1") 后跟 orderBy 和写入时才有可能。但缺点是它won't scale 用于大数据,因为它不会被并行化。

    解决方法是:

    • 让您的 spark 执行 orderBy 的最大并行工作(即不要 coalesce"set spark.sql.shuffle.partitions=1")并拥有 n 的文件数。
    • 在文件合并代码中添加一些额外的逻辑处理
    • 列出所有文件,获取col_name的值并维护[(col_name value), filepath]的映射
    • 按键排序(col_name的值)
    • 然后执行合并

    这将维持您的订单。

    想法是,合并部分将主要是单线程的,至少以分布式方式进行排序:)

    【讨论】:

      猜你喜欢
      • 2021-12-21
      • 2019-07-07
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2014-05-03
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多