【Question title】: Kafka Streams: SessionWindowedSerde vs TimeWindowedSerde. Ambiguous implicit values
【Posted】: 2020-11-18 02:21:46
【Question】:

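I keep getting an "ambiguous implicit values" message for the following code. I have tried several things (as you can see from the lines I commented out). Any ideas on how to fix this? This is in Scala.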

  def createTopology(conf: Config, properties: Properties): Topology = {
//    implicit val sessionSerde = Serde[WindowedSerdes.SessionWindowedSerde[String]]
//    implicit val produced: Produced[Windowed[String], Long] = Produced.`with`[WindowedSerdes.SessionWindowedSerde[String], Long]
    implicit val produced: Produced[Windowed[String], Long] = Produced.`with`[Windowed[String], Long]
    implicit val consumed: Consumed[String, String] = Consumed.`with`[String, String]

    val builder: StreamsBuilder = new StreamsBuilder()
    builder.stream("streams-plaintext-input")
        .groupBy((_, word) => word)
        .windowedBy(SessionWindows.`with`(Duration.ofMillis(60 * 1000)))
        .count()
        .toStream.to("streams-pipe-output")

    builder.build()

  }

Compiler errors:

Error:(52, 78) ambiguous implicit values:
 both method timeWindowedSerde in object Serdes of type [T](implicit tSerde: org.apache.kafka.common.serialization.Serde[T])org.apache.kafka.streams.kstream.WindowedSerdes.TimeWindowedSerde[T]
 and method sessionWindowedSerde in object Serdes of type [T](implicit tSerde: org.apache.kafka.common.serialization.Serde[T])org.apache.kafka.streams.kstream.WindowedSerdes.SessionWindowedSerde[T]
 match expected type org.apache.kafka.common.serialization.Serde[org.apache.kafka.streams.kstream.Windowed[String]]
    implicit val produced: Produced[Windowed[String], Long] = Produced.`with`[Windowed[String], Long]

Error:(52, 78) could not find implicit value for parameter keySerde: org.apache.kafka.common.serialization.Serde[org.apache.kafka.streams.kstream.Windowed[String]]
    implicit val produced: Produced[Windowed[String], Long] = Produced.`with`[Windowed[String], Long]

Error:(52, 78) not enough arguments for method with: (implicit keySerde: org.apache.kafka.common.serialization.Serde[org.apache.kafka.streams.kstream.Windowed[String]], implicit valueSerde: org.apache.kafka.common.serialization.Serde[Long])org.apache.kafka.streams.kstream.Produced[org.apache.kafka.streams.kstream.Windowed[String],Long].
Unspecified value parameters keySerde, valueSerde.
    implicit val produced: Produced[Windowed[String], Long] = Produced.`with`[Windowed[String], Long]

【Comments】:

  • Can you add the compiler errors to the question?
  • Added the compiler errors as requested.

Tags: scala apache-kafka-streams confluent-platform


【Solution 1】:

I just brought a few implicits into scope by adding imports, and it compiles:

import org.apache.kafka.common.serialization.Serde
import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.kstream.{SessionWindows, Windowed}
import org.apache.kafka.streams.scala.StreamsBuilder
import org.apache.kafka.streams.scala.kstream.{Consumed, Produced}

import java.time.Duration
import java.util.Properties

import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.Serdes
import org.apache.kafka.streams.scala.Serdes.{Long, String}

def createTopology(conf: Config, properties: Properties): Topology = {
  // Two implicits are candidates here; I picked sessionWindowedSerde because your code uses session windows.
  // implicit val timeWindowedSerde: Serde[Windowed[String]] = Serdes.timeWindowedSerde[String]
  implicit val sessionSerde: Serde[Windowed[String]] = Serdes.sessionWindowedSerde[String]
  implicit val produced: Produced[Windowed[String], Long] = Produced.`with`[Windowed[String], Long]
  implicit val consumed: Consumed[String, String] = Consumed.`with`[String, String]

  val builder: StreamsBuilder = new StreamsBuilder()
  builder.stream("streams-plaintext-input")
    .groupBy((_, word) => word)
    .windowedBy(SessionWindows.`with`(Duration.ofMillis(60 * 1000)))
    .count()
    .toStream.to("streams-pipe-output")

  builder.build()
}

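If you see the error:

ambiguous implicit values

it means that more than one implicit in scope satisfies the required type. For example, the object org.apache.kafka.streams.scala.Serdes defines these two implicits: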

implicit def timeWindowedSerde[T](implicit tSerde: Serde[T]): WindowedSerdes.TimeWindowedSerde[T] =
    new WindowedSerdes.TimeWindowedSerde[T](tSerde)

implicit def sessionWindowedSerde[T](implicit tSerde: Serde[T]): WindowedSerdes.SessionWindowedSerde[T] =
    new WindowedSerdes.SessionWindowedSerde[T](tSerde)

TimeWindowedSerde extends Serdes.WrapperSerde<Windowed<T>>:

static public class TimeWindowedSerde<T> extends Serdes.WrapperSerde<Windowed<T>>

SessionWindowedSerde extends Serdes.WrapperSerde<Windowed<T>>:

static public class SessionWindowedSerde<T> extends Serdes.WrapperSerde<Windowed<T>>

Both extend the same type, Serdes.WrapperSerde<Windowed<T>>, and in the line:

implicit val produced: Produced[Windowed[String], Long] = Produced.`with`[Windowed[String], Long]

given the signature of the with function:

def `with`[K, V](implicit keySerde: Serde[K], valueSerde: Serde[V]): ProducedJ[K, V] =
    ProducedJ.`with`(keySerde, valueSerde)

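an implicit value of type Serde[Windowed[String]] is required, and the compiler cannot choose between the two because both of them are a Serde[Windowed[String]].

So if you simply add both of them to the same scope: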

implicit val timeWindowedSerde: Serde[Windowed[String]] = Serdes.timeWindowedSerde[String]
implicit val sessionSerde: Serde[Windowed[String]] = Serdes.sessionWindowedSerde[String]

you will see

ambiguous implicit values

again.
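If you would rather keep a wildcard import, another way out is to skip the implicit search for that one argument and pass the serde by hand. Below is a minimal sketch of that idea using a stand-in model rather than the real Kafka classes: Serde, Windowed, ModelSerdes and keySerdeName are hypothetical names made up for the illustration, and keySerdeName only plays the role of a method with an implicit keySerde parameter.

```scala
object ExplicitSerdeDemo {
  // Hypothetical stand-ins for the Kafka types; not the real API.
  trait Serde[A] { def name: String }
  final case class Windowed[A](key: A)
  class TimeWindowedSerde[A] extends Serde[Windowed[A]] { val name = "time" }
  class SessionWindowedSerde[A] extends Serde[Windowed[A]] { val name = "session" }

  object ModelSerdes {
    implicit val stringSerde: Serde[String] =
      new Serde[String] { val name = "string" }
    // Two implicit defs whose results both conform to Serde[Windowed[A]].
    implicit def timeWindowedSerde[A](implicit inner: Serde[A]): TimeWindowedSerde[A] =
      new TimeWindowedSerde[A]
    implicit def sessionWindowedSerde[A](implicit inner: Serde[A]): SessionWindowedSerde[A] =
      new SessionWindowedSerde[A]
  }

  // Wildcard import: both windowed serdes are now implicit candidates.
  import ModelSerdes._

  // Plays the role of a method that takes the key serde implicitly.
  def keySerdeName[K](implicit serde: Serde[K]): String = serde.name

  // keySerdeName[Windowed[String]] alone would be rejected as ambiguous here.
  // Supplying the argument explicitly bypasses the search for that parameter;
  // the inner Serde[String] is still resolved implicitly and unambiguously.
  val chosen: String = keySerdeName[Windowed[String]](sessionWindowedSerde[String])
}
```

In the real code the same idea would mean handing the chosen windowed serde to Produced.`with` as an ordinary argument instead of relying on implicit resolution.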

Conclusion: be careful when importing many implicits; the best practice is to import only the implicits you need.
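To make that advice concrete, here is a small self-contained sketch of the selective-import pattern. It again uses a stand-in model instead of the real Kafka classes (Serde, Windowed, ModelSerdes and keySerdeName are hypothetical names invented for the example), so treat it as an illustration of the scoping behaviour, not as the library's API.

```scala
object SelectiveImportDemo {
  // Hypothetical stand-ins for the Kafka types; not the real API.
  trait Serde[A] { def name: String }
  final case class Windowed[A](key: A)
  class TimeWindowedSerde[A] extends Serde[Windowed[A]] { val name = "time" }
  class SessionWindowedSerde[A] extends Serde[Windowed[A]] { val name = "session" }

  object ModelSerdes {
    implicit val stringSerde: Serde[String] =
      new Serde[String] { val name = "string" }
    implicit def timeWindowedSerde[A](implicit inner: Serde[A]): TimeWindowedSerde[A] =
      new TimeWindowedSerde[A]
    implicit def sessionWindowedSerde[A](implicit inner: Serde[A]): SessionWindowedSerde[A] =
      new SessionWindowedSerde[A]
  }

  // Import only the implicit that is actually needed, not ModelSerdes._,
  // so neither windowed serde enters the implicit scope by itself.
  import ModelSerdes.stringSerde

  // Exactly one Serde[Windowed[String]] is declared implicit in this scope.
  implicit val sessionSerde: Serde[Windowed[String]] =
    ModelSerdes.sessionWindowedSerde[String]

  // Plays the role of a method that takes the key serde implicitly.
  def keySerdeName[K](implicit serde: Serde[K]): String = serde.name

  // Resolves to sessionSerde, the only matching candidate.
  val chosen: String = keySerdeName[Windowed[String]]
}
```

This mirrors the imports in the answer's code, where only Serdes.Long and Serdes.String are brought in by name and the windowed serde is declared once as a local implicit val.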
