【问题标题】:Out of memory error when collecting data out of Spark cluster从 Spark 集群中收集数据时出现内存不足错误
【发布时间】:2025-12-29 19:40:10
【问题描述】:

我知道关于 SO 有很多关于 Spark 内存不足错误的问题,但我还没有找到解决方案。

我有一个简单的工作流程:

  1. 从 Amazon S3 读入 ORC 文件
  2. filter 到一小部分行
  3. select 一小部分列
  4. collect 进入驱动节点(这样我就可以在R 中做额外的操作)

当我运行上述程序然后 cache 运行表来触发内存时,它占用 collect 数据时出现 OOM 错误我的驱动节点。

我尝试在以下设置上运行:

  • 具有 32 核和 244GB 内存的计算机上的本地模式
  • 具有 10 个 6.2 GB 执行程序和 61 GB 驱动程序节点的独立模式

对于其中的每一个,我都使用了executor.memorydriver.memorydriver.maxResultSize 的多种配置,以涵盖可用内存中的所有可能值,但我总是以内存不足错误告终在collect 阶段;任何一个 java.lang.OutOfMemoryError: Java heap space,
java.lang.OutOfMemoryError : GC overhead limit exceeded, 或 Error in invoke_method.spark_shell_connection(spark_connection(jobj), : No status is returned.(表示内存问题的sparklyr 错误)。

根据我对 Spark 的 [有限] 理解,在收集之前缓存表应该强制所有计算 - 即,如果表在缓存

请注意,this question 的回答有一些我尚未尝试的建议,但这些建议可能会影响性能(例如序列化 RDD),因此希望尽可能避免使用。

我的问题:

  1. 缓存后占用如此少空间的数据帧怎么会导致内存问题?
  2. 在我继续使用可能会影响性能的其他选项之前,我是否有明显的检查/更改/故障排除来帮助解决问题?

谢谢

编辑:注意以下@Shaido 的评论,通过Sparklyr 调用cache“通过在表格上执行count(*) 强制将数据加载到内存中”[来自Sparklyr 文档] - 即在调用collect 之前,表应该位于内存中并且所有计算都运行(我相信)。

编辑:遵循以下建议后的一些额外观察:

  • 根据下面的 cmets,我现在尝试将数据写入 csv 而不是收集以了解可能的文件大小。此操作会创建一组 csvs,大小约为 3GB,缓存后运行仅需 2 秒。
  • 如果我将 driver.maxResultSize 设置为
  • 如果我在调用collect 后在任务管理器中查看内存使用情况,我会看到使用情况一直在上升,直到达到 ~ 90GB,此时出现 OOM 错误。 因此,无论出于何种原因,用于执行 collect 操作的 RAM 量都比我尝试收集的 RDD 的大小大 100 倍左右。李>

编辑: 代码添加在下面,根据 cmets 中的要求。

#__________________________________________________________________________________________________________________________________

# Set parameters used for filtering rows
#__________________________________________________________________________________________________________________________________

firstDate <- '2017-07-01'
maxDate <- '2017-08-31'
advertiserID <- '4529611'
advertiserID2 <- '4601141'
advertiserID3 <- '4601141'

library(dplyr)
library(stringr)
library(sparklyr)

#__________________________________________________________________________________________________________________________________

# Configure & connect to spark
#__________________________________________________________________________________________________________________________________

Sys.setenv("SPARK_MEM"="100g")
Sys.setenv(HADOOP_HOME="C:/Users/Jay.Ruffell/AppData/Local/rstudio/spark/Cache/spark-2.0.1-bin-hadoop2.7/tmp/hadoop") 

config <- spark_config()
config$sparklyr.defaultPackages <- "org.apache.hadoop:hadoop-aws:2.7.3" # used to connect to S3
Sys.setenv(AWS_ACCESS_KEY_ID="")
Sys.setenv(AWS_SECRET_ACCESS_KEY="") # setting these blank ensures that AWS uses the IAM roles associated with the cluster to define S3 permissions

# Specify memory parameters - have tried lots of different values here!
config$`sparklyr.shell.driver-memory` <- '50g' 
config$`sparklyr.shell.executor-memory` <- '50g'
config$spark.driver.maxResultSize <- '50g'
sc <- spark_connect(master='local', config=config, version='2.0.1')

#__________________________________________________________________________________________________________________________________

# load data into spark from S3 ----
#__________________________________________________________________________________________________________________________________

#+++++++++++++++++++
# create spark table (not in memory yet) of all logfiles within logfiles path
#+++++++++++++++++++

spark_session(sc) %>%
  invoke("read") %>% 
  invoke("format", "orc") %>%
  invoke("load", 's3a://nz-omg-ann-aipl-data-lake/aip-connect-256537/orc-files/dcm-log-files/dt2-facts') %>% 
  invoke("createOrReplaceTempView", "alldatadf") 
alldftbl <- tbl(sc, 'alldatadf') # create a reference to the sparkdf without loading into memory

#+++++++++++++++++++
# define variables used to filter table down to daterange
#+++++++++++++++++++

# Calculate firstDate & maxDate as unix timestamps
unixTime_firstDate <- as.numeric(as.POSIXct(firstDate))+1
unixTime_maxDate <- as.numeric(as.POSIXct(maxDate)) + 3600*24-1

# Convert daterange params into date_year, date_month & date_day values to pass to filter statement
dateRange <- as.character(seq(as.Date(firstDate), as.Date(maxDate), by=1))
years <- unique(substring(dateRange, first=1, last=4))
if(length(years)==1) years <- c(years, years)
year_y1 <- years[1]; year_y2 <- years[2]
months_y1 <- substring(dateRange[grepl(years[1], dateRange)], first=6, last=7)
minMonth_y1 <- min(months_y1)
maxMonth_y1 <- max(months_y1)
months_y2 <- substring(dateRange[grepl(years[2], dateRange)], first=6, last=7)
minMonth_y2 <- min(months_y2)
maxMonth_y2 <- max(months_y2) 

# Repeat for 1 day prior to first date & one day after maxdate (because of the way logfile orc partitions are created, sometimes touchpoints can end up in the wrong folder by 1 day. So read in extra days, then filter by event time)
firstDateMinusOne <- as.Date(firstDate)-1
firstDateMinusOne_year <- substring(firstDateMinusOne, first=1, last=4)
firstDateMinusOne_month <- substring(firstDateMinusOne, first=6, last=7) 
firstDateMinusOne_day <- substring(firstDateMinusOne, first=9, last=10)
maxDatePlusOne <- as.Date(maxDate)+1
maxDatePlusOne_year <- substring(maxDatePlusOne, first=1, last=4)
maxDatePlusOne_month <- substring(maxDatePlusOne, first=6, last=7)
maxDatePlusOne_day <- substring(maxDatePlusOne, first=9, last=10)

#+++++++++++++++++++
# Read in data, filter & select
#+++++++++++++++++++

# startTime <- proc.time()[3]
dftbl <- alldftbl %>% # create a reference to the sparkdf without loading into memory

  # filter by month and year, using ORC partitions for extra speed
  filter(((date_year==year_y1  & date_month>=minMonth_y1 & date_month<=maxMonth_y1) |
            (date_year==year_y2 & date_month>=minMonth_y2 & date_month<=maxMonth_y2) |
            (date_year==firstDateMinusOne_year & date_month==firstDateMinusOne_month & date_day==firstDateMinusOne_day) |
            (date_year==maxDatePlusOne_year & date_month==maxDatePlusOne_month & date_day==maxDatePlusOne_day))) %>%

  # filter to be within firstdate & maxdate. Note that event_time_char will be in UTC, so 12hrs behind.
  filter(event_time>=(unixTime_firstDate*1000000) & event_time<(unixTime_maxDate*1000000)) %>%

  # filter by advertiser ID
  filter(((advertiser_id==advertiserID | advertiser_id==advertiserID2 | advertiser_id==advertiserID3) & 
            !is.na(advertiser_id)) |
           ((floodlight_configuration==advertiserID | floodlight_configuration==advertiserID2 | 
               floodlight_configuration==advertiserID3) & !is.na(floodlight_configuration)) & user_id!="0") %>%

  # Define cols to keep
  transmute(time=as.numeric(event_time/1000000),
            user_id=as.character(user_id),
            action_type=as.character(if(fact_type=='click') 'C' else if(fact_type=='impression') 'I' else if(fact_type=='activity') 'A' else NA),
            lookup=concat_ws("_", campaign_id, ad_id, site_id_dcm, placement_id),
            activity_lookup=as.character(activity_id),
            sv1=as.character(segment_value_1),
            other_data=as.character(other_data))  %>%
  mutate(time_char=as.character(from_unixtime(time)))

# cache to memory
dftbl <- sdf_register(dftbl, "filtereddf")
tbl_cache(sc, "filtereddf")

#__________________________________________________________________________________________________________________________________

# Collect out of spark
#__________________________________________________________________________________________________________________________________

myDF <- collect(dftbl)

【问题讨论】:

  • cache() 实际上不会强制进行任何计算,它只会将数据帧标记为待缓存。在您对数据框执行操作后,所有计算和缓存都将发生,例如count()first().
  • 关于这一点,您的代码是否在 SparklyR 之上进行了一些转换?喜欢 map 或 reduce 函数?或者您正在使用 DataFrame API?
  • 你如何执行你的应用程序以及你的目的是什么资源管理器(spark self/yarn)?
  • @Thiago Baldim 除了我上面提到的之外,没有任何转换。这些是用 Sparklyr 编写的,据我了解,它随后会转换为 Spark SQL。
  • @jay,作为一个调查练习,您能否做以下事情:不要将您的数据收集到驱动程序,而是先尝试将其写回 s3,然后检查实际数据量是多少(是的,在记忆中它会有所不同,但它会给你基本的感觉)

标签: apache-spark memory sparklyr


【解决方案1】:

如上所述,“缓存”不是动作,检查 RDD Persistence:

You can mark an RDD to be persisted using the persist() or cache() methods on it. The first time it is computed in an action, it will be kept in memory on the nodes. 

但是“collect”是一个动作,所有的计算(包括“cache”)都会在调用“collect”时开始。

您以独立模式运行应用程序,这意味着初始数据加载和所有计算将在同一内存中执行。

数据下载和其他计算占用了大部分内存,而不是“收集”。

您可以通过将“collect”替换为“count”来检查它。

【讨论】:

  • 如原始问题中所述,cache 的 Sparklyr 版本在桌子上执行 countcache [即count] 运行良好 - 只有当我随后调用 collect 时才会发生 OOM 错误。你能解释一下吗?
  • 问题标题看起来像关于 Spark,实际上它是关于单独的引擎 Sparklyr,它进行自己的“缓存/收集”计算。猜猜,这很混乱。
  • 不是单独的引擎,只是 Spark 的前端。但同意 Sparklyr 中的 cache 调用 Spark 中的 count 令人困惑。
  • 也许,“dataFrame.rdd.count”也会导致 OutOfMemory 而不是“collect”? *.com/questions/42714291/…
【解决方案2】:

当您在数据框上说收集时,会发生两件事,

  1. 首先必须将所有数据写入驱动程序的输出端。
  2. 驱动程序必须从所有节点收集数据并保存在其内存中。

答案:

如果您只想将数据加载到执行程序的内存中,count() 也是一个将数据加载到执行程序内存中的操作,可以被其他进程使用。

如果您想提取数据,请在提取数据“--conf spark.driver.maxResultSize=10g”时与其他属性一起尝试。

【讨论】:

  • 感谢您提供有关调用 collect 时会发生什么的信息。至于建议,我已经按照我原来的问题中的描述实现了这两个。