【发布时间】:2018-07-22 01:48:12
【问题描述】:
我有一个如下用例。对于每个传入事件,我想查看 某个字段以查看其状态是否从 A 更改为 B,如果是,则将其发送到 输出主题。流程是这样的:带有键“xyz”的事件以状态 A 进入,一段时间后 另一个事件带有状态 B 的键“xyz”。我有这段代码使用高级 DSL。
final KStream<String, DomainEvent> inputStream....
final KStream<String, DomainEvent> outputStream = inputStream
.map((k, v) -> new KeyValue<>(v.getId(), v))
.groupByKey(Serialized.with(Serdes.String(), jsonSerde))
.aggregate(DomainStatusMonitor::new,
(k, v, aggregate) -> {
aggregate.updateStatusMonitor(v);
return aggregate;
}, Materialized.with(Serdes.String(), jsonSerde))
.toStream()
.filter((k, v) -> v.isStatusChangedFromAtoB())
.map((k,v) -> new KeyValue<>(k, v.getDomainEvent()));
有没有更好的方法来使用 DSL 编写这个逻辑?
关于上面代码中聚合创建的状态存储的几个问题。
- 是否默认创建内存状态存储?
- 如果我有无限数量的唯一传入键会发生什么? 如果它默认使用内存存储,我不需要切换到持久存储吗? 我们如何处理 DSL 中的这种情况?
- 如果状态存储非常大(内存中或持久),它会如何影响 启动时间?如何使流处理等待以使存储完全初始化? 或者 Kafka Streams 是否会确保在处理任何传入事件之前完全初始化状态存储?
提前致谢!
【问题讨论】: