【发布时间】:2018-10-28 00:43:29
【问题描述】:
我正在尝试使用Spark Structured Streaming(version 2.2.0) 构建会话化应用程序。
如果在更新模式下使用mapGroupWithState,我知道如果状态数据变大,执行程序将崩溃并出现 OOM 异常。因此,我必须使用GroupStateTimeout 选项来管理内存。
(参考How does Spark Structured Streaming handle in-memory state when state data is growing?)
但是,如果特定键没有更多新的流数据,我无法检查状态是否已超时并准备好被删除。
例如,假设我有以下代码。
myDataset
.groupByKey(_.key)
.flatMapGroupsWithState(OutputMode.Update, GroupStateTimeout.EventTimeTimeout)(makeSession)
makeSession() 函数会检查状态是否超时并移除超时状态。
现在,假设键“foo”已经在内存中存储了一些状态,并且没有具有键“foo”的新数据流入应用程序。因此,makeSession() 不会处理键为“foo”的数据,并且不会检查存储的状态。这意味着,带有键“foo”的存储状态会保留在内存中。如果有很多类似“foo”的键,存储的状态不会被刷新,JVM会引发OOM异常。
我可能对mapGroupWithState有误解,但我怀疑我的OOM异常是由上述问题引起的。
如果我是正确的,这种情况的解决方案是什么? 我想刷新所有已超时且没有更多新流数据的存储状态。
有什么好的代码示例吗?
【问题讨论】:
标签: apache-spark spark-structured-streaming