【发布时间】:2020-03-16 22:08:18
【问题描述】:
使用 Kafka Streaming 2.4 和 DSL API。
我正在进行有状态的流处理,它连接到具有 100 个分区的用户主题。应用程序还将具有默认分区的内部主题引用到用户主题。
观察到错误并最终关闭所有任务线程。
能否请您指点一下获取公式来计算所需的打开文件描述符?
public class CustomRocksDBConfig implements RocksDBConfigSetter {
private org.rocksdb.Cache cache = new org.rocksdb.LRUCache(2 * 1024L * 1024L * 1024L);
@Override
public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) {
BlockBasedTableConfig tableConfig = (BlockBasedTableConfig) options.tableFormatConfig();
tableConfig.setBlockCache(cache);
tableConfig.setBlockCacheSize(1024L * 1024L * 1024L);
tableConfig.setBlockSize( 4 * 1024L);
tableConfig.setCacheIndexAndFilterBlocks(true);
options.setTableFormatConfig(tableConfig);
options.setMaxWriteBufferNumber(7);
options.setMinWriteBufferNumberToMerge(4);
options.setWriteBufferSize(25 * 1024L * 1024L);}
Caused by: org.rocksdb.RocksDBException: While open a file for appending: /data/directory/generator.1583280000000/002360.sst: Too many open files
at org.rocksdb.RocksDB.flush(Native Method)
at org.rocksdb.RocksDB.flush(RocksDB.java:2394)
at org.apache.kafka.streams.state.internals.RocksDBStore$SingleColumnFamilyAccessor.flush(RocksDBStore.java:581)
at org.apache.kafka.streams.state.internals.RocksDBStore.flush(RocksDBStore.java:384)
... 17 more
【问题讨论】:
-
请分享您的配置
ulimit -a, -
感谢您的回复。 nofile= 65535, nproc=163840, pipe buffer size=4096,socket buffer size=4096, sigpend=257587, stack size=10240, core file size=0,locked address space=64,nice=0,rtprio=0 rest is无限制或不受支持。 RockedDB 可以使用流式应用程序自定义配置,但计算所需打开文件描述符的公式是未知的。请让我知道您是否可以提出一些方向。
-
您可能需要增加操作系统的打开文件限制。 -- 估计所需文件描述符的数量非常困难,因为它取决于许多因素。如有疑问,您可能需要联系 RocksDB 社区。span>
-
@MatthiasJ.Sax - 感谢您的回复。使用默认配置和给定生成的密钥,kafka 流确实放入 MemTable 并立即创建 sst 以存储在 statestore 目录中。当所有 3 个 memtables 都满时,我的期望是冲洗。这种访问较少 sst 文件并限制打开或创建多个文件的优势