【Posted】: 2022-01-01 01:08:30
【Question】:
I'm new to Spark, currently running Spark 2.1.2 on a Hadoop 2.6.5 setup as a single node on a t3.xlarge (16 GB RAM). I've kept increasing spark.executor.memory -> 12g, spark.driver.maxResultSize -> 12g, spark.driver.memory -> 6g, but I repeatedly hit "GC overhead limit exceeded". What might be the problem, and any suggestions?
Secondary question: in this single-node setup, is it better to allocate more memory to the executor or to the driver?
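A note on the settings above: on a 16 GB node, 12 GB for the executor plus 6 GB for the driver (plus JVM and OS overhead) already oversubscribes physical memory, and a 12 GB spark.driver.maxResultSize allows enormous collects to reach the driver before anything fails. A hedged sketch of a more conservative split (these values are illustrative assumptions, not a definitive tuning):

```
# spark-defaults.conf — illustrative values for a single 16 GB node
spark.driver.memory          4g
spark.executor.memory        8g
spark.driver.maxResultSize   2g    # keep small so runaway collects fail fast
```

The point of the small maxResultSize is diagnostic: a job that needs to collect more than a couple of GB to the driver usually needs restructuring, not more memory.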
Traceback (most recent call last):
File "/usr/local/lib/python2.7/site-packages/airflow/models/__init__.py", line 1441, in _run_raw_task
result = task_copy.execute(context=context)
File "/usr/local/lib/python2.7/site-packages/airflow/operators/python_operator.py", line 112, in execute
return_value = self.execute_callable()
File "/usr/local/lib/python2.7/site-packages/airflow/operators/python_operator.py", line 117, in execute_callable
return self.python_callable(*self.op_args, **self.op_kwargs)
File "/airflow/dags/fncore/tasks/graph_to_neo4j.py", line 431, in graph_to_neo4j
for edges in edges_result:
File "/airflow/dags/fncore/tasks/graph_to_neo4j.py", line 343, in get_transformed_edges
for dataframe in to_pandas_iterator(transformed, max_result_size=max_result_size):
File "/airflow/dags/fncore/tasks/graph_to_neo4j.py", line 111, in to_pandas_iterator
yield cur_dataframe.toPandas()
File "/opt/spark-2.1.2/python/pyspark/sql/dataframe.py", line 1585, in toPandas
return pd.DataFrame.from_records(self.collect(), columns=self.columns)
File "/opt/spark-2.1.2/python/pyspark/sql/dataframe.py", line 391, in collect
port = self._jdf.collectToPython()
File "/opt/spark-2.1.2/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
answer, self.gateway_client, self.target_id, self.name)
File "/opt/spark-2.1.2/python/pyspark/sql/utils.py", line 63, in deco
return f(*a, **kw)
File "/opt/spark-2.1.2/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py", line 319, in get_return_value
format(target_id, ".", name), value)
Py4JJavaError: An error occurred while calling o2163.collectToPython.
: java.lang.OutOfMemoryError: GC overhead limit exceeded
at java.util.zip.DeflaterOutputStream.<init>(DeflaterOutputStream.java:89)
at java.util.zip.GZIPOutputStream.<init>(GZIPOutputStream.java:90)
at java.util.zip.GZIPOutputStream.<init>(GZIPOutputStream.java:109)
at org.apache.hadoop.io.WritableUtils.writeCompressedByteArray(WritableUtils.java:64)
at org.apache.hadoop.io.WritableUtils.writeCompressedString(WritableUtils.java:94)
at org.apache.hadoop.io.WritableUtils.writeCompressedStringArray(WritableUtils.java:155)
at org.apache.hadoop.conf.Configuration.write(Configuration.java:2836)
at org.apache.spark.util.SerializableConfiguration$$anonfun$writeObject$1.apply$mcV$sp(SerializableConfiguration.scala:27)
at org.apache.spark.util.SerializableConfiguration$$anonfun$writeObject$1.apply(SerializableConfiguration.scala:25)
at org.apache.spark.util.SerializableConfiguration$$anonfun$writeObject$1.apply(SerializableConfiguration.scala:25)
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1281)
at org.apache.spark.util.SerializableConfiguration.writeObject(SerializableConfiguration.scala:25)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1154)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:43)
at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$blockifyObject$2.apply(TorrentBroadcast.scala:272)
at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$blockifyObject$2.apply(TorrentBroadcast.scala:272)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1315)
at org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:273)
at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:126)
at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:88)
at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:56)
at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1410)
at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat.buildReader(ParquetFileFormat.scala:343)
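The top of the traceback shows the failure originates in toPandas(), which calls collect() and pulls the entire DataFrame through the driver at once. One pattern that bounds driver memory is to stream rows with DataFrame.toLocalIterator() (available since Spark 2.0) and build pandas frames one chunk at a time; the chunking itself can be sketched in plain Python (the function name iter_chunks and the chunk size are illustrative assumptions, not part of the original code):

```python
from itertools import islice

def iter_chunks(rows, chunk_size):
    """Yield successive lists of at most chunk_size items from any iterator.

    With Spark, `rows` could be df.toLocalIterator(), so only one chunk
    of rows is materialized on the driver at a time instead of the whole
    result of collect().
    """
    it = iter(rows)
    while True:
        chunk = list(islice(it, chunk_size))
        if not chunk:
            return
        yield chunk

# Illustrative usage, with a plain iterable standing in for Spark rows:
# for chunk in iter_chunks(df.toLocalIterator(), 10000):
#     frame = pd.DataFrame.from_records(chunk, columns=df.columns)
```

Streaming trades driver memory for more round trips to the executors, so it is slower than a single collect() but does not require the full result set to fit on the driver.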
【Comments】:
-
This looks like an error on the driver, and you're hitting it while running the task on Airflow. Does the Airflow executor have enough resources?
-
There is only 1 executor, with the resources listed above. Would it improve with more executors? The instance only has 4 vCPUs.
-
Where are you running Airflow? The same t3.xlarge instance, or a managed instance? I think the out-of-memory is happening on the host running the Airflow task (which acts as the Spark driver), not on the Hadoop cluster.
-
Airflow runs as a Docker container on a separate t3.medium instance. Oh, hmm.. that's interesting. I submit the Spark job in --master yarn-client mode, though, so does the driver reside in the container rather than in YARN, and does spark.driver.memory therefore depend on the container's memory? Before this point, other smaller tasks in the Airflow job execute fine.
-
this confirms what you said: the driver resides in the Airflow task, and therefore in the Airflow worker/scheduler (with the Sequential or Local executor).
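Following the last comment: in yarn-client mode the driver process is the submitting process itself, so here it lives inside the Airflow container on the t3.medium, and spark.driver.memory must fit under that container's memory limit. Note also that --master yarn-client is the deprecated Spark 1.x spelling; since Spark 2.x the equivalent is --master yarn with --deploy-mode client. A hedged example submission (flag values and the script name your_job.py are illustrative placeholders):

```
spark-submit \
  --master yarn \
  --deploy-mode client \
  --driver-memory 2g \
  --executor-memory 8g \
  --conf spark.driver.maxResultSize=1g \
  your_job.py
```

With --deploy-mode cluster the driver would instead run inside a YARN container on the Hadoop node, taking the Airflow host's memory out of the equation entirely.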
Tags: apache-spark pyspark hadoop-yarn