【问题标题】:Spark Graphframes large dataset and memory IssuesSpark Graphframes 大型数据集和内存问题
【发布时间】:2019-12-25 18:34:47
【问题描述】:

我想在 35 亿个节点和 900 亿条边的相对较大的图上运行 pagerank。我一直在尝试不同的集群大小来让它运行。但首先是代码:

from pyspark.sql import SparkSession
import graphframes

spark = SparkSession.builder.getOrCreate()

edges_DF = spark.read.parquet('s3://path/to/edges') # 1.4TB total size
verts_DF   = spark.read.parquet('s3://path/to/verts') # 25GB total size

graph_GDF = graphframes.GraphFrame(verts_DF, edges_DF)
graph_GDF = graph_GDF.dropIsolatedVertices()

result_df   = graph_GDF.pageRank(resetProbability=0.15, tol=0.1)
pagerank_df = result_df.vertices
pagerank_df.write.parquet('s3://path/to/output', mode='overwrite')

我从一开始就经历过严重的垃圾收集问题。所以我为集群尝试了不同的设置和大小。我主要关注了两篇文章:

https://databricks.com/blog/2015/05/28/tuning-java-garbage-collection-for-spark-applications.html

https://aws.amazon.com/blogs/big-data/best-practices-for-successfully-managing-memory-for-apache-spark-applications-on-amazon-emr/

我在亚马逊 EMR 上运行集群。这些是我目前使用的相关设置:

"spark.jars.packages": "org.apache.hadoop:hadoop-aws:2.7.6,graphframes:graphframes:0.7.0-spark2.4-s_2.11",
"spark.dynamicAllocation.enabled": "false",
"spark.network.timeout":"1600s",
"spark.executor.heartbeatInterval":"120s",
"spark.executor.extraJavaOptions": "-XX:+UseG1GC -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -XX:InitiatingHeapOccupancyPercent=35 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:OnOutOfMemoryError='kill -9 %p'",
"spark.driver.extraJavaOptions": "-XX:+UseG1GC -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -XX:InitiatingHeapOccupancyPercent=35 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:OnOutOfMemoryError='kill -9 %p'",
"spark.storage.level": "MEMORY_AND_DISK_SER",
"spark.rdd.compress": "true",
"spark.shuffle.compress": "true",
"spark.shuffle.spill.compress": "true",
"spark.memory.fraction": "0.80",
"spark.memory.storageFraction": "0.30",
"spark.serializer":"org.apache.spark.serializer.KryoSerializer",
"spark.sql.shuffle.partitions":"1216"

"yarn.nodemanager.vmem-check-enabled": "false",
"yarn.nodemanager.pmem-check-enabled": "false"

"maximizeResourceAllocation": "true"

"fs.s3.maxConnections": "5000",
"fs.s3.consistent": "true",
"fs.s3.consistent.throwExceptionOnInconsistency":"false",
"fs.s3.consistent.retryPolicyType":"fixed",
"fs.s3.consistent.retryPeriodSeconds":"10"

我尝试了集群大小我的第一个似乎有效的实验是 具有以下参数的集群:--deploy-mode cluster --num-executors 75 --executor-cores 5 --executor-memory 36g --driver-memory 36g --driver-cores 5

使用这种配置GC 时间已经过去了,一切正常,但由于这是一个测试集群,它的内存非常“小”,总共有2.7 TB,过了一会儿我得到了ExecutorLostFailure (executor 54 exited caused by one of the running tasks) Reason: Container from a bad node Exit status: 137.,我认为发生是因为我将node 留给了小内存。所以我重新运行了整个事情,但这次是--executor-cores 5 --executor-memory 35g 和我的GC 问题,我的集群在后面和我的集群表现得非常奇怪。所以我想我理解了这个问题,GC 次高的原因并不是每个执行程序的内存不足。

我启动的下一个集群使用以下参数:--deploy-mode cluster --num-executors 179 --executor-cores 5 --executor-memory 45g --driver-memory 45g --driver-cores 5

因此,与以前一样,每个执行程序的集群更大,内存更多。一切运行顺利,我通过ganglia 注意到,第一步大约是 ram 的5.5 TB

虽然我理解使用较少的可用内核并扩大每个执行程序的内存会使程序更快的问题,但我猜想这与 verts_DF 的大小约为 25gb 有关,这样它会适合每个执行器的内存并为计算留出空间(25GB * 179 几乎是 5.5TB)。

因此,我启动的下一个集群具有相同数量的节点,但我将执行器的大小调整为:--num-executors 119 --executor-cores 5 --executor-memory 75g

马上把所有的问题都回来了!高 GC 次集群通过 ganglia 挂起,我可以看到 RAM 已填满 9 个可用 TB 中的 8 个。我很困惑。 我回去再次启动--num-executors 179 --executor-cores 5 --executor-memory 45g 集群,幸运的是EMR 很容易做到这一点,因为我可以克隆它。但现在这种配置也不起作用。高GC 次集群立即达到使用内存的8TB

这里发生了什么?感觉就像我玩轮盘赌一样,有时相同的配置有效,有时却无效?

【问题讨论】:

  • 我知道已经有一段时间了,但我目前遇到了类似的问题...您是否找到了适合您需求的解决方案/emr 集群配置?
  • 我也尝试在 scala 中使用 graphx 执行此操作,结果相似。我认为问题在于 spark 中图形的构造,因为我观察到 spark 试图在图形中创建所有可能的三元组。现在,一个非常大的图中所有可能的三元组的数量显然是巨大的,填满了所有的内存。我刚刚实现了我自己的页面排名算法,它速度较慢但有效。

标签: apache-spark pyspark amazon-emr graphframes


【解决方案1】:

如果有人在一段时间后仍然偶然发现这一点,它意识到问题在于graphxgraphframes 加载图表的方式。两者都尝试生成它们正在加载的图的所有三元组,其中非常大的图在OOM 错误中解析,因为具有 35 亿个节点和 700 亿条边的图有很多该死的。 我通过在pyspark 中实现pagerank 编写了一个解决方案。它肯定不如scala 实现那么快,但它可以扩展并且不会遇到所描述的三元组问题。 我在github上发布了 https://github.com/thagorx/spark_pagerank

【讨论】:

    【解决方案2】:

    如果您运行的是带有 pyspark 和 graphframes 的独立版本,您可以通过执行以下命令来启动 pyspark REPL

    pyspark --driver-memory 2g --executor-memory 6g --packages graphframes:graphframes:0.7.0-spark2.4-s_2.11
    

    请务必根据最新发布的 Spark 版本适当更改 SPARK_VERSION 环境变量

    【讨论】:

      猜你喜欢
      • 2021-07-03
      • 2016-12-15
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2018-09-27
      • 1970-01-01
      • 2013-12-13
      • 1970-01-01
      相关资源
      最近更新 更多