【问题标题】:Kafka Streams: How to use persistentKeyValueStore to reload existing messages from disk?Kafka Streams:如何使用 persistentKeyValueStore 从磁盘重新加载现有消息?
【发布时间】:2019-05-29 16:36:55
【问题描述】:

我的代码当前正在使用 InMemoryKeyValueStore,它避免了对磁盘或 kafka 的任何持久性。 我想使用rocksdb(Stores.persistentKeyValueStore),以便应用程序从磁盘重新加载状态。我正在尝试实现这一点,而且我对 Kafka 和流 API 非常陌生。将不胜感激有关如何进行更改的帮助,同时我仍在尝试理解内容。

我尝试在这里创建状态存储:

StoreBuilder<KeyValueStore<String, LinkedList<StoreItem>>> store =
                Stores.<String, LinkedList<StoreItem>>keyValueStoreBuilder(Stores.persistentKeyValueStore(storeKey), Serdes.String(), valueSerde);

如何在流构建器中注册它?

使用 inMemoryKeyValueStore 的现有代码:

   static StoreBuilder<KeyValueStore<String, LinkedList<StoreItem>>> makeStoreBuilder(
            final String storeKey,
            final Serde<LinkedList<StoreItem>> valueSerde,
            final boolean loggingDisabled) {

        final StoreBuilder<KeyValueStore<String, LinkedList<StoreItem>>> storeBuilder =
                Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(storeKey), Serdes.String(), valueSerde);
        return storeBuilder;
    }

我需要确保流应用在每次重新启动时都不会丢失日志主题中的现有消息。

【问题讨论】:

  • My code is currently using an InMemoryKeyValueStore, which avoids any persistence to disk or to kafka. -- 不确定您所说的“或到 Kafka”是什么意思 -- 如果存储由 Kafka 集群中的更改日志主题支持,则内存中或持久性不会影响,但日志记录可以为这两种类型的存储启用或禁用,这取决于您是否希望存储具有容错性。请注意,持久存储不会使存储容错!

标签: apache-kafka-streams rocksdb


【解决方案1】:

如何在流构建器中注册它?

致电StreamsBuilder#addStateStore()

https://kafka.apache.org/22/javadoc/org/apache/kafka/streams/StreamsBuilder.html#addStateStore-org.apache.kafka.streams.state.StoreBuilder-

请参阅StateStoresInTheDSLIntegrationTest https://github.com/confluentinc/kafka-streams-examples 了解端到端演示应用程序。

【讨论】:

    【解决方案2】:

    您以某种方式将持久存储用作内存存储。商店负责其余的工作,您无需担心加载数据等。您只需使用它即可。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2021-03-23
      • 1970-01-01
      • 1970-01-01
      • 2017-09-30
      • 2014-02-20
      • 2021-07-13
      相关资源
      最近更新 更多