【问题标题】:How does Spark Structured Streaming flush in-memory state when state data is no longer being checked?当不再检查状态数据时,Spark Structured Streaming 如何刷新内存状态?
【发布时间】:2018-10-28 00:43:29
【问题描述】:

我正在尝试使用Spark Structured Streaming(version 2.2.0) 构建会话化应用程序。

如果在更新模式下使用mapGroupWithState,我知道如果状态数据变大,执行程序将崩溃并出现 OOM 异常。因此,我必须使用GroupStateTimeout 选项来管理内存。 (参考How does Spark Structured Streaming handle in-memory state when state data is growing?

但是,如果特定键没有更多新的流数据,我无法检查状态是否已超时并准备好被删除。

例如,假设我有以下代码。

myDataset
  .groupByKey(_.key)
  .flatMapGroupsWithState(OutputMode.Update, GroupStateTimeout.EventTimeTimeout)(makeSession)

makeSession() 函数会检查状态是否超时并移除超时状态。

现在,假设键“foo”已经在内存中存储了一些状态,并且没有具有键“foo”的新数据流入应用程序。因此,makeSession() 不会处理键为“foo”的数据,并且不会检查存储的状态。这意味着,带有键“foo”的存储状态会保留在内存中。如果有很多类似“foo”的键,存储的状态不会被刷新,JVM会引发OOM异常。

我可能对mapGroupWithState有误解,但我怀疑我的OOM异常是由上述问题引起的。

如果我是正确的,这种情况的解决方案是什么? 我想刷新所有已超时且没有更多新流数据的存储状态。

有什么好的代码示例吗?

【问题讨论】:

    标签: apache-spark spark-structured-streaming


    【解决方案1】:

    现在,假设键“foo”已经在内存中存储了一些状态, 并且没有带有键“foo”的新数据流入应用程序。 因此,makeSession() 不会处理键为“foo”的数据 并且未检查存储的状态。

    这是不正确的。只要您有任何键的新数据,Spark 将确保每个批次都验证整个键集,并最后一次调用超时键。

    每次调用flat/mapGroupsWithState,我们都有:

    val outputIterator =
          updater.updateStateForKeysWithData(filteredIter) ++
          updater.updateStateForTimedOutKeys()
    

    这是updateStateForTimedOutKeys

    def updateStateForTimedOutKeys(): Iterator[InternalRow] = {
      if (isTimeoutEnabled) {
        val timeoutThreshold = timeoutConf match {
          case ProcessingTimeTimeout => batchTimestampMs.get
          case EventTimeTimeout => eventTimeWatermark.get
          case _ =>
            throw new IllegalStateException(
              s"Cannot filter timed out keys for $timeoutConf")
        }
        val timingOutKeys = store.filter { case (_, stateRow) =>
          val timeoutTimestamp = getTimeoutTimestamp(stateRow)
          timeoutTimestamp != NO_TIMESTAMP && timeoutTimestamp < timeoutThreshold
        }
        timingOutKeys.flatMap { case (keyRow, stateRow) =>
          callFunctionAndUpdateState(keyRow, Iterator.empty, Some(stateRow), hasTimedOut = true)
        }
      } else Iterator.empty
    }
    

    其中相关部​​分是 flatMap 超时键并使用hasTimedOut = true 最后一次调用每个函数。

    【讨论】:

    • 嗨,由于 spark 2.3.0 OOM HDFS 状态存储,我正在尝试使用 flatMapGroupsWithState 实现流到流连接。你用过这个 API 吗?你知道内部连接的任何实现吗?
    猜你喜欢
    • 2018-01-31
    • 2023-01-20
    • 2020-12-28
    • 2019-07-14
    • 2023-04-07
    • 2014-09-11
    • 2021-06-03
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多