【Question Title】: Spark. ~100 million rows. Size exceeds Integer.MAX_VALUE?
【Posted】: 2017-10-03 15:32:58
【Question】:

(This is Spark 2.0 running on a small three-machine Amazon EMR cluster.)

I have a PySpark job that loads some large text files into a Spark RDD; count() succeeds and returns 158,598,155.

The job then parses each line into a pyspark.sql.Row instance, builds a DataFrame, and counts again. This second count(), on the DataFrame, triggers an exception in Spark-internal code: Size exceeds Integer.MAX_VALUE. The same job works with smaller amounts of data. Can someone explain why/how this happens?

org.apache.spark.SparkException: Job aborted due to stage failure: Task 22 in stage 1.0 failed 4 times, most recent failure: Lost task 22.3 in stage 1.0 (TID 77, ip-172-31-97-24.us-west-2.compute.internal): java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
    at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:869)
    at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:103)
    at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:91)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1287)
    at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:105)
    at org.apache.spark.storage.BlockManager.getLocalValues(BlockManager.scala:439)
    at org.apache.spark.storage.BlockManager.get(BlockManager.scala:604)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:661)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:330)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:281)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
    at org.apache.spark.scheduler.Task.run(Task.scala:85)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

The PySpark code:

raw_rdd = spark_context.textFile(full_source_path)

# DEBUG: This call to count() is expensive
# This count succeeds and returns 158,598,155
logger.info("raw_rdd count = %d", raw_rdd.count())
logger.info("completed getting raw_rdd count!!!!!!!")

row_rdd = raw_rdd.map(row_parse_function).filter(bool)
data_frame = spark_sql_context.createDataFrame(row_rdd, MySchemaStructType)

data_frame.cache()
# This will trigger the Spark internal error
logger.info("row count = %d", data_frame.count())

【Comments】:

  • What is the expected result of the second count()?
  • Please share the snippet that produces the error.
  • @gsamaras, basically the same as the first count.

Tags: apache-spark


【Solution 1】:

The error does not come from data_frame.count() itself; it arises because parsing the rows through row_parse_function produces some integers that don't fit the integer type specified in MySchemaStructType.

Try widening the integer types in your schema to pyspark.sql.types.LongType(), or let Spark infer the types by omitting the schema (though that slows down evaluation).
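A minimal sketch of that change, assuming a schema with one string and one numeric column (the actual fields of MySchemaStructType are not shown in the question, so the names here are hypothetical):

from pyspark.sql.types import StructType, StructField, StringType, LongType

# Hypothetical stand-in for MySchemaStructType: use LongType() (64-bit)
# instead of IntegerType() (32-bit) for any column whose values can
# exceed 2**31 - 1.
MySchemaStructType = StructType([
    StructField("name", StringType(), nullable=True),
    StructField("big_number", LongType(), nullable=True),  # was IntegerType()
])

data_frame = spark_sql_context.createDataFrame(row_rdd, MySchemaStructType)

# Alternatively, omit the schema and let Spark sample the RDD to infer
# the types (slower, since it must inspect the data first):
inferred_frame = spark_sql_context.createDataFrame(row_rdd)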

【Discussion】:

  • As it stands, row_parse_function definitely checks for out-of-range values. The exception occurs in FileChannelImpl.map, so an out-of-range parse error doesn't make sense. (See the sketch after this list.)
  • @clay Can you post row_parse_function and MySchemaStructType?
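For context on that FileChannelImpl.map frame: it is Spark's DiskStore memory-mapping a cached block from disk, and Java's FileChannel.map cannot map more than Integer.MAX_VALUE (~2 GB) bytes at once, so a single cached partition larger than 2 GB fails with exactly this message. Below is a minimal sketch of one commonly suggested mitigation, assuming oversized partitions are the cause here (the thread itself does not confirm it): repartition before caching so each cached block stays well under 2 GB.

# Hypothetical mitigation sketch: split the data into more, smaller
# partitions before caching, so no single cached block approaches the
# ~2 GB FileChannel.map limit. 2000 is an illustrative guess; choose it
# so that (total data size / number of partitions) stays well below 2 GB.
row_rdd = raw_rdd.repartition(2000).map(row_parse_function).filter(bool)
data_frame = spark_sql_context.createDataFrame(row_rdd, MySchemaStructType)

data_frame.cache()
logger.info("row count = %d", data_frame.count())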