Posted: 2021-01-20 17:06:55
Question:
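We are running a Kafka Streams application and frequently run into off-heap memory problems. The application is deployed in Kubernetes pods, which keep restarting.
While investigating, I found that we can limit off-heap memory by implementing RocksDBConfigSetter, as in the following example: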
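import java.util.Map;

import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.Cache;
import org.rocksdb.LRUCache;
import org.rocksdb.Options;
import org.rocksdb.WriteBufferManager;

// Top-level public class so Kafka Streams can instantiate it through its no-arg constructor.
public class BoundedMemoryRocksDBConfig implements RocksDBConfigSetter {

    // Example sizes (assumptions): tune these for your deployment.
    private static final long TOTAL_OFF_HEAP_MEMORY = 4L * 1024 * 1024 * 1024; // 4 GB shared by all stores
    private static final long TOTAL_MEMTABLE_MEMORY = 1L * 1024 * 1024 * 1024; // share of the total charged to memtables
    private static final double INDEX_FILTER_BLOCK_RATIO = 0.1;                // cache fraction for high-priority index/filter blocks
    private static final long BLOCK_SIZE = 16L * 1024;                         // 16 KB data blocks
    private static final int N_MEMTABLES = 3;
    private static final long MEMTABLE_SIZE = 16L * 1024 * 1024;               // 16 MB

    // Static fields: one Cache and one WriteBufferManager shared by every store instance in the JVM.
    // LRUCache(capacity, numShardBits = -1 (default sharding), strictCapacityLimit = false, highPriPoolRatio)
    private static final Cache cache = new LRUCache(TOTAL_OFF_HEAP_MEMORY, -1, false, INDEX_FILTER_BLOCK_RATIO);
    // Memtable memory is charged against the same shared cache.
    private static final WriteBufferManager writeBufferManager = new WriteBufferManager(TOTAL_MEMTABLE_MEMORY, cache);

    @Override
    public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) {
        BlockBasedTableConfig tableConfig = (BlockBasedTableConfig) options.tableFormatConfig();
        // These three options in combination will limit the memory used by RocksDB to the size passed to the block cache (TOTAL_OFF_HEAP_MEMORY)
        tableConfig.setBlockCache(cache);
        tableConfig.setCacheIndexAndFilterBlocks(true);
        options.setWriteBufferManager(writeBufferManager);
        // These options are recommended to be set when bounding the total memory
        tableConfig.setCacheIndexAndFilterBlocksWithHighPriority(true);
        tableConfig.setPinTopLevelIndexAndFilter(true);
        // Block size and memtable sizing
        tableConfig.setBlockSize(BLOCK_SIZE);
        options.setMaxWriteBufferNumber(N_MEMTABLES);
        options.setWriteBufferSize(MEMTABLE_SIZE);
        options.setTableFormatConfig(tableConfig);
    }

    @Override
    public void close(final String storeName, final Options options) {
        // Cache and WriteBufferManager should not be closed here, as the same objects are shared by every store instance.
    }
}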
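A config setter only takes effect once it is registered in the Streams configuration. Below is a minimal sketch, assuming the class above lives in a hypothetical `com.example` package and using placeholder application id and broker values. The key `"rocksdb.config.setter"` is the string behind `StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG`, which real code would normally reference instead of the literal.

```java
import java.util.Properties;

public class StreamsPropsExample {

    // Hypothetical fully qualified name of the config setter class shown above.
    static final String CONFIG_SETTER_CLASS = "com.example.BoundedMemoryRocksDBConfig";

    static Properties buildProps() {
        Properties props = new Properties();
        // Placeholder values (assumptions).
        props.put("application.id", "my-streams-app");
        props.put("bootstrap.servers", "localhost:9092");
        // Same key as StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG. Kafka Streams instantiates
        // this class and calls setConfig() for each RocksDB store it opens.
        props.put("rocksdb.config.setter", CONFIG_SETTER_CLASS);
        return props;
    }

    public static void main(String[] args) {
        System.out.println(buildProps().getProperty("rocksdb.config.setter"));
    }
}
```

Because the cache and write buffer manager are static, every store configured through this setter draws on the same memory pool.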
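Our application consumes data from about 40 input topics, each with 6 partitions. It has only one topology, which consumes from these topics and stores the data in state stores (for deduplication, lookups, and some validation). So, as I understand it, the Kafka Streams application will create the following RocksDB instances and need the following maximum off-heap memory. Please correct me if I'm wrong.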
Total number of RocksDB instances (assuming each task creates its own RocksDB instance):
6 (partitions) * 40 (topics) -> 240 RocksDB instances
Maximum off-heap memory consumed:
240 * (50 MB (block cache) + 16 MB * 3 (memtables) + filters (unknown))
  = 240 * ~110 MB
  = 26,400 MB
  ≈ 25.8 GB
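The arithmetic above can be checked with a short Java sketch. It assumes, as the question does, Kafka Streams' per-store RocksDB defaults (a 50 MB block cache and up to three 16 MB memtables), one RocksDB instance per partition per store, and a caller-supplied per-instance guess for index and filter blocks (the "unknown" term). The class and method names are hypothetical.

```java
public class OffHeapEstimate {

    // Per-store defaults assumed in the question.
    static final long BLOCK_CACHE_MB = 50;
    static final long MEMTABLE_MB = 16;
    static final long MAX_MEMTABLES = 3;

    // Worst case when every RocksDB instance has its own block cache and full memtables.
    // filterMbPerInstance is a guess for index/filter blocks, which depend on the data.
    static long worstCaseMb(int partitions, int stores, long filterMbPerInstance) {
        long instances = (long) partitions * stores;
        long perInstanceMb = BLOCK_CACHE_MB + MEMTABLE_MB * MAX_MEMTABLES + filterMbPerInstance;
        return instances * perInstanceMb;
    }

    // Converts MB to GB using 1024 MB per GB.
    static double toGb(long mb) {
        return mb / 1024.0;
    }

    public static void main(String[] args) {
        long mb = worstCaseMb(6, 40, 12); // ~110 MB per instance, as in the question
        System.out.printf("%d MB = %.2f GB%n", mb, toGb(mb));
    }
}
```

Without filters this gives 240 * 98 MB = 23,520 MB; the question's ~110 MB per instance gives 26,400 MB, which prints as 25.78 GB. With the shared static cache from the config setter, the nominal total is the cache capacity instead of this per-instance sum.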
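That seems like a very large number. Is the calculation correct? I know we shouldn't actually reach this maximum in practice, but is the calculation itself right?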
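Also, if we implement RocksDBConfigSetter and set the maximum off-heap memory to 4 GB, will the application complain (crash with OOM) when RocksDB asks for more memory (since it is expected to need about 25 GB)?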
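One way to think about the 4 GB question is the third `LRUCache` constructor argument, `strictCapacityLimit`. The sketch below is a toy Java model, not RocksDB's implementation; its class name and the idea of "pinned" entries are simplifying assumptions. With `false` (as in the config above), an insert into a full cache that cannot evict anything still succeeds and usage overshoots capacity. With `true`, such an insert is rejected, and RocksDB reports that case with the same "Insert failed due to LRU cache being full" text seen in the update. Pinned entries stand in loosely for memory that cannot be evicted, such as memtable space a `WriteBufferManager` charges to the cache.

```java
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Toy model of LRU capacity handling (an assumption-laden simplification, NOT RocksDB code).
public class ToyLruCache {
    private final long capacity;
    private final boolean strictCapacityLimit;
    // accessOrder = true: iteration starts at the least recently used entry.
    private final LinkedHashMap<String, Long> entries = new LinkedHashMap<>(16, 0.75f, true);
    // Pinned entries are never evicted.
    private final Set<String> pinned = new HashSet<>();
    private long usage;

    public ToyLruCache(long capacity, boolean strictCapacityLimit) {
        this.capacity = capacity;
        this.strictCapacityLimit = strictCapacityLimit;
    }

    public void insert(String key, long charge, boolean pin) {
        if (entries.containsKey(key)) {
            throw new IllegalArgumentException("duplicate key: " + key);
        }
        // Evict unpinned entries, least recently used first, until the new entry fits.
        Iterator<Map.Entry<String, Long>> it = entries.entrySet().iterator();
        while (usage + charge > capacity && it.hasNext()) {
            Map.Entry<String, Long> e = it.next();
            if (!pinned.contains(e.getKey())) {
                usage -= e.getValue();
                it.remove();
            }
        }
        if (usage + charge > capacity && strictCapacityLimit) {
            // Strict limit: reject the insert rather than exceed capacity.
            throw new IllegalStateException("Insert failed due to LRU cache being full.");
        }
        // Non-strict limit: accept the insert even if usage now exceeds capacity.
        entries.put(key, charge);
        usage += charge;
        if (pin) {
            pinned.add(key);
        }
    }

    public long usage() {
        return usage;
    }

    public static void main(String[] args) {
        ToyLruCache lenient = new ToyLruCache(100, false);
        lenient.insert("memtable", 80, true);
        lenient.insert("block", 30, false);
        System.out.println(lenient.usage()); // 110: over capacity, but no error
    }
}
```

In this model a non-strict cache never fails inserts but can exceed its nominal bound, while a strict one keeps the bound and fails instead.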
Update: I reduced the LRU cache to 1 GB, and my Streams application started throwing an "LRU cache full" exception:
2021-02-07 23:20:47,443 15448195 [dp-Corrigo-67c5563a-9e3c-4d79-bc1e-23175e2cba6c-StreamThread-2] ERROR o.a.k.s.p.internals.StreamThread - stream-thread [dp-Corrigo-67c5563a-9e3c-4d79-bc1e-23175e2cba6c-StreamThread-2] Encountered the following exception during processing and the thread is going to shut down:
org.apache.kafka.streams.errors.ProcessorStateException: stream-thread [dp-Corrigo-67c5563a-9e3c-4d79-bc1e-23175e2cba6c-StreamThread-2] task [29_4] Exception caught while trying to restore state from dp-Corrigo-InvTreeObject-Store-changelog-4
at org.apache.kafka.streams.processor.internals.ProcessorStateManager.restore(ProcessorStateManager.java:425)
at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restoreChangelog(StoreChangelogReader.java:562)
at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:461)
at org.apache.kafka.streams.processor.internals.StreamThread.initializeAndRestorePhase(StreamThread.java:744)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:625)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:512)
Caused by: org.apache.kafka.streams.errors.ProcessorStateException: Error restoring batch to store InvTreeObject-Store
at org.apache.kafka.streams.state.internals.RocksDBStore$RocksDBBatchingRestoreCallback.restoreAll(RocksDBStore.java:647)
at org.apache.kafka.streams.processor.internals.StateRestoreCallbackAdapter.lambda$adapt$0(StateRestoreCallbackAdapter.java:42)
at org.apache.kafka.streams.processor.internals.ProcessorStateManager.restore(ProcessorStateManager.java:422)
... 6 common frames omitted
Caused by: org.rocksdb.RocksDBException: Insert failed due to LRU cache being full.
at org.rocksdb.RocksDB.write0(Native Method)
at org.rocksdb.RocksDB.write(RocksDB.java:806)
at org.apache.kafka.streams.state.internals.RocksDBStore.write(RocksDBStore.java:439)
at org.apache.kafka.streams.state.internals.RocksDBStore$RocksDBBatchingRestoreCallback.restoreAll(RocksDBStore.java:645)
... 8 common frames omitted
Tags: apache-kafka apache-kafka-streams rocksdb