Title: sparklyr failing with java.lang.OutOfMemoryError: GC overhead limit exceeded
Posted: 2018-08-18 12:58:01
Question:

I'm hitting a "GC overhead limit exceeded" error in Spark while using spark_apply. Here are my specs:

sparklyr v0.6.2, Spark v2.1.0, 4 workers with 8 cores and 29 GB of memory each

The closure get_dates pulls data out of Cassandra one row at a time. There are about 200k rows in total. The process runs for about an hour and a half and then gives me this memory error.

I have already tried spark.driver.memory, which is supposed to increase the heap size, but it didn't help.

Any ideas? Usage below:

> config <- spark_config()
> config$spark.executor.cores = 1 # this ensures a max of 32 separate executors
> config$spark.cores.max = 26 # this ensures that cassandra gets some resources too, not all to spark
> config$spark.driver.memory = "4G"
> config$spark.driver.memoryOverhead = "10g" 
> config$spark.executor.memory = "4G"
> config$spark.executor.memoryOverhead = "1g"
> sc <- spark_connect(master = "spark://master",
+                     config = config)
> accounts <- sdf_copy_to(sc, insight %>%
+                           # slice(1:100) %>% 
+                           {.}, "accounts", overwrite=TRUE)
> accounts <- accounts %>% sdf_repartition(78)
> dag <- spark_apply(accounts, get_dates, group_by = c("row"), 
+                    columns = list(row = "integer",
+                                   last_update_by = "character",
+                                   last_end_time = "character",
+                                   read_val = "numeric",
+                                   batch_id = "numeric",
+                                   fail_reason = "character",
+                                   end_time = "character",
+                                   meas_type = "character",
+                                   svcpt_id = "numeric",
+                                   org_id = "character",
+                                   last_update_date = "character",
+                                   validation_status = "character"
+                                   ))
> peak_usage <- dag %>% collect  
Error: java.lang.OutOfMemoryError: GC overhead limit exceeded
    at org.apache.spark.sql.execution.SparkPlan$$anon$1.next(SparkPlan.scala:260)
    at org.apache.spark.sql.execution.SparkPlan$$anon$1.next(SparkPlan.scala:254)
    at scala.collection.Iterator$class.foreach(Iterator.scala:743)
    at org.apache.spark.sql.execution.SparkPlan$$anon$1.foreach(SparkPlan.scala:254)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeCollect$1.apply(SparkPlan.scala:276)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeCollect$1.apply(SparkPlan.scala:275)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
    at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:275)
    at org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$execute$1$1.apply(Dataset.scala:2371)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
    at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2765)
    at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$execute$1(Dataset.scala:2370)
    at org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$collect$1.apply(Dataset.scala:2375)
    at org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$collect$1.apply(Dataset.scala:2375)
    at org.apache.spark.sql.Dataset.withCallback(Dataset.scala:2778)
    at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collect(Dataset.scala:2375)
    at org.apache.spark.sql.Dataset.collect(Dataset.scala:2351)
    at sparklyr.Utils$.collect(utils.scala:196)
    at sparklyr.Utils.collect(utils.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at sparklyr.Invoke$.invoke(invoke.scala:102)
    at sparklyr.StreamHandler$.handleMethodCall(stream.scala:97)
    at sparklyr.StreamHandler$.read(stream.scala:62)
    at sparklyr.BackendHandler.channelRead0(handler.scala:52)
    at sparklyr.BackendHandler.channelRead0(handler.scala:14)
    at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:367)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:353)

Comments:

Tags: java apache-spark sparklyr


    Solution 1:

    This is a GC problem. You could try tuning your JVM with different GC parameters — are you using G1 as your garbage collector? If you can't provide more memory and GC collection times are the problem, you could try another JVM (perhaps Zing from Azul Systems?).
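
    In sparklyr, GC flags can be passed through the standard Spark `extraJavaOptions` properties. A minimal sketch (the master URL is taken from the question; the specific flags here are just a common starting point, not a tuned configuration):

    ```r
    library(sparklyr)

    config <- spark_config()
    # Use the G1 collector on both driver and executors, and log GC activity
    # so you can see whether collection time is actually the bottleneck.
    config$spark.driver.extraJavaOptions   <- "-XX:+UseG1GC -verbose:gc"
    config$spark.executor.extraJavaOptions <- "-XX:+UseG1GC -verbose:gc"

    sc <- spark_connect(master = "spark://master", config = config)
    ```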

    Discussion:

      Solution 2:

      Maybe I'm misreading your example, but the memory problem seems to occur when you collect, not when you use spark_apply. Try

      config$spark.driver.maxResultSize <- XXX 
      

      where XXX is whatever you expect to need (I have set it to 4G for a similar job). See https://spark.apache.org/docs/latest/configuration.html for details.
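
      For example, combined with the driver settings from the question (the "4g" value here is illustrative — size it to the data you actually expect to collect):

      ```r
      config <- spark_config()
      config$spark.driver.memory        <- "8G"
      config$spark.driver.maxResultSize <- "4g"  # cap on serialized results pulled to the driver
      sc <- spark_connect(master = "spark://master", config = config)
      ```

      An alternative worth considering is not collecting all 200k rows to the driver at all, e.g. writing the result out from the executors with `spark_write_parquet(dag, "...")` and reading it back in smaller pieces.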

      Discussion:

        Solution 3:

        I have set the overhead memory needed for spark_apply using spark.yarn.executor.memoryOverhead. I have found that using the partition_by= argument of sdf_repartition is useful, and that using group_by= in spark_apply also helps. The more you can split up your data across the executors, the better.
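
        A sketch of that combination, reusing the accounts table and the partition count 78 from the question (the columns list is abbreviated; spark.yarn.executor.memoryOverhead only applies on YARN deployments):

        ```r
        config <- spark_config()
        config$spark.yarn.executor.memoryOverhead <- 1024  # MiB; YARN only

        # Repartition by the same key that spark_apply will group on,
        # so each group's rows land in one partition.
        accounts <- accounts %>%
          sdf_repartition(partitions = 78, partition_by = "row")

        dag <- spark_apply(
          accounts, get_dates,
          group_by = "row",
          columns  = list(row = "integer",
                          read_val = "numeric")  # remaining columns as in the question
        )
        ```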

        Discussion:
