【发布时间】:2019-05-29 16:36:55
【问题描述】:
我的代码当前正在使用 InMemoryKeyValueStore,它避免了对磁盘或 kafka 的任何持久性。 我想使用rocksdb(Stores.persistentKeyValueStore),以便应用程序从磁盘重新加载状态。我正在尝试实现这一点,而且我对 Kafka 和流 API 非常陌生。将不胜感激有关如何进行更改的帮助,同时我仍在尝试理解内容。
我尝试在这里创建状态存储:
StoreBuilder<KeyValueStore<String, LinkedList<StoreItem>>> store =
Stores.<String, LinkedList<StoreItem>>keyValueStoreBuilder(Stores.persistentKeyValueStore(storeKey), Serdes.String(), valueSerde);
如何在流构建器中注册它?
使用 inMemoryKeyValueStore 的现有代码:
static StoreBuilder<KeyValueStore<String, LinkedList<StoreItem>>> makeStoreBuilder(
final String storeKey,
final Serde<LinkedList<StoreItem>> valueSerde,
final boolean loggingDisabled) {
final StoreBuilder<KeyValueStore<String, LinkedList<StoreItem>>> storeBuilder =
Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(storeKey), Serdes.String(), valueSerde);
return storeBuilder;
}
我需要确保流应用在每次重新启动时都不会丢失日志主题中的现有消息。
【问题讨论】:
-
My code is currently using an InMemoryKeyValueStore, which avoids any persistence to disk or to kafka.-- 不确定您所说的“或到 Kafka”是什么意思 -- 如果存储由 Kafka 集群中的更改日志主题支持,则内存中或持久性不会影响,但日志记录可以为这两种类型的存储启用或禁用,这取决于您是否希望存储具有容错性。请注意,持久存储不会使存储容错!
标签: apache-kafka-streams rocksdb