【发布时间】:2018-04-17 12:11:32
【问题描述】:
我正在会话窗口上进行 POC,以满足我的流要求之一。为此,我使用会话窗口,因为这符合我必须将具有唯一事务 ID 的事件聚合到列表中的要求。每个事务可以有多个不同应用程序生成的事件,并将它们推送到 kafka。下面是代码
StreamsConfig streamsConfig = new StreamsConfig(getProperties());
Serde<String> stringSerde = Serdes.String();
Serde<Transaction> transactionSerde = StreamsSerdes.TransactionSerde();
Aggregator<String,Transaction, List<Transaction>> agg = (key, value, list)
-> {
list.add(value);
return list;
};
Merger<String, List<Transaction>> merger = (key, v1, v2) ->
Stream.concat(v1.stream(), v2.stream())
.collect(Collectors.toList());
Materialized<String,List<Transaction>,SessionStore<Bytes, byte[]>>
materialized = Materialized.<String,List<Transaction>>as(Stores
.persistentSessionStore
("trans-store", 1000 * 30)).withKeySerde(stringSerde).withValueSerde(StreamsSerdes
.TransactionsListSerde());
Initializer<List<Transaction>> init = () -> new ArrayList<>();
StreamsBuilder builder = new StreamsBuilder();
KTable<Windowed<String>, List<Transaction>> customerTransactionCounts =
builder.stream(TRANSACTIONS_TOPIC, Consumed.with(stringSerde, transactionSerde).withOffsetResetPolicy(LATEST))
.groupBy((noKey, transaction) -> transaction.getCustomerId(),
Serialized.with(stringSerde, transactionSerde))
.windowedBy(SessionWindows.with(10000).until(1000 * 30))
.aggregate(init,agg,merger,materialized);
customerTransactionCounts.toStream().print(Printed.<Windowed<String>, List<Transaction>>toSysOut()
.withLabel("Customer Transactions List").withKeyValueMapper((key, list) ->
("Current Time " + new Date().toString() + " Customer Id - " + key.key() +
" START " +
new Date
(key.window().start()).toString() + " --- END " + new Date(key.window().end()).toString()+ " " + list)));
KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), streamsConfig);
kafkaStreams.cleanUp();
保留期如何在这里工作?
1) 首先我摄取了一些交易 id X 的数据,事件日期范围为
2018 年 4 月 16 日星期一 22:25:40 EDT 开始 --- 2018 年 4 月 16 日星期一 22:25:49 EDT 结束
所有这些都在属于同一会话时进行了汇总。
2) 接下来我在 X 时间提取了单个记录事务 id START Mon Apr 16 22:26:45 EDT 2018。
我在提交间隔后看到了 1 条记录
据我了解,这里的直播时间改为22:26:45。此时,上述摄取的记录应从状态存储过期,为 endtime
3) 接下来,我摄取了与第一组事件处于同一时间范围的记录单记录事务 id X。我看到了第一步中的所有记录以及提交间隔后聚合结果中的新当前记录。
第一组记录不应该从状态存储中过期,因为它们已经过了保留期???
在第三步中,我假设我只会得到一个聚合记录,因为旧的记录应该已被删除。从状态存储中删除记录的保留期何时开始?
【问题讨论】: