【问题标题】:Memory issue with spark structured streamingspark结构化流的内存问题
【发布时间】:2018-08-19 07:09:05
【问题描述】:

我在 Spark 2.2.0 中使用聚合和分区运行结构化流时遇到内存问题:

session
    .readStream()
    .schema(inputSchema)
    .option(OPTION_KEY_DELIMITER, OPTION_VALUE_DELIMITER_TAB)
    .option(OPTION_KEY_QUOTE, OPTION_VALUE_QUOTATION_OFF)
    .csv("s3://test-bucket/input")
    .as(Encoders.bean(TestRecord.class))
    .flatMap(mf, Encoders.bean(TestRecord.class))
    .dropDuplicates("testId", "testName")
    .withColumn("year", functions.date_format(dataset.col("testTimestamp").cast(DataTypes.DateType), "YYYY"))
    .writeStream()
    .option("path", "s3://test-bucket/output")
    .option("checkpointLocation", "s3://test-bucket/checkpoint")
    .trigger(Trigger.ProcessingTime(60, TimeUnit.SECONDS))
    .partitionBy("year")
    .format("parquet")
    .outputMode(OutputMode.Append())
    .queryName("test-stream")
    .start();

在测试过程中,我注意到每次新数据到来时使用的内存量都会增加,最终执行程序以代码 137 退出:

ExecutorLostFailure (executor 2 exited caused by one of the running tasks) Reason: Container marked as failed: container_1520214726510_0001_01_000003 on host: ip-10-0-1-153.us-west-2.compute.internal. Exit status: 137. Diagnostics: Container killed on request. Exit code is 137
Container exited with a non-zero exit code 137
Killed by external signal

我创建了一个堆转储,发现org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider 使用的大部分内存是从StateStore 引用的

乍一看,它看起来很正常,因为这就是 Spark 将聚合键保存在内存中的方式。但是,我通过重命名源文件夹中的文件进行了测试,以便它们可以被 spark 拾取。由于输入记录相同,因此所有其他行都应作为重复项而被拒绝,并且内存消耗不应增加,但确实如此。

此外,GC 时间占总处理时间的 30% 以上

这是一个堆转储,它取自执行程序,其运行的内存量比上面屏幕上的要少,因为当我从该执行程序创建转储时,java 进程刚刚在进程中间终止。

【问题讨论】:

    标签: apache-spark apache-spark-sql spark-structured-streaming


    【解决方案1】:

    迁移我对SPARK-23682 的评论,该问题的提问者也提出了问题。

    在 HDFS 状态存储提供程序中,它在内存中过度缓存了多个版本的状态,默认 100 个版本。这个问题由SPARK-24717 解决,它只会在内存中维护两个版本的状态(当前用于重播,新用于更新)。该补丁将在 Spark 2.4.0 中提供。

    【讨论】:

      【解决方案2】:

      我认为根本原因是您没有将水印与 dropDuplicates 一起使用,因此所有状态都被保留并且永远不会被丢弃。 看看:https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#streaming-deduplication

      【讨论】: