【发布时间】:2019-05-28 16:43:10
【问题描述】:
我正在使用 Streams DSL 并进行状态聚合(从一个主题读取数据,聚合并将数据写入另一个主题)。如何减少写入状态存储的数据的保留期?现在我的基础设施团队说数据在状态存储中保留了 5 年,我必须减少它。我可以设置数据保留多长时间的特定配置吗?
KTable<Windowed<String>, JSONObject> kTable = filteredKstream
.groupBy((key, value) -> getNewKey(value),
Grouped.with(Serdes.String(), new JSONObjectSerde()))
.windowedBy(windows).aggregate(() -> {
SampleData sampleData = new SampleData();
return new JSONObject(mapperUtils.writeValueAsString(sampleData, mapper));
} , (key, value, aggregate) -> {
return getAggregateValue(aggregate, value);
} , Materialized
.<String, JSONObject, WindowStore<Bytes, byte[]>> as(
"sample-store")
.withKeySerde(Serdes.String())
.withValueSerde(jsonSerde));
【问题讨论】:
-
@MatthiasJ.Sax 我看到你已经回答了类似的问题。所以只是想知道你是否对此有任何见解
-
商店的默认保留时间是 1 天——不知道为什么您的团队声称它是 5 年......另外,@michael-g-noll 给出了答案。 (顺便说一句:标记不能以这种方式工作——我没有收到通知,但只是通过浏览找到了问题。)