【Title】: Spark on Mesos executors failing with OOM errors
【Posted】: 2018-08-30 09:25:31
【Question】:

We are running Spark 2.0.2 managed by a DC/OS cluster. The job reads data from a Kafka 1.0.0 messaging service and writes Parquet to HDFS. Everything works fine, but when we increase the number of Kafka topics, our Spark executors start crashing constantly with OOM errors:

    java.lang.OutOfMemoryError: Java heap space
    at org.apache.parquet.column.values.dictionary.IntList.initSlab(IntList.java:90)
    at org.apache.parquet.column.values.dictionary.IntList.<init>(IntList.java:86)
    at org.apache.parquet.column.values.dictionary.DictionaryValuesWriter.<init>(DictionaryValuesWriter.java:93)
    at org.apache.parquet.column.values.dictionary.DictionaryValuesWriter$PlainDoubleDictionaryValuesWriter.<init>(DictionaryValuesWriter.java:422)
    at org.apache.parquet.column.ParquetProperties.dictionaryWriter(ParquetProperties.java:139)
    at org.apache.parquet.column.ParquetProperties.dictWriterWithFallBack(ParquetProperties.java:178)
    at org.apache.parquet.column.ParquetProperties.getValuesWriter(ParquetProperties.java:203)
    at org.apache.parquet.column.impl.ColumnWriterV1.<init>(ColumnWriterV1.java:83)
    at org.apache.parquet.column.impl.ColumnWriteStoreV1.newMemColumn(ColumnWriteStoreV1.java:68)
    at org.apache.parquet.column.impl.ColumnWriteStoreV1.getColumnWriter(ColumnWriteStoreV1.java:56)
    at org.apache.parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.<init>(MessageColumnIO.java:183)
    at org.apache.parquet.io.MessageColumnIO.getRecordWriter(MessageColumnIO.java:375)
    at org.apache.parquet.hadoop.InternalParquetRecordWriter.initStore(InternalParquetRecordWriter.java:109)
    at org.apache.parquet.hadoop.InternalParquetRecordWriter.<init>(InternalParquetRecordWriter.java:99)
    at org.apache.parquet.hadoop.ParquetWriter.<init>(ParquetWriter.java:217)
    at org.apache.parquet.hadoop.ParquetWriter.<init>(ParquetWriter.java:175)
    at org.apache.parquet.hadoop.ParquetWriter.<init>(ParquetWriter.java:146)
    at org.apache.parquet.hadoop.ParquetWriter.<init>(ParquetWriter.java:113)
    at org.apache.parquet.hadoop.ParquetWriter.<init>(ParquetWriter.java:87)
    at org.apache.parquet.hadoop.ParquetWriter.<init>(ParquetWriter.java:62)
    at org.apache.parquet.avro.AvroParquetWriter.<init>(AvroParquetWriter.java:47)
    at npm.parquet.ParquetMeasurementWriter.ensureOpenWriter(ParquetMeasurementWriter.java:91)
    at npm.parquet.ParquetMeasurementWriter.write(ParquetMeasurementWriter.java:75)
    at npm.ingestion.spark.StagingArea$Measurements.store(StagingArea.java:100)
    at npm.ingestion.spark.StagingArea$StagingAreaStorage.store(StagingArea.java:80)
    at npm.ingestion.spark.StagingArea.add(StagingArea.java:40)
    at npm.ingestion.spark.Kafka2HDFSPM$SubsetProcessor.sendToStagingArea(Kafka2HDFSPM.java:207)
    at npm.ingestion.spark.Kafka2HDFSPM$SubsetProcessor.consumeRecords(Kafka2HDFSPM.java:193)
    at npm.ingestion.spark.Kafka2HDFSPM$SubsetProcessor.process(Kafka2HDFSPM.java:169)
    at npm.ingestion.spark.Kafka2HDFSPM$FetchSubsetsAndStore.call(Kafka2HDFSPM.java:133)
    at npm.ingestion.spark.Kafka2HDFSPM$FetchSubsetsAndStore.call(Kafka2HDFSPM.java:111)
    at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:218)
18/03/20 18:41:13 ERROR [Executor task launch worker-0] SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker-0,5,main]
java.lang.OutOfMemoryError: Java heap space
    (same stack trace as above)

We tried increasing the memory available to the executors and reviewed the code, but could not find anything wrong.

One more detail: we use RDDs in Spark.

Has anyone run into a similar problem and solved it?
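For context, each open Parquet writer buffers an entire row group plus per-column dictionary pages on the executor heap, so heap demand grows roughly with (open writers × columns × row-group size); more topics means more concurrently open writers. One hedged mitigation, using the standard parquet-hadoop configuration keys (values here are illustrative, not recommendations), is to shrink the per-writer footprint:

```properties
# Hypothetical tuning — the right values depend on your schema and topic count.
parquet.block.size=67108864          # 64 MB row groups instead of the 128 MB default
parquet.dictionary.page.size=524288  # 512 KB dictionary pages
parquet.enable.dictionary=false      # or disable dictionary encoding entirely
```

Disabling the dictionary trades file size for memory: the stack trace above shows the OOM happening inside `DictionaryValuesWriter` allocation, so this directly removes that buffer.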

【Comments】:

    Tags: java apache-spark apache-kafka mesos parquet


    【Answer 1】:

    What is the heap configuration of your executors? By default, Java sizes its heap automatically based on the machine's memory. You need to change it with the -Xmx setting so that it fits inside your container.

    See this article about running Java in a container:

    https://github.com/fabianenardon/docker-java-issues-demo/tree/master/memory-sample
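A minimal sketch of how this could be applied on Spark-on-Mesos (the master URL is a placeholder for a typical DC/OS setup; the memory values are illustrative): `spark.executor.memory` becomes the executor's -Xmx, and the Mesos overhead setting leaves room in the container for off-heap usage.

```shell
# Hedged sketch: cap the executor JVM heap explicitly so it fits the Mesos
# container, instead of relying on the JVM's machine-based auto-sizing.
spark-submit \
  --master mesos://leader.mesos:5050 \
  --conf spark.executor.memory=4g \
  --conf spark.mesos.executor.memoryOverhead=1024 \
  ...
```

The container's total memory request must cover heap plus overhead; if the heap alone matches the container size, the process will be killed or OOM under off-heap pressure.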

    【Comments】:
