【问题标题】:How can i set retention period for state stores created by Kafka streams如何为 Kafka 流创建的状态存储设置保留期
【发布时间】:2019-05-28 16:43:10
【问题描述】:

我正在使用 Streams DSL 并进行状态聚合(从一个主题读取数据,聚合并将数据写入另一个主题)。如何减少写入状态存储的数据的保留期?现在我的基础设施团队说数据在状态存储中保留了 5 年,我必须减少它。我可以设置数据保留多长时间的特定配置吗?

    KTable<Windowed<String>, JSONObject> kTable = filteredKstream
            .groupBy((key, value) -> getNewKey(value),
                    Grouped.with(Serdes.String(), new JSONObjectSerde()))
            .windowedBy(windows).aggregate(() -> {
                SampleData sampleData = new SampleData();
                return new JSONObject(mapperUtils.writeValueAsString(sampleData, mapper));
            } , (key, value, aggregate) -> {
                return getAggregateValue(aggregate, value);
            } , Materialized
                    .<String, JSONObject, WindowStore<Bytes, byte[]>> as(
                            "sample-store")
                    .withKeySerde(Serdes.String())
                    .withValueSerde(jsonSerde));

【问题讨论】:

  • @MatthiasJ.Sax 我看到你已经回答了类似的问题。所以只是想知道你是否对此有任何见解
  • 商店的默认保留时间是 1 天——不知道为什么您的团队声称它是 5 年......另外,@michael-g-noll 给出了答案。 (顺便说一句:标记不能以这种方式工作——我没有收到通知,但只是通过浏览找到了问题。)

标签: apache-kafka-streams


【解决方案1】:

您可以使用Materialized#withRetention() 设置窗口和会话存储的保留期限。

https://kafka.apache.org/22/javadoc/org/apache/kafka/streams/kstream/Materialized.html#withRetention-java.time.Duration-

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-09-27
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多