【问题标题】:Flink consuming more memory than expectedFlink 消耗的内存比预期的要多
【发布时间】:2018-05-03 13:51:55
【问题描述】:

我正在使用Flink 1.4.1 处理事务事件,并使用 HDFS 存储检查点信息以实现容错。

创建了一个作业来汇总有关客户、星期几和每小时的信息,从而创建了一个配置文件,如下面的代码所示。

val stream = env.addSource(consumer)
val result = stream
  .map(openTransaction => {
    val transactionDate = openTransaction.get("transactionDate")
    val date = if (transactionDate.isTextual)
      LocalDateTime.parse(transactionDate.asText, DateTimeFormatter.ISO_DATE_TIME).toInstant(ZoneOffset.UTC).toEpochMilli
    else
      transactionDate.asLong
    (openTransaction.get("clientId").asLong, openTransaction.get("amount").asDouble, new Timestamp(date))
  })
  .keyBy(0)
  .window(SlidingEventWeekTimeWindows.of(Time.days(28), Time.hours(1)))
  .sum(1)

在上面的代码中,流包含三个字段:“transactionDate”、“clientId”和“amount”。我们通过 clientId 和一个滑动窗口来创建一个键控流。我们的数据库中有大约 100.000 个唯一的活动客户端 ID。

运行一段时间后,作业使用的总 RAM 稳定在 36 GB,但 HDFS 中存储的检查点仅使​​用 3 GB。有没有办法减少作业的 RAM 使用量,可能是通过配置 Flink 的复制因子或使用 RocksDB?

【问题讨论】:

    标签: scala hdfs apache-flink ram rocksdb


    【解决方案1】:

    对于这种状态大小,您绝对应该考虑使用 RocksDB,并且根据使用模式,可以有更小的检查点,因为它仅通过复制新的或更新的 SST 来增量执行它。

    一些要知道的事情,请记住:

    • 每个有状态算子并行子任务都会有自己的 RocksDB 实例。
    • 如果您确实切换到 RocksDB 进行检查点,并且它 开始运行比您需要的慢,请确保 您正在使用的序列化尽可能高效。
    • Flink 会根据您的后备文件系统提供一些 PredefinedOptions,请确保您选择得当
    • 如果预定义的选项不适合您,您可以覆盖 RocksDB 后端的 OptionsFactory 并微调各个 RocksDB 选项

    另一个需要注意的关于 Flink 中带有键控时间窗口的内存使用情况是,如果您要进入数十万或数百万,“计时器”可能会占用大量内存。 Flink 计时器是基于堆的(在撰写本文时),并且独立于您的状态后端同步检查点。

    【讨论】:

    • 我们已经尝试使用 RocksDb 和下面的行,但它不影响内存使用。我们预计它会在状态 > 内存时将状态存储在磁盘中,但它似乎只影响检查点存储。 val env = StreamExecutionEnvironment.getExecutionEnvironment()env.setStateBackend(new RocksDBStateBackend(filebackend, true))有没有我们遗漏的配置?
    • 添加了一条关于可能与您相关的计时器的注释。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2023-03-30
    • 1970-01-01
    • 1970-01-01
    • 2023-03-30
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多