【问题标题】:Glue PySpark job failing with resource issuesGlue PySpark 作业因资源问题而失败
【发布时间】:2020-06-11 18:40:26
【问题描述】:

我正在尝试在我的胶水 pyspark 作业中从 s3 读取一个大的 json 文件(大约 87GB)。我必须从此文件中删除重复项,将此文件分成多个较小的文件,然后将其保存回 s3。当我试图通过运行以下工作来做到这一点时。我遇到了资源问题。有什么方法可以优化它吗?

提前感谢所有帮助。

from pyspark.sql import SparkSession

if __name__ == '__main__':

    app_name = "test"
    spark = SparkSession.builder.appName(app_name).getOrCreate()

    DATA_FILE_PATH = 's3://test//ids_20200606_173121.gz'
    output_FILE_PATH = 's3://output/data/y=2020/m=06/d=10'


    device_graph_df = spark.read.json(DATA_FILE_PATH)
    distinct_device_graph_df = device_graph_df.dropDuplicates(['ip'])
    device_graph_df = distinct_device_graph_df.repartition(40)

    distinct_device_graph_df.write.parquet(output_FILE_PATH )

错误

命令失败,退出代码为 1 - Yarn 资源管理器杀死了 Spark 应用程序,请参阅 Spark 驱动程序日志/指标进行诊断。 [任务 5 的执行器任务启动工作人员] client.YarnClient (YarnClient.java:makeRestApiRequest(66)) - URL http://0.0.0.0:8088/ws/v1/cluster/apps/application_1591879099247_0001 的 GET 请求失败 com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.conn.HttpHostConnectException:连接到 0.0.0.0:8088 [/0.0.0.0] 失败:连接被拒绝(连接被拒绝) 在 com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.conn.DefaultHttpClientConnectionOperator.connect(DefaultHttpClientConnectionOperator.java:158)

【问题讨论】:

  • 尝试通过将 spark.sql.shuffle.partitions 设置为 400 来增加 shuffle 分区的数量,然后重新分区文件,使每个文件的大小为 512 mb 块。因此,如果您的文件大小为 87 GB,则重新分区大约 150
  • 正如下面@RojoSam 所指出的,问题是由于 GZip 压缩导致的文件,该文件不可拆分,只有一个执行程序正在处理它。可能这就是它陷入资源问题的原因。因此,让我尝试更改输入文件的压缩技术。然后可能不需要重新分区,因为 spark 会在读取时将其加载到多个分区中。

标签: apache-spark pyspark aws-glue


【解决方案1】:

你有几个问题:

  1. 您的非常大的 JSON 文件使用 GZip 压缩,这使得文件不可拆分,并且所有文件只需要由一个执行程序处理(无论您的作业是否配置了更多工作人员)。要解决此问题,您可以解压缩文件。如果您需要压缩文件以便能够处理它,那么您可以尝试 BZIP2 或 LZO,它们在 hadoop 中是标准的,但我没有在 Glue 中使用它们。为了能够读取数据,这几乎是强制性的(仅在一个节点中处理 87 GB 的压缩数据将需要 Glue 无法提供的大量内存)。

  2. Spark 需要读取 JSON 文件两次,一次用于推断架构,第二次用于处理数据,其中 87 GB 的数据不好分布,这可能是一个挑战。为了尽量减少这种情况,您有两种选择:

a) 如果您知道 JSON 记录的 Schema,那么您可以提供 Schema:

device_graph_df = spark.read.schema(<your schema).json(DATA_FILE_PATH)

b) 仅读取一小部分数据以推断架构(例如 1/10):

device_graph_df = spark.read.option("samplingRatio", "0.1").json(DATA_FILE_PATH)

【讨论】:

  • 这很有意义,阅读 1/10 来阅读架构......从来没有想过......很好的答案
【解决方案2】:

您可以尝试 2 个选项

  1. 使用合并而不是重新分区。
  2. 先在 ip 上重新分区,然后进行 dedup。
df.repartition(40,'ip').dropDuplicates(['ip'])

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2019-09-24
    • 1970-01-01
    • 2020-11-07
    • 2020-07-18
    • 1970-01-01
    • 2021-12-23
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多