【问题标题】:Kafka Streams limiting off-heap memoryKafka Streams 限制堆外内存
【发布时间】:2021-01-20 17:06:55
【问题描述】:

我们正在运行 kafka 流应用程序,并且频率遇到堆外内存问题。我们的应用程序已部署,kubernetes POD 并不断重启。

我正在做一些调查,发现我们可以通过实现 RocksDBConfigSetter 来限制堆外内存,如下例所示。

public static class BoundedMemoryRocksDBConfig implements RocksDBConfigSetter {

  // See #1 below
  private static org.rocksdb.Cache cache = new org.rocksdb.LRUCache(TOTAL_OFF_HEAP_MEMORY, -1, false, INDEX_FILTER_BLOCK_RATIO);
  private static org.rocksdb.WriteBufferManager writeBufferManager = new org.rocksdb.WriteBufferManager(TOTAL_MEMTABLE_MEMORY, cache);

  @Override
  public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) {

    BlockBasedTableConfig tableConfig = (BlockBasedTableConfig) options.tableFormatConfig();

    // These three options in combination will limit the memory used by RocksDB to the size passed to the block cache (TOTAL_OFF_HEAP_MEMORY)
    tableConfig.setBlockCache(cache);
    tableConfig.setCacheIndexAndFilterBlocks(true);
    options.setWriteBufferManager(writeBufferManager);

    // These options are recommended to be set when bounding the total memory
    // See #2 below
    tableConfig.setCacheIndexAndFilterBlocksWithHighPriority(true);
    tableConfig.setPinTopLevelIndexAndFilter(true);
    // See #3 below
    tableConfig.setBlockSize(BLOCK_SIZE);
    options.setMaxWriteBufferNumber(N_MEMTABLES);
    options.setWriteBufferSize(MEMTABLE_SIZE);

    options.setTableFormatConfig(tableConfig);
  }

  @Override
  public void close(final String storeName, final Options options) {
    // Cache and WriteBufferManager should not be closed here, as the same objects are shared by every store instance.
  }
}

在我们的应用程序中,我们有 6 个分区的输入主题,并且我们正在使用大约 40 个主题中的数据。 Out 应用程序只有 1 个拓扑,它从这些主题中消费,将数据存储在状态存储中(用于重复数据删除、查找和一些验证)。因此,据我了解,kafka 流应用程序将创建以下 RocksDB 实例,并且需要以下最大堆内存。如果我错了,请纠正我。

rocksdb 实例总数(假设每个任务都会创建自己的rocksdb 实例)

6(partitions) * 40(topics) -> 240 rocksdb instances

消耗的最大堆外内存

 240 * ( 50 (Block cache)  + 16*3(memcache) + filters(unknown))
- 240 * ~110 MB
- 26400 MB
- 25 GB

这似乎是一个很大的数字。计算是否正确?我知道实际上我们不应该达到这个最大数字,但计算是否正确?

另外,如果我们实现 RocksDBConfigSetter 并将最大堆外内存设置为 4 GB。如果rocksdb要求更多内存(因为它预计大约25 GB),应用程序会抱怨(崩溃OOM)吗?

更新: 我将 LRU 减少到 1GB,并且我的流应用程序开始抛出 LRU 完整异常

2021-02-07 23:20:47,443 15448195 [dp-Corrigo-67c5563a-9e3c-4d79-bc1e-23175e2cba6c-StreamThread-2] ERROR o.a.k.s.p.internals.StreamThread - stream-thread [dp-Corrigo-67c5563a-9e3c-4d79-bc1e-23175e2cba6c-StreamThread-2] Encountered the following exception during processing and the thread is going to shut down: 
org.apache.kafka.streams.errors.ProcessorStateException: stream-thread [dp-Corrigo-67c5563a-9e3c-4d79-bc1e-23175e2cba6c-StreamThread-2] task [29_4] Exception caught while trying to restore state from dp-Corrigo-InvTreeObject-Store-changelog-4
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.restore(ProcessorStateManager.java:425)
    at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restoreChangelog(StoreChangelogReader.java:562)
    at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:461)
    at org.apache.kafka.streams.processor.internals.StreamThread.initializeAndRestorePhase(StreamThread.java:744)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:625)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:512)
