【问题标题】:what is the difference between stateful and stateless transformation in Kstreams?Kstreams中的有状态和无状态转换有什么区别?
【发布时间】:2021-07-31 19:32:14
【问题描述】:

我是 Kstreams 的初学者,我浏览了文档,但我似乎无法掌握这两者之间的区别,非常感谢一个简单的示例解释。

【问题讨论】:

    标签: apache-kafka apache-kafka-streams


    【解决方案1】:

    OneCricketer 说的是正确的。我发布这个答案只是为了用现在的例子来解释。简而言之,stateful 操作依赖于流的先前事件,而stateless 操作则不是。所以,以这个计数事件为例。

    StreamsBuilder builder = new StreamsBuilder();
    KStream<String, String> stream = builder.stream(INPUT_TOPIC);
    KTable<String, Count> aggregate = stream
          .peek((key, value) -> log.info("received key: {}, value: {}", key, value))
          .filter((key, value) -> /* filter events with value is ____ */)
          .groupByKey()
          .aggregate(new Initializer<Count>() {
                        @Override
                        public Count apply() {
                            return new Count("", 0);
                        }
                    }, new Aggregator<String, String, Count>() {
                        @Override
                        public Count apply(String k, String v, Count aggKeyCount) {
                            Integer currentCount = aggKeyCount.getCount();
                            return new Count(k, currentCount + 1);
                        }
                    });
                    
    aggregate.toStream()
             .map((k,v) -> new KeyValue<>(k, v.getCount()))
             .peek((key, value) -> log.info("emitting key: {}, value: {}", key, value))
             .to(COUNTS_TOPIC, Produced.with(Serdes.String(), Serdes.Integer()));
    

    groupByKeyaggregatefiltermap 操作正在以某种方式转换事件(peek 不会转换事件)。转换groupByKeyfiltermapstateless,因为它们可能会修改它们正在处理的当前事件(它们不关心以前的事件)。 aggregate 转换正在计算事件的数量,因此它正在总结它们。因此,这取决于以前的事件。并不是说它有一个 InitializerCount("", 0),它基于之前的计数器 aggKeyCountCount(k, currentCount + 1) 上逐一聚合事件。

    另外statelessstateful的概念在Kafka的KStream上是没有的。它适用于所有处理引擎,例如 Hadoop MapReduce、Apache SparkApache Flink、Apache Storm。它也存在于任何处理管道上,例如 Java Stream(例如:mapreduceflatmapfilter)、Akka Streamsproject reactor

    【讨论】:

    • 谢谢你的例子,它有助于更​​好地理解这个概念
    【解决方案2】:

    聚合和连接需要状态 - 通过拓扑进行的初始累加器或分组

    过滤、分支、映射或迭代流不需要状态 - 一条消息进来,零或一条消息出来

    值得指出的是 groupBy 函数被认为是无状态的

    【讨论】:

      猜你喜欢
      • 2018-08-11
      • 2018-02-06
      • 1970-01-01
      • 2016-12-11
      • 2019-08-03
      • 2016-04-03
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多