【发布时间】:2021-04-07 20:27:35
【问题描述】:
我只是对流聚合函数中状态存储主题的保留时间(retention.ms)的计算感到困惑。
这是构建拓扑的流配置:
Properties props = new Properties();
props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "appId1");
props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.setProperty(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG, String.valueOf(5));
props.put(StreamsConfig.topicPrefix(TopicConfig.RETENTION_MS_CONFIG), 200);
额外保留时间设置为 5
主题保留设置为 200
假设我有这个用于创建拓扑的代码:(为了简单起见,我没有在聚合函数中做任何事情,我只是想说明要构建的存储主题)
不开窗
//topology1
final StreamsBuilder streamsBuilder = new StreamsBuilder();
KStream<String, String> source = streamsBuilder.stream("test-topic");
source.groupByKey().aggregate(() -> "1", (key, value, aggregate) -> "2");
Topology1 将使用 retention.ms=200
创建状态存储主题
这里的保留是 200(默认主题保留)
窗口:
//topology2
final StreamsBuilder streamsBuilder = new StreamsBuilder();
KStream<String, String> source = streamsBuilder.stream("test-topic");
source.groupByKey()
.windowedBy(TimeWindows.of(Duration.ofMillis(10)).grace(Duration.ofMillis(1)))
.aggregate(() -> "1", (key, value, aggregate) -> "2");
Topology2 将使用 retention.ms=86400005
创建状态存储主题
这里留存是 86400000 (???) + 5(额外留存)
通过设置保留窗口:
//topology3
final StreamsBuilder streamsBuilder = new StreamsBuilder();
KStream<String, String> source = streamsBuilder.stream("test-topic");
source.groupByKey()
.windowedBy(TimeWindows.of(Duration.ofMillis(10)).grace(Duration.ofMillis(1)))
.aggregate(() -> "1", (key, value, aggregate) -> "2"
, Materialized.<String, String, WindowStore<Bytes, byte[]>>as("agg")
.withRetention(Duration.ofMillis(300)));
Topology3 将使用 retention.ms=305
创建状态存储主题
这里的留存是 300(显式留存集)+ 5(附加留存)
要运行应用程序,还需要此代码:
Topology topology = streamsBuilder.build();
KafkaStreams streams = new KafkaStreams(topology, props);
streams.start();
那么,在拓扑状态存储主题中设置主题保留的逻辑是什么?
为什么在 Topology1 中不使用额外的保留?
为什么在 Topology2 中,不使用默认主题保留?
在Topology2中,什么参数是86400000加上额外保留相加?
最后,Topology1 中不使用附加保留,但在明确设置保留的 Topology3 中使用。
谁能解释一下这些背后的逻辑?
【问题讨论】:
-
我认为,当您的窗口时间小于保留时间时,您的窗口将仅根据保留时间收集值。也就是说,窗口时间必须大于保留时间,因为保留时间就是KTable发出数据的时间。
标签: java apache-kafka apache-kafka-streams spring-kafka