【问题标题】:kafka-streams session windows retention periodkafka-streams 会话窗口保留期
【发布时间】:2018-04-17 12:11:32
【问题描述】:

我正在会话窗口上进行 POC,以满足我的流要求之一。为此,我使用会话窗口,因为这符合我必须将具有唯一事务 ID 的事件聚合到列表中的要求。每个事务可以有多个不同应用程序生成的事件,并将它们推送到 kafka。下面是代码

    StreamsConfig streamsConfig = new StreamsConfig(getProperties());
    Serde<String> stringSerde = Serdes.String();
    Serde<Transaction> transactionSerde = StreamsSerdes.TransactionSerde();

    Aggregator<String,Transaction, List<Transaction>> agg = (key, value, list)
            -> {
        list.add(value);
        return list;
    };

    Merger<String, List<Transaction>> merger = (key, v1, v2) ->
       Stream.concat(v1.stream(), v2.stream())
               .collect(Collectors.toList());

    Materialized<String,List<Transaction>,SessionStore<Bytes, byte[]>>
            materialized = Materialized.<String,List<Transaction>>as(Stores
            .persistentSessionStore
            ("trans-store", 1000 * 30)).withKeySerde(stringSerde).withValueSerde(StreamsSerdes
            .TransactionsListSerde());


    Initializer<List<Transaction>> init = () -> new ArrayList<>();

    StreamsBuilder builder = new StreamsBuilder();
    KTable<Windowed<String>, List<Transaction>> customerTransactionCounts =
             builder.stream(TRANSACTIONS_TOPIC, Consumed.with(stringSerde, transactionSerde).withOffsetResetPolicy(LATEST))
            .groupBy((noKey, transaction) -> transaction.getCustomerId(),
                    Serialized.with(stringSerde, transactionSerde))
            .windowedBy(SessionWindows.with(10000).until(1000 * 30))
                     .aggregate(init,agg,merger,materialized);


    customerTransactionCounts.toStream().print(Printed.<Windowed<String>, List<Transaction>>toSysOut()
            .withLabel("Customer Transactions List").withKeyValueMapper((key, list) ->
                    ("Current Time " + new Date().toString() + " Customer Id - " + key.key()  +
                    " START " +
                    new Date
                    (key.window().start()).toString() + " --- END " + new Date(key.window().end()).toString()+ "  " + list)));


    KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), streamsConfig);
    kafkaStreams.cleanUp();

保留期如何在这里工作?

1) 首先我摄取了一些交易 id X 的数据,事件日期范围为

2018 年 4 月 16 日星期一 22:25:40 EDT 开始 --- 2018 年 4 月 16 日星期一 22:25:49 EDT 结束

所有这些都在属于同一会话时进行了汇总。

2) 接下来我在 X 时间提取了单个记录事务 id START Mon Apr 16 22:26:45 EDT 2018。

我在提交间隔后看到了 1 条记录

据我了解,这里的直播时间改为22:26:45。此时,上述摄取的记录应从状态存储过期,为 endtime

3) 接下来,我摄取了与第一组事件处于同一时间范围的记录单记录事务 id X。我看到了第一步中的所有记录以及提交间隔后聚合结果中的新当前记录。

第一组记录不应该从状态存储中过期,因为它们已经过了保留期???

在第三步中,我假设我只会得到一个聚合记录,因为旧的记录应该已被删除。从状态存储中删除记录的保留期何时开始?

【问题讨论】:

    标签: apache-kafka-streams


    【解决方案1】:

    保留时间是一个“最小值”——为了更有效地过期,数据存储在所谓的段中(基于时间间隔),当段中的所有数据都超过保留时间时,段就会过期。

    【讨论】:

    • 谢谢@Matthias。如果有任何关于分段及其保留逻辑如何工作的文档,您可以发布链接吗?据我所知,段是每个时间间隔。我正在尝试复制保留清理行为,但无法获得它。假设段长度为 60 秒且段数为 3,那么如果状态存储获取任何不属于 3 个现有段中的任何一个的数据,那么它应该清理旧段并清除数据。 .这是正确的理解吗??
    • 没有文档...只有代码 :( - 你的理解是正确的:如果你有 3 个段,旧数据将被丢弃,如果新数据到达,则进入一个新段,最旧的段被清除。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2020-09-05
    • 1970-01-01
    • 2020-09-05
    • 1970-01-01
    • 1970-01-01
    • 2020-06-17
    • 1970-01-01
    相关资源
    最近更新 更多