【问题标题】:Total size of serialized results of 16 tasks (1048.5 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)16 个任务的序列化结果总大小 (1048.5 MB) 大于 spark.driver.maxResultSize (1024.0 MB)
【发布时间】:2018-06-08 08:20:27
【问题描述】:

--conf spark.driver.maxResultSize=2050 添加到spark-submit 命令时出现以下错误。

17/12/27 18:33:19 ERROR TransportResponseHandler: Still have 1 requests outstanding when connection from /XXX.XX.XXX.XX:36245 is closed
17/12/27 18:33:19 WARN Executor: Issue communicating with driver in heartbeater
org.apache.spark.SparkException: Exception thrown in awaitResult:
        at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:205)
        at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
        at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:92)
        at org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$reportHeartBeat(Executor.scala:726)
        at org.apache.spark.executor.Executor$$anon$2$$anonfun$run$1.apply$mcV$sp(Executor.scala:755)
        at org.apache.spark.executor.Executor$$anon$2$$anonfun$run$1.apply(Executor.scala:755)
        at org.apache.spark.executor.Executor$$anon$2$$anonfun$run$1.apply(Executor.scala:755)
        at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1954)
        at org.apache.spark.executor.Executor$$anon$2.run(Executor.scala:755)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Connection from /XXX.XX.XXX.XX:36245 closed
        at org.apache.spark.network.client.TransportResponseHandler.channelInactive(TransportResponseHandler.java:146)

添加此配置的原因是错误:

py4j.protocol.Py4JJavaError: An error occurred while calling o171.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 16 tasks (1048.5 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)

因此,我将 maxResultSize 增加到 2.5 Gb,但 Spark 作业仍然失败(如上所示的错误)。 如何解决这个问题?

【问题讨论】:

  • --conf spark.driver.maxResultSize=2.5g 你能试试这样传递内存大小吗?
  • 还要更彻底地检查堆栈跟踪是否有任何可能导致工作人员被杀死的内存不足发生?
  • @SumeetSharma:我也对其进行了测试。有同样的错误。

标签: python apache-spark pyspark spark-dataframe


【解决方案1】:

问题似乎是您试图拉回驱动程序的数据量太大。您很可能正在使用 collect 方法从 DataFrame/RDD 中检索所有值。 驱动程序是一个单一的进程,通过收集一个 DataFrame,您可以将您分布在集群中的所有数据拉回一个节点。 这违背了分发它的目的!只有在将数据减少到可管理的数量后才有意义。

你有两个选择:

  1. 如果您确实需要处理所有这些数据,那么您应该将其保留在执行程序之外。使用 HDFSParquet 以分布式方式保存数据,并使用 Spark 方法处理集群上的数据,而不是试图将其全部收集回一个地方。

  2. 如果您确实需要将数据返回给驱动程序,您应该检查您是否真的需要所有数据。如果您只需要汇总统计信息,请在调用 collect 之前在执行程序上计算出来。或者,如果您只需要前 100 个结果,则只需收集前 100 个。

更新:

您可能会遇到此错误的另一个原因不太明显。 Spark 将尝试将数据发送回驱动程序,而不仅仅是在您显式调用 collect 时。如果您使用累加器,它还将为每个任务发回累加器结果、广播连接的数据以及有关每个任务的一些小状态数据。如果你有很多分区(根据我的经验是 20k+),你有时会看到这个错误。这是一个known issue,做了一些改进,还有更多in the works

如果这是您的问题,则可以通过以下选项:

  1. 增加 spark.driver.maxResultSize 或将其设置为 0 表示无限制
  2. 如果广播连接是罪魁祸首,您可以减少spark.sql.autoBroadcastJoinThreshold 以限制广播连接数据的大小
  3. 减少分区数

【讨论】:

  • 您能否详细说明第 (1) 点?假设我想describe() 一个从镶木地板文件中读取的巨大数据框。如何通过将镶木地板文件排除在执行程序之外来实现这一点?
  • 使用 DF 转换来创建您需要的统计信息,然后调用 collect/show 将结果返回给驱动程序。这样你只下载统计数据,而不是完整的数据。如果您想查看示例行,请使用 show 获取前几行。
  • 是的,我的回答更笼统,但describe().show() 可以正常工作。
  • 设置 spark.driver.maxResultSize = 0 解决了我在 pyspark 中的问题。我在单台机器上独立使用 pyspark,我认为设置无限大小是可以的。
  • 这对我也有用……你是个传奇
【解决方案2】:

原因:由诸如 RDD 的 collect() 之类的操作导致向驱动程序发送大量数据

解决方案: 由 SparkConf 设置:conf.set("spark.driver.maxResultSize", "4g") 要么 由 spark-defaults.conf 设置:spark.driver.maxResultSize 4g 要么 调用 spark-submit 时设置:--conf spark.driver.maxResultSize=4g

【讨论】:

  • 有没有办法使用 Luigi PySparkTask 属性来做到这一点?
  • 设置 spark.driver.maxResultSize = 0 解决了我在 pyspark 中的问题。我在单台机器上独立使用 pyspark,我认为设置无限大小是可以的。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2016-03-06
  • 1970-01-01
  • 1970-01-01
  • 2020-09-23
  • 1970-01-01
相关资源
最近更新 更多