【发布时间】:2021-07-31 19:32:14
【问题描述】:
我是 Kstreams 的初学者,我浏览了文档,但我似乎无法掌握这两者之间的区别,非常感谢一个简单的示例解释。
【问题讨论】:
标签: apache-kafka apache-kafka-streams
我是 Kstreams 的初学者,我浏览了文档,但我似乎无法掌握这两者之间的区别,非常感谢一个简单的示例解释。
【问题讨论】:
标签: apache-kafka apache-kafka-streams
OneCricketer 说的是正确的。我发布这个答案只是为了用现在的例子来解释。简而言之,stateful 操作依赖于流的先前事件,而stateless 操作则不是。所以,以这个计数事件为例。
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream = builder.stream(INPUT_TOPIC);
KTable<String, Count> aggregate = stream
.peek((key, value) -> log.info("received key: {}, value: {}", key, value))
.filter((key, value) -> /* filter events with value is ____ */)
.groupByKey()
.aggregate(new Initializer<Count>() {
@Override
public Count apply() {
return new Count("", 0);
}
}, new Aggregator<String, String, Count>() {
@Override
public Count apply(String k, String v, Count aggKeyCount) {
Integer currentCount = aggKeyCount.getCount();
return new Count(k, currentCount + 1);
}
});
aggregate.toStream()
.map((k,v) -> new KeyValue<>(k, v.getCount()))
.peek((key, value) -> log.info("emitting key: {}, value: {}", key, value))
.to(COUNTS_TOPIC, Produced.with(Serdes.String(), Serdes.Integer()));
groupByKey、aggregate、filter、map 操作正在以某种方式转换事件(peek 不会转换事件)。转换groupByKey、filter 和map 是stateless,因为它们可能会修改它们正在处理的当前事件(它们不关心以前的事件)。 aggregate 转换正在计算事件的数量,因此它正在总结它们。因此,这取决于以前的事件。并不是说它有一个 Initializer 或 Count("", 0),它基于之前的计数器 aggKeyCount 在 Count(k, currentCount + 1) 上逐一聚合事件。
另外stateless和stateful的概念在Kafka的KStream上是没有的。它适用于所有处理引擎,例如 Hadoop MapReduce、Apache Spark、Apache Flink、Apache Storm。它也存在于任何处理管道上,例如 Java Stream(例如:map、reduce、flatmap、filter)、Akka Streams、project reactor。
【讨论】:
聚合和连接需要状态 - 通过拓扑进行的初始累加器或分组
过滤、分支、映射或迭代流不需要状态 - 一条消息进来,零或一条消息出来
值得指出的是 groupBy 函数被认为是无状态的
【讨论】: