【问题标题】:How does retention time is applied in kafka aggregate state store topics?保留时间如何应用于 kafka 聚合状态存储主题?
【发布时间】:2021-04-07 20:27:35
【问题描述】:

我只是对流聚合函数中状态存储主题的保留时间(retention.ms)的计算感到困惑。

这是构建拓扑的流配置:

    Properties props = new Properties();
    props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "appId1");
    props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
    props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    props.setProperty(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG, String.valueOf(5));

    props.put(StreamsConfig.topicPrefix(TopicConfig.RETENTION_MS_CONFIG), 200);

额外保留时间设置为 5
主题保留设置为 200

假设我有这个用于创建拓扑的代码:(为了简单起见,我没有在聚合函数中做任何事情,我只是想说明要构建的存储主题)
不开窗

    //topology1
    final StreamsBuilder streamsBuilder = new StreamsBuilder();
    KStream<String, String> source = streamsBuilder.stream("test-topic");
    source.groupByKey().aggregate(() -> "1",  (key, value, aggregate) -> "2");

Topology1 将使用 retention.ms=200
创建状态存储主题 这里的保留是 200(默认主题保留)

窗口:

    //topology2
    final StreamsBuilder streamsBuilder = new StreamsBuilder();
    KStream<String, String> source = streamsBuilder.stream("test-topic");
    source.groupByKey()
            .windowedBy(TimeWindows.of(Duration.ofMillis(10)).grace(Duration.ofMillis(1)))
            .aggregate(() -> "1", (key, value, aggregate) -> "2");

Topology2 将使用 retention.ms=86400005
创建状态存储主题 这里留存是 86400000 (???) + 5(额外留存)

通过设置保留窗口:

    //topology3
    final StreamsBuilder streamsBuilder = new StreamsBuilder();
    KStream<String, String> source = streamsBuilder.stream("test-topic");
    source.groupByKey()
            .windowedBy(TimeWindows.of(Duration.ofMillis(10)).grace(Duration.ofMillis(1)))
            .aggregate(() -> "1",  (key, value, aggregate) -> "2"
                    , Materialized.<String, String, WindowStore<Bytes, byte[]>>as("agg")
                            .withRetention(Duration.ofMillis(300)));

Topology3 将使用 retention.ms=305
创建状态存储主题 这里的留存是 300(显式留存集)+ 5(附加留存)

要运行应用程序,还需要此代码:

    Topology topology = streamsBuilder.build();
    KafkaStreams streams = new KafkaStreams(topology, props);
    streams.start();   

那么,在拓扑状态存储主题中设置主题保留的逻辑是什么?
为什么在 Topology1 中不使用额外的保留?
为什么在 Topology2 中,不使用默认主题保留?
在Topology2中,什么参数是86400000加上额外保留相加?
最后,Topology1 中不使用附加保留,但在明确设置保留的 Topology3 中使用。

谁能解释一下这些背后的逻辑?

【问题讨论】:

  • 我认为,当您的窗口时间小于保留时间时,您的窗口将仅根据保留时间收集值。也就是说,窗口时间必须大于保留时间,因为保留时间就是KTable发出数据的时间。

标签: java apache-kafka apache-kafka-streams spring-kafka


【解决方案1】:

对于窗口化的 KStreams,有一个默认值为 24 小时或 86400000 毫秒的本地保留时间。这就是 Topology2 保留时间背后的原因。

您可以通过Materialized.withRetentionTime(...)设置本地存储保留时间,这就是Topology3保留时间为305ms的原因

【讨论】:

  • 这实际上是我描述的。 local retention time 参数是什么,如何手动设置?
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2019-07-08
  • 1970-01-01
  • 1970-01-01
  • 2017-12-18
  • 2021-09-27
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多