【问题标题】:Retention time in kafka local state store / changelogkafka 本地状态存储/更改日志中的保留时间
【发布时间】:2019-07-08 04:21:43
【问题描述】:

我正在使用 Kafka 和 Kafka Streams 作为 Spring Cloud Stream 的一部分。在我的 Kafka Streams 应用程序中流动的数据正在按特定时间窗口进行聚合和具体化:

Materialized<String, ErrorScore, WindowStore<Bytes, byte[]>> oneHour = Materialized.as("one-hour-store");
    oneHour.withLoggingEnabled(topicConfig);
    events
            .map(getStringSensorMeasurementKeyValueKeyValueMapper())
            .groupByKey()
            .windowedBy(TimeWindows.of(oneHourStore.getTimeUnit()))
            .reduce((aggValue, newValue) -> getMaxErrorScore(aggValue, newValue),
                    (oneHour));

按照设计,正在具体化的信息也由变更日志主题支持。

我们的应用还有一个 rest 端点,它将像这样查询 statestore:

 ReadOnlyWindowStore<String, Double> windowStore =  queryableStoreRegistry.getQueryableStoreType("one-hour-store", QueryableStoreTypes.windowStore());
 WindowStoreIterator<ErrorScore> iter = windowStore.fetch(key, from, to);

查看创建的更改日志主题的设置,内容如下:

min.insync.replicas 1
cleanup.policy delete
retention.ms 5259600000
retention.bytes -1

我假设当地的州立商店至少会将信息保留 61 天(约 2 个月)。然而,商店中似乎只剩下最后一天的数据。

什么可能导致数据这么快被删除?

更新解决方案 Kafka Streams 2.0.1 版不包含 Materialized.withRetention 方法。对于这个特定版本,我可以使用以下代码设置状态存储的保留时间,从而解决我的问题:

TimeWindows timeWindows = TimeWindows.of(windowSizeMs);
    timeWindows.until(retentionMs);

让我的代码写成这样:

...

.groupByKey()
        .windowedBy(timeWindows)
        .reduce((aggValue, newValue) -> getMaxErrorScore(aggValue, newValue),
                (oneHour));
...

【问题讨论】:

    标签: apache-kafka apache-kafka-streams


    【解决方案1】:

    对于窗口化的KTables,有一个本地保留时间,还有一个 changlog 保留时间。您可以通过Materialized.withRetentionTime(...) 设置本地存储保留时间 -- 默认值为 24 小时。

    对于较旧的 Kafka 版本,本地存储保留时间通过 Windows#until() 设置。

    如果创建新应用程序,则会创建更改日志主题,保留时间与本地存储保留时间相同。但是,如果您手动增加日志保留时间,这不会影响您的商店保留时间,但您需要相应地更新您的代码。当变更日志主题已经存在时也是如此:如果您更改本地存储保留时间,则变更日志主题配置不会自动更新。

    也有一个 Jira:https://issues.apache.org/jira/browse/KAFKA-7591

    【讨论】:

    • Materialized 似乎没有 withRetentionTime。它似乎只从 2.1 开始可用,我们使用的是 2.0.1,因为 spring cloud stream。在早期版本中应该如何解决这个问题?还有一个集成测试可以测试这个吗?
    • 对于 2.0.1 版本的 Kafka Streams,我已经能够使用 TimeWindows.until() 配置本地状态存储的保留。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-01-28
    相关资源
    最近更新 更多