【问题标题】:How to convert the Java api KStream to Scala api KStream?如何将 Java api KStream 转换为 Scala api KStream?
【发布时间】:2019-09-27 07:46:41
【问题描述】:

我们的库有使用典型 kafka 流的数据处理器(Scala 代码):

import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.kstream.KStream

迁移到kafka-stream-scala 的原因是,每当我使用 groupBy、selectByKey 等函数时,返回类型都很奇怪。

(builder: StreamsBuilder, stream: KStream[String, Event]) =>
 val wgCreatedStream = stream.groupBy((_,v) => 
  v.payload match {
   case x:WorkgroupCreated => x.id //this is a String
  })

以这段代码为例。 StreamsBuilder 结果到KGroupedStream[Nothing,Event]

当我使用这些导入时:

import org.apache.kafka.streams.scala.{ByteArrayWindowStore, StreamsBuilder}
import org.apache.kafka.streams.scala.kstream.{Grouped, KStream, Materialized}
import org.apache.kafka.streams.scala.Serdes.{String,Long}

返回类型终于改成KGroupedStream[String,Event]

我真正希望的是: 在不重构我们的数据处理器的情况下使用kafka-stream-scala

如果是,那太棒了,尤其是。如果有例子!
如果不...这将是一个痛苦的旅程。 :((不过还是谢谢)

【问题讨论】:

    标签: scala apache-kafka apache-kafka-streams


    【解决方案1】:

    好的,和我的同事聊了聊,我立刻找到了答案。 -___-

    来自org.apache.kafka.streams.scala.ImplicitConversions,
    有一个 wrapKStream() 可以将
    streams.kstream.KStream 转换为 streams.scala.kstream.KStream

    反之,直接调用inner方法即可。 KTable 也是如此。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2018-02-23
      • 1970-01-01
      • 2018-08-30
      • 2019-09-26
      • 1970-01-01
      • 2017-06-02
      • 2019-10-26
      相关资源
      最近更新 更多