【问题标题】:Spark Job not returning consistent resultsSpark Job 没有返回一致的结果
【发布时间】:2021-07-23 07:17:52
【问题描述】:

我有一个 spark scala 作业,它正在为给定的时间间隔选择一个分数。我已经针对同一组数据运行了 3 次这项工作,每次我得到的分数结果都略有不同。我的分数是在一个 UDF 中选择的,该 UDF 包含一个 Seq 和一些要评估的分数。目前我只是评估一个分数,所以它应该只返回最高分,但我没有看到返回的一致高分。我不确定为什么会这样,任何帮助将不胜感激,如果需要,我可以添加更多信息。

RUN ONE:
    - Grab data from s3
    - Use pushdown predicates to get filtered data
    - Filter events with certain business rules, reapartiion, and mapPartition to clean up data
    - Filter data that is already good to go
    - Union the two dataframes
    - Join on small table
    - GroupBy & Aggregate to get a sum
    - GroupBy & Aggregate to get a List of sums
    - UDF with business logic to randomly select sum (currently just selecting highest sum)
    - Join on small table
    - Partition data and write to S3

RUN TWO:
    ** Same as Run One but I no longer repartition and I run the UDF after I join
    - Grab data from s3
    - Use pushdown predicates to get filtered data
    - Filter events with certain business rules and mapPartition to clean up data
    - Filter data that is already good to go
    - Union the two dataframes
    - Join on small table
    - GroupBy & Aggregate to get a sum
    - GroupBy & Aggregate to get a List of sums
    - Join on small table
    - UDF with business logic to randomly select sum (currently just selecting highest sum)
    - Partition data and write to S3

RUN THREE:
    ** Same as Run Two but passed in spark conf: spark.sql.shuffle.partitions=400

【问题讨论】:

  • 比较执行计划。如果您发现 udf 的优化方式有所不同,请尝试将 UDF 标记为非确定性:val myUdf = udf{ ... }.asNondeterministic

标签: apache-spark apache-spark-sql user-defined-functions


【解决方案1】:

spark内部将数据划分为partition,使用不同的core或machine进行并行计算,可能导致数据读取顺序不一致。

例如 - 如果文件包含 1-1000 条记录并且文件被分为 6 个分区。 现在,如果您的集群只有 4 个可用内核,则只有 4 个分区可以完成 2 个将等待,但在前 4 个中首先完成的是基于数据和计算 因此,spark 不保证第一个分区将首先完成,如果您假设它将按顺序处理数据,这可能会导致不一致。

上面的例子只是为了说明。

这可能是您没有获得一致结果的原因。您可以尝试一些可能的解决方案 -

  1. 您可以在数据中添加显式排序,然后执行计算。
  2. 如果文件不大,可以尝试减少 分区为 1 但这会导致性能问题 不推荐。
  3. 尝试对固定数量的数据重新分区 基于导致相同分区的键的分区 每次运行作业时。
  4. 您在 Hadoop 集群上运行它,然后您可以关闭推测。

谢谢

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2016-03-10
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多