Caused by: org.apache.kafka.streams.errors.ProcessorStateException: Error restoring batch to store InvTreeObject-Store
    at org.apache.kafka.streams.state.internals.RocksDBStore$RocksDBBatchingRestoreCallback.restoreAll(RocksDBStore.java:647)
    at org.apache.kafka.streams.processor.internals.StateRestoreCallbackAdapter.lambda$adapt$0(StateRestoreCallbackAdapter.java:42)
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.restore(ProcessorStateManager.java:422)
    ... 6 common frames omitted
Caused by: org.rocksdb.RocksDBException: Insert failed due to LRU cache being full.
    at org.rocksdb.RocksDB.write0(Native Method)
    at org.rocksdb.RocksDB.write(RocksDB.java:806)
    at org.apache.kafka.streams.state.internals.RocksDBStore.write(RocksDBStore.java:439)
    at org.apache.kafka.streams.state.internals.RocksDBStore$RocksDBBatchingRestoreCallback.restoreAll(RocksDBStore.java:645)
    ... 8 common frames omitted

【问题讨论】:

    标签: apache-kafka apache-kafka-streams rocksdb


    【解决方案1】:

    不确定您获得了多少 RocksDB 实例。这取决于您的程序的结构。您应该查看TopologyDescription(通过Topology#describe())。子拓扑被实例化为任务(基于分区的数量),每个任务都有自己的 RocksDB 来维护每个存储的整体状态的分片。

    我建议查看 Kafka 峰会演讲“为 Kafka Streams 的 State Store 调整 RocksDB 的性能”:https://videos.confluent.io/watch/Ud6dtEC3DMYEtmK3dMK5ci

    另外,如果我们实现 RocksDBConfigSetter 并将最大堆外内存设置为 4 GB。如果rocksdb要求更多内存(因为它预计大约25 GB),应用程序会抱怨(崩溃OOM)吗?

    它不会崩溃。 RocksDB 将溢出到磁盘。能够溢出到磁盘是我们默认使用持久状态存储(而不是内存状态存储)的原因。它允许保持大于主存的状态。当您使用 Kubernetes 时,您应该将相应的卷附加到您的容器并相应地调整它们的大小(参见 https://docs.confluent.io/platform/current/streams/sizing.html)。您可能还想观看 Kafka 峰会演讲“使用 Docker 和 Kubernetes 部署 Kafka Streams 应用程序”:https://www.confluent.io/kafka-summit-sf18/deploying-kafka-streams-applications/

    如果状态大于主内存,您可能还需要监控 RocksDB 指标,如果您遇到每个问题以相应地调整不同的“缓冲区”:https://docs.confluent.io/platform/current/streams/monitoring.html#rocksdb-metrics

    【讨论】:

    • 对于内存监控,请查看引入 RocksDB 指标的 KIP-607 (cwiki.apache.org/confluence/x/pRQRCQ)。
    • 关于您的计算,请参阅您发布的配置设置器中的评论:These three options in combination will limit the memory used by RocksDB to the size passed to the block cache (TOTAL_OFF_HEAP_MEMORY)。所以你不需要单独考虑 memtable 和过滤器。所有都将计入块缓存。
    • 我的一个假设是,如果缓存很小,那么它将写入磁盘,但似乎它开始抛出异常并导致流应用程序失败。更新问题
    • 不完全确定,但错误似乎发生在状态恢复期间:RocksDBBatchingRestoreCallback.restoreAll -- 在恢复期间,我们以不同方式配置 RocksDB,特别是我们以“批量加载”模式打开 RocksDB(至少对于大多数 KafkaStreams 版本...)。会不会有关系?
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2012-04-02
    • 2011-02-16
    • 2021-11-23
    • 2023-03-23
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多