【发布时间】:2025-12-29 19:40:10
【问题描述】:
我知道关于 SO 有很多关于 Spark 内存不足错误的问题,但我还没有找到解决方案。
我有一个简单的工作流程:
- 从 Amazon S3 读入 ORC 文件
-
filter到一小部分行 -
select一小部分列 -
collect进入驱动节点(这样我就可以在R中做额外的操作)
当我运行上述程序然后 cache 运行表来触发内存时,它占用 collect 数据时出现 OOM 错误我的驱动节点。
我尝试在以下设置上运行:
- 具有 32 核和 244GB 内存的计算机上的本地模式
- 具有 10 个 6.2 GB 执行程序和 61 GB 驱动程序节点的独立模式
对于其中的每一个,我都使用了executor.memory、driver.memory 和driver.maxResultSize 的多种配置,以涵盖可用内存中的所有可能值,但我总是以内存不足错误告终在collect 阶段;任何一个
java.lang.OutOfMemoryError: Java heap space,java.lang.OutOfMemoryError : GC overhead limit exceeded, 或
Error in invoke_method.spark_shell_connection(spark_connection(jobj), :
No status is returned.(表示内存问题的sparklyr 错误)。
根据我对 Spark 的 [有限] 理解,在收集之前缓存表应该强制所有计算 - 即,如果表在缓存
请注意,this question 的回答有一些我尚未尝试的建议,但这些建议可能会影响性能(例如序列化 RDD),因此希望尽可能避免使用。
我的问题:
- 缓存后占用如此少空间的数据帧怎么会导致内存问题?
- 在我继续使用可能会影响性能的其他选项之前,我是否有明显的检查/更改/故障排除来帮助解决问题?
谢谢
编辑:注意以下@Shaido 的评论,通过Sparklyr 调用cache“通过在表格上执行count(*) 强制将数据加载到内存中”[来自Sparklyr 文档] - 即在调用collect 之前,表应该位于内存中并且所有计算都运行(我相信)。
编辑:遵循以下建议后的一些额外观察:
- 根据下面的 cmets,我现在尝试将数据写入 csv 而不是收集以了解可能的文件大小。此操作会创建一组 csvs,大小约为 3GB,缓存后运行仅需 2 秒。
- 如果我将
driver.maxResultSize设置为 - 如果我在调用
collect后在任务管理器中查看内存使用情况,我会看到使用情况一直在上升,直到达到 ~ 90GB,此时出现 OOM 错误。 因此,无论出于何种原因,用于执行collect操作的 RAM 量都比我尝试收集的 RDD 的大小大 100 倍左右。李>
编辑: 代码添加在下面,根据 cmets 中的要求。
#__________________________________________________________________________________________________________________________________
# Set parameters used for filtering rows
#__________________________________________________________________________________________________________________________________
firstDate <- '2017-07-01'
maxDate <- '2017-08-31'
advertiserID <- '4529611'
advertiserID2 <- '4601141'
advertiserID3 <- '4601141'
library(dplyr)
library(stringr)
library(sparklyr)
#__________________________________________________________________________________________________________________________________
# Configure & connect to spark
#__________________________________________________________________________________________________________________________________
Sys.setenv("SPARK_MEM"="100g")
Sys.setenv(HADOOP_HOME="C:/Users/Jay.Ruffell/AppData/Local/rstudio/spark/Cache/spark-2.0.1-bin-hadoop2.7/tmp/hadoop")
config <- spark_config()
config$sparklyr.defaultPackages <- "org.apache.hadoop:hadoop-aws:2.7.3" # used to connect to S3
Sys.setenv(AWS_ACCESS_KEY_ID="")
Sys.setenv(AWS_SECRET_ACCESS_KEY="") # setting these blank ensures that AWS uses the IAM roles associated with the cluster to define S3 permissions
# Specify memory parameters - have tried lots of different values here!
config$`sparklyr.shell.driver-memory` <- '50g'
config$`sparklyr.shell.executor-memory` <- '50g'
config$spark.driver.maxResultSize <- '50g'
sc <- spark_connect(master='local', config=config, version='2.0.1')
#__________________________________________________________________________________________________________________________________
# load data into spark from S3 ----
#__________________________________________________________________________________________________________________________________
#+++++++++++++++++++
# create spark table (not in memory yet) of all logfiles within logfiles path
#+++++++++++++++++++
spark_session(sc) %>%
invoke("read") %>%
invoke("format", "orc") %>%
invoke("load", 's3a://nz-omg-ann-aipl-data-lake/aip-connect-256537/orc-files/dcm-log-files/dt2-facts') %>%
invoke("createOrReplaceTempView", "alldatadf")
alldftbl <- tbl(sc, 'alldatadf') # create a reference to the sparkdf without loading into memory
#+++++++++++++++++++
# define variables used to filter table down to daterange
#+++++++++++++++++++
# Calculate firstDate & maxDate as unix timestamps
unixTime_firstDate <- as.numeric(as.POSIXct(firstDate))+1
unixTime_maxDate <- as.numeric(as.POSIXct(maxDate)) + 3600*24-1
# Convert daterange params into date_year, date_month & date_day values to pass to filter statement
dateRange <- as.character(seq(as.Date(firstDate), as.Date(maxDate), by=1))
years <- unique(substring(dateRange, first=1, last=4))
if(length(years)==1) years <- c(years, years)
year_y1 <- years[1]; year_y2 <- years[2]
months_y1 <- substring(dateRange[grepl(years[1], dateRange)], first=6, last=7)
minMonth_y1 <- min(months_y1)
maxMonth_y1 <- max(months_y1)
months_y2 <- substring(dateRange[grepl(years[2], dateRange)], first=6, last=7)
minMonth_y2 <- min(months_y2)
maxMonth_y2 <- max(months_y2)
# Repeat for 1 day prior to first date & one day after maxdate (because of the way logfile orc partitions are created, sometimes touchpoints can end up in the wrong folder by 1 day. So read in extra days, then filter by event time)
firstDateMinusOne <- as.Date(firstDate)-1
firstDateMinusOne_year <- substring(firstDateMinusOne, first=1, last=4)
firstDateMinusOne_month <- substring(firstDateMinusOne, first=6, last=7)
firstDateMinusOne_day <- substring(firstDateMinusOne, first=9, last=10)
maxDatePlusOne <- as.Date(maxDate)+1
maxDatePlusOne_year <- substring(maxDatePlusOne, first=1, last=4)
maxDatePlusOne_month <- substring(maxDatePlusOne, first=6, last=7)
maxDatePlusOne_day <- substring(maxDatePlusOne, first=9, last=10)
#+++++++++++++++++++
# Read in data, filter & select
#+++++++++++++++++++
# startTime <- proc.time()[3]
dftbl <- alldftbl %>% # create a reference to the sparkdf without loading into memory
# filter by month and year, using ORC partitions for extra speed
filter(((date_year==year_y1 & date_month>=minMonth_y1 & date_month<=maxMonth_y1) |
(date_year==year_y2 & date_month>=minMonth_y2 & date_month<=maxMonth_y2) |
(date_year==firstDateMinusOne_year & date_month==firstDateMinusOne_month & date_day==firstDateMinusOne_day) |
(date_year==maxDatePlusOne_year & date_month==maxDatePlusOne_month & date_day==maxDatePlusOne_day))) %>%
# filter to be within firstdate & maxdate. Note that event_time_char will be in UTC, so 12hrs behind.
filter(event_time>=(unixTime_firstDate*1000000) & event_time<(unixTime_maxDate*1000000)) %>%
# filter by advertiser ID
filter(((advertiser_id==advertiserID | advertiser_id==advertiserID2 | advertiser_id==advertiserID3) &
!is.na(advertiser_id)) |
((floodlight_configuration==advertiserID | floodlight_configuration==advertiserID2 |
floodlight_configuration==advertiserID3) & !is.na(floodlight_configuration)) & user_id!="0") %>%
# Define cols to keep
transmute(time=as.numeric(event_time/1000000),
user_id=as.character(user_id),
action_type=as.character(if(fact_type=='click') 'C' else if(fact_type=='impression') 'I' else if(fact_type=='activity') 'A' else NA),
lookup=concat_ws("_", campaign_id, ad_id, site_id_dcm, placement_id),
activity_lookup=as.character(activity_id),
sv1=as.character(segment_value_1),
other_data=as.character(other_data)) %>%
mutate(time_char=as.character(from_unixtime(time)))
# cache to memory
dftbl <- sdf_register(dftbl, "filtereddf")
tbl_cache(sc, "filtereddf")
#__________________________________________________________________________________________________________________________________
# Collect out of spark
#__________________________________________________________________________________________________________________________________
myDF <- collect(dftbl)
【问题讨论】:
-
cache()实际上不会强制进行任何计算,它只会将数据帧标记为待缓存。在您对数据框执行操作后,所有计算和缓存都将发生,例如count()或first(). -
关于这一点,您的代码是否在 SparklyR 之上进行了一些转换?喜欢 map 或 reduce 函数?或者您正在使用 DataFrame API?
-
你如何执行你的应用程序以及你的目的是什么资源管理器(spark self/yarn)?
-
@Thiago Baldim 除了我上面提到的之外,没有任何转换。这些是用 Sparklyr 编写的,据我了解,它随后会转换为 Spark SQL。
-
@jay,作为一个调查练习,您能否做以下事情:不要将您的数据收集到驱动程序,而是先尝试将其写回 s3,然后检查实际数据量是多少(是的,在记忆中它会有所不同,但它会给你基本的感觉)
标签: apache-spark memory sparklyr