【发布时间】:2018-08-30 09:25:31
【问题描述】:
我们正在使用由 DCOS 系统管理的 spark 2.0.2,该系统从 Kafka 1.0.0 消息传递服务获取数据并在 hdfs 系统中写入 parquet。 一切正常,但是当我们增加 Kafka 中的主题数量时,我们的 spark 执行器开始不断崩溃并出现 OOM 错误:
java.lang.OutOfMemoryError: Java heap space
at org.apache.parquet.column.values.dictionary.IntList.initSlab(IntList.java:90)
at org.apache.parquet.column.values.dictionary.IntList.<init>(IntList.java:86)
at org.apache.parquet.column.values.dictionary.DictionaryValuesWriter.<init>(DictionaryValuesWriter.java:93)
at org.apache.parquet.column.values.dictionary.DictionaryValuesWriter$PlainDoubleDictionaryValuesWriter.<init>(DictionaryValuesWriter.java:422)
at org.apache.parquet.column.ParquetProperties.dictionaryWriter(ParquetProperties.java:139)
at org.apache.parquet.column.ParquetProperties.dictWriterWithFallBack(ParquetProperties.java:178)
at org.apache.parquet.column.ParquetProperties.getValuesWriter(ParquetProperties.java:203)
at org.apache.parquet.column.impl.ColumnWriterV1.<init>(ColumnWriterV1.java:83)
at org.apache.parquet.column.impl.ColumnWriteStoreV1.newMemColumn(ColumnWriteStoreV1.java:68)
at org.apache.parquet.column.impl.ColumnWriteStoreV1.getColumnWriter(ColumnWriteStoreV1.java:56)
at org.apache.parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.<init>(MessageColumnIO.java:183)
at org.apache.parquet.io.MessageColumnIO.getRecordWriter(MessageColumnIO.java:375)
at org.apache.parquet.hadoop.InternalParquetRecordWriter.initStore(InternalParquetRecordWriter.java:109)
at org.apache.parquet.hadoop.InternalParquetRecordWriter.<init>(InternalParquetRecordWriter.java:99)
at org.apache.parquet.hadoop.ParquetWriter.<init>(ParquetWriter.java:217)
at org.apache.parquet.hadoop.ParquetWriter.<init>(ParquetWriter.java:175)
at org.apache.parquet.hadoop.ParquetWriter.<init>(ParquetWriter.java:146)
at org.apache.parquet.hadoop.ParquetWriter.<init>(ParquetWriter.java:113)
at org.apache.parquet.hadoop.ParquetWriter.<init>(ParquetWriter.java:87)
at org.apache.parquet.hadoop.ParquetWriter.<init>(ParquetWriter.java:62)
at org.apache.parquet.avro.AvroParquetWriter.<init>(AvroParquetWriter.java:47)
at npm.parquet.ParquetMeasurementWriter.ensureOpenWriter(ParquetMeasurementWriter.java:91)
at npm.parquet.ParquetMeasurementWriter.write(ParquetMeasurementWriter.java:75)
at npm.ingestion.spark.StagingArea$Measurements.store(StagingArea.java:100)
at npm.ingestion.spark.StagingArea$StagingAreaStorage.store(StagingArea.java:80)
at npm.ingestion.spark.StagingArea.add(StagingArea.java:40)
at npm.ingestion.spark.Kafka2HDFSPM$SubsetProcessor.sendToStagingArea(Kafka2HDFSPM.java:207)
at npm.ingestion.spark.Kafka2HDFSPM$SubsetProcessor.consumeRecords(Kafka2HDFSPM.java:193)
at npm.ingestion.spark.Kafka2HDFSPM$SubsetProcessor.process(Kafka2HDFSPM.java:169)
at npm.ingestion.spark.Kafka2HDFSPM$FetchSubsetsAndStore.call(Kafka2HDFSPM.java:133)
at npm.ingestion.spark.Kafka2HDFSPM$FetchSubsetsAndStore.call(Kafka2HDFSPM.java:111)
at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:218)
18/03/20 18:41:13 ERROR [Executor task launch worker-0] SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker-0,5,main]
java.lang.OutOfMemoryError: Java heap space
at org.apache.parquet.column.values.dictionary.IntList.initSlab(IntList.java:90)
at org.apache.parquet.column.values.dictionary.IntList.<init>(IntList.java:86)
at org.apache.parquet.column.values.dictionary.DictionaryValuesWriter.<init>(DictionaryValuesWriter.java:93)
at org.apache.parquet.column.values.dictionary.DictionaryValuesWriter$PlainDoubleDictionaryValuesWriter.<init>(DictionaryValuesWriter.java:422)
at org.apache.parquet.column.ParquetProperties.dictionaryWriter(ParquetProperties.java:139)
at org.apache.parquet.column.ParquetProperties.dictWriterWithFallBack(ParquetProperties.java:178)
at org.apache.parquet.column.ParquetProperties.getValuesWriter(ParquetProperties.java:203)
at org.apache.parquet.column.impl.ColumnWriterV1.<init>(ColumnWriterV1.java:83)
at org.apache.parquet.column.impl.ColumnWriteStoreV1.newMemColumn(ColumnWriteStoreV1.java:68)
at org.apache.parquet.column.impl.ColumnWriteStoreV1.getColumnWriter(ColumnWriteStoreV1.java:56)
at org.apache.parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.<init>(MessageColumnIO.java:183)
at org.apache.parquet.io.MessageColumnIO.getRecordWriter(MessageColumnIO.java:375)
at org.apache.parquet.hadoop.InternalParquetRecordWriter.initStore(InternalParquetRecordWriter.java:109)
at org.apache.parquet.hadoop.InternalParquetRecordWriter.<init>(InternalParquetRecordWriter.java:99)
at org.apache.parquet.hadoop.ParquetWriter.<init>(ParquetWriter.java:217)
at org.apache.parquet.hadoop.ParquetWriter.<init>(ParquetWriter.java:175)
at org.apache.parquet.hadoop.ParquetWriter.<init>(ParquetWriter.java:146)
at org.apache.parquet.hadoop.ParquetWriter.<init>(ParquetWriter.java:113)
at org.apache.parquet.hadoop.ParquetWriter.<init>(ParquetWriter.java:87)
at org.apache.parquet.hadoop.ParquetWriter.<init>(ParquetWriter.java:62)
at org.apache.parquet.avro.AvroParquetWriter.<init>(AvroParquetWriter.java:47)
at npm.parquet.ParquetMeasurementWriter.ensureOpenWriter(ParquetMeasurementWriter.java:91)
at npm.parquet.ParquetMeasurementWriter.write(ParquetMeasurementWriter.java:75)
at npm.ingestion.spark.StagingArea$Measurements.store(StagingArea.java:100)
at npm.ingestion.spark.StagingArea$StagingAreaStorage.store(StagingArea.java:80)
at npm.ingestion.spark.StagingArea.add(StagingArea.java:40)
at npm.ingestion.spark.Kafka2HDFSPM$SubsetProcessor.sendToStagingArea(Kafka2HDFSPM.java:207)
at npm.ingestion.spark.Kafka2HDFSPM$SubsetProcessor.consumeRecords(Kafka2HDFSPM.java:193)
at npm.ingestion.spark.Kafka2HDFSPM$SubsetProcessor.process(Kafka2HDFSPM.java:169)
at npm.ingestion.spark.Kafka2HDFSPM$FetchSubsetsAndStore.call(Kafka2HDFSPM.java:133)
at npm.ingestion.spark.Kafka2HDFSPM$FetchSubsetsAndStore.call(Kafka2HDFSPM.java:111)
at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:218)
我们尝试增加执行器的可用内存,查看代码,但没有发现任何错误。
另一个信息:我们在 spark 中使用 RDD。
有没有人遇到过类似的问题,已经解决了
【问题讨论】:
标签: java apache-spark apache-kafka mesos parquet