【发布时间】:2018-08-19 07:09:05
【问题描述】:
我在 Spark 2.2.0 中使用聚合和分区运行结构化流时遇到内存问题:
session
.readStream()
.schema(inputSchema)
.option(OPTION_KEY_DELIMITER, OPTION_VALUE_DELIMITER_TAB)
.option(OPTION_KEY_QUOTE, OPTION_VALUE_QUOTATION_OFF)
.csv("s3://test-bucket/input")
.as(Encoders.bean(TestRecord.class))
.flatMap(mf, Encoders.bean(TestRecord.class))
.dropDuplicates("testId", "testName")
.withColumn("year", functions.date_format(dataset.col("testTimestamp").cast(DataTypes.DateType), "YYYY"))
.writeStream()
.option("path", "s3://test-bucket/output")
.option("checkpointLocation", "s3://test-bucket/checkpoint")
.trigger(Trigger.ProcessingTime(60, TimeUnit.SECONDS))
.partitionBy("year")
.format("parquet")
.outputMode(OutputMode.Append())
.queryName("test-stream")
.start();
在测试过程中,我注意到每次新数据到来时使用的内存量都会增加,最终执行程序以代码 137 退出:
ExecutorLostFailure (executor 2 exited caused by one of the running tasks) Reason: Container marked as failed: container_1520214726510_0001_01_000003 on host: ip-10-0-1-153.us-west-2.compute.internal. Exit status: 137. Diagnostics: Container killed on request. Exit code is 137
Container exited with a non-zero exit code 137
Killed by external signal
我创建了一个堆转储,发现org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider 使用的大部分内存是从StateStore 引用的
乍一看,它看起来很正常,因为这就是 Spark 将聚合键保存在内存中的方式。但是,我通过重命名源文件夹中的文件进行了测试,以便它们可以被 spark 拾取。由于输入记录相同,因此所有其他行都应作为重复项而被拒绝,并且内存消耗不应增加,但确实如此。
此外,GC 时间占总处理时间的 30% 以上
这是一个堆转储,它取自执行程序,其运行的内存量比上面屏幕上的要少,因为当我从该执行程序创建转储时,java 进程刚刚在进程中间终止。
【问题讨论】:
标签: apache-spark apache-spark-sql spark-structured-streaming