【Question Title】: KafkaUtils.createDirectStream returns an "error: type arguments" in Scala
【Posted】: 2017-03-30 13:06:33
【Question】:

I am trying to consume a Kafka topic from Spark using KafkaUtils.createDirectStream. I don't know whether this is a Scala issue or a KafkaUtils/Spark issue.

Here is my call to createDirectStream:

val messages = KafkaUtils.createDirectStream[String, String, KafkaAvroDecoder, KafkaAvroDecoder, EvtAct](
  ssc,
  kafkaParams,
  fromOffsets,
  messageHandler
)

When I compile the code (with Maven), I get this error:

 [ERROR] C:\...\reader\Main.scala:60: error: type arguments [String,String,io.confluent.kafka.serializers.KafkaAvroDecoder,io.confluent.kafka.serializers.KafkaAvroDecoder,fr.sihm.reader.EvtAct] conform to the bounds of none of the overloaded alternatives of
[INFO]  value createDirectStream: [K, V, KD <: kafka.serializer.Decoder[K], VD <: kafka.serializer.Decoder[V], R](jssc: org.apache.spark.streaming.api.java.JavaStreamingContext, keyClass: Class[K], valueClass: Class[V], keyDecoderClass: Class[KD], valueDecoderClass: Class[VD], recordClass: Class[R], kafkaParams: java.util.Map[String,String], fromOffsets: java.util.Map[kafka.common.TopicAndPartition,Long], messageHandler: org.apache.spark.api.java.function.Function[kafka.message.MessageAndMetadata[K,V],R])org.apache.spark.streaming.api.java.JavaInputDStream[R] <and> [K, V, KD <: kafka.serializer.Decoder[K], VD <: kafka.serializer.Decoder[V], R](ssc: org.apache.spark.streaming.StreamingContext, kafkaParams: Map[String,String], fromOffsets: Map[kafka.common.TopicAndPartition,Long], messageHandler: kafka.message.MessageAndMetadata[K,V] => R)(implicit evidence$14: scala.reflect.ClassTag[K], implicit evidence$15: scala.reflect.ClassTag[V], implicit evidence$16: scala.reflect.ClassTag[KD], implicit evidence$17: scala.reflect.ClassTag[VD], implicit evidence$18: scala.reflect.ClassTag[R])org.apache.spark.streaming.dstream.InputDStream[R]
[INFO]     val messages = KafkaUtils.createDirectStream[String, String, KafkaAvroDecoder, KafkaAvroDecoder, EvtAct](
[INFO]                               ^
[ERROR] one error found

createDirectStream has four declarations:

  def createDirectStream[
    K: ClassTag,
    V: ClassTag,
    KD <: Decoder[K]: ClassTag,
    VD <: Decoder[V]: ClassTag,
    R: ClassTag] (
      ssc: StreamingContext,
      kafkaParams: Map[String, String],
      fromOffsets: Map[TopicAndPartition, Long],
      messageHandler: MessageAndMetadata[K, V] => R
  ): InputDStream[R] = {
    val cleanedHandler = ssc.sc.clean(messageHandler)
    new DirectKafkaInputDStream[K, V, KD, VD, R](
      ssc, kafkaParams, fromOffsets, cleanedHandler)
  }

  def createDirectStream[
    K: ClassTag,
    V: ClassTag,
    KD <: Decoder[K]: ClassTag,
    VD <: Decoder[V]: ClassTag] (
      ssc: StreamingContext,
      kafkaParams: Map[String, String],
      topics: Set[String]
  ): InputDStream[(K, V)] = {
    val messageHandler = (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message)
    val kc = new KafkaCluster(kafkaParams)
    val fromOffsets = getFromOffsets(kc, kafkaParams, topics)
    new DirectKafkaInputDStream[K, V, KD, VD, (K, V)](
      ssc, kafkaParams, fromOffsets, messageHandler)
  }

  def createDirectStream[K, V, KD <: Decoder[K], VD <: Decoder[V], R](
      jssc: JavaStreamingContext,
      keyClass: Class[K],
      valueClass: Class[V],
      keyDecoderClass: Class[KD],
      valueDecoderClass: Class[VD],
      recordClass: Class[R],
      kafkaParams: JMap[String, String],
      fromOffsets: JMap[TopicAndPartition, JLong],
      messageHandler: JFunction[MessageAndMetadata[K, V], R]
    ): JavaInputDStream[R] = {
    implicit val keyCmt: ClassTag[K] = ClassTag(keyClass)
    implicit val valueCmt: ClassTag[V] = ClassTag(valueClass)
    implicit val keyDecoderCmt: ClassTag[KD] = ClassTag(keyDecoderClass)
    implicit val valueDecoderCmt: ClassTag[VD] = ClassTag(valueDecoderClass)
    implicit val recordCmt: ClassTag[R] = ClassTag(recordClass)
    val cleanedHandler = jssc.sparkContext.clean(messageHandler.call _)
    createDirectStream[K, V, KD, VD, R](
      jssc.ssc,
      Map(kafkaParams.asScala.toSeq: _*),
      Map(fromOffsets.asScala.mapValues(_.longValue()).toSeq: _*),
      cleanedHandler
    )
  }

  def createDirectStream[K, V, KD <: Decoder[K], VD <: Decoder[V]](
      jssc: JavaStreamingContext,
      keyClass: Class[K],
      valueClass: Class[V],
      keyDecoderClass: Class[KD],
      valueDecoderClass: Class[VD],
      kafkaParams: JMap[String, String],
      topics: JSet[String]
    ): JavaPairInputDStream[K, V] = {
    implicit val keyCmt: ClassTag[K] = ClassTag(keyClass)
    implicit val valueCmt: ClassTag[V] = ClassTag(valueClass)
    implicit val keyDecoderCmt: ClassTag[KD] = ClassTag(keyDecoderClass)
    implicit val valueDecoderCmt: ClassTag[VD] = ClassTag(valueDecoderClass)
    createDirectStream[K, V, KD, VD](
      jssc.ssc,
      Map(kafkaParams.asScala.toSeq: _*),
      Set(topics.asScala.toSeq: _*)
    )
  }
}

Note that when I replace the first two type arguments [String, String, ...] with [Object, Object, ...], it works fine.
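That observation is consistent with KafkaAvroDecoder's declaration: on the Confluent side it is a Java class implementing (roughly) Decoder&lt;Object&gt;, so the bound KD &lt;: Decoder[K] can only hold when K is Object. A minimal, dependency-free Scala sketch of the same bound (Decoder, AvroLikeDecoder, and create here are hypothetical stand-ins for the real Spark/Kafka types, not the actual API):

```scala
// Stand-ins (hypothetical, for illustration only) for kafka.serializer.Decoder[T]
// and io.confluent.kafka.serializers.KafkaAvroDecoder, which is declared in Java
// roughly as `class KafkaAvroDecoder implements Decoder<Object>`.
trait Decoder[T]
class AvroLikeDecoder extends Decoder[Object]

// Mirrors the bound on createDirectStream: KD <: Decoder[K]
def create[K, KD <: Decoder[K]](decoder: KD): String = "ok"

// create[String, AvroLikeDecoder](new AvroLikeDecoder)
//   does not compile: AvroLikeDecoder is a Decoder[Object], not a Decoder[String]

val result = create[Object, AvroLikeDecoder](new AvroLikeDecoder) // compiles
println(result)
```

This is the same mismatch the compiler reports above: with K = String, none of the overloads' bounds can be satisfied by a decoder of Object.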

Any ideas?

Thanks

【Question Comments】:

  • How is messageHandler defined?
  • messageHandler is defined as follows: val messageHandler: MessageAndMetadata[String, String] => EvtAct = mmd => new EvtAct(mmd.message)
  • Where does KafkaAvroDecoder come from?
  • KafkaAvroDecoder is a class provided by an artifact written by Confluent (fully qualified name = io.confluent.kafka.serializers.KafkaAvroDecoder). It is a Java class.
  • Which version of Kafka are you using?

Tags: scala apache-spark apache-kafka


【Solution 1】:

From the declaration of kafka.message.MessageAndMetadata:

package kafka.message

import kafka.serializer.Decoder
import kafka.utils.Utils

case class MessageAndMetadata[K, V](topic: String, partition: Int,
                                    private val rawMessage: Message, offset: Long,
                                    keyDecoder: Decoder[K], valueDecoder: Decoder[V]) {

  /**
   * Return the decoded message key and payload
   */
  def key(): K = if(rawMessage.key == null) null.asInstanceOf[K] else keyDecoder.fromBytes(Utils.readBytes(rawMessage.key))

  def message(): V = if(rawMessage.isNull) null.asInstanceOf[V] else valueDecoder.fromBytes(Utils.readBytes(rawMessage.payload))
}

The message type is determined by valueDecoder, whose fromBytes does the decoding, so I suggest you try Array[Byte] as the second type argument:

val messageHandler: MessageAndMetadata[String, Array[Byte]] => EvtAct = mmd => new EvtAct(mmd.message)

val messages = KafkaUtils.createDirectStream[String, Array[Byte], KafkaAvroDecoder, KafkaAvroDecoder, EvtAct](
  ssc,
  kafkaParams,
  fromOffsets,
  messageHandler
)

【Discussion】:

  • Thanks for your answer, but I get the same error with both MessageAndMetadata[String, Array[Byte]] and MessageAndMetadata[Object, Array[Byte]]
  • If you are using the KafkaAvroEncoder and decoder, then the encoder's input is Object and its output is Array[Byte].
  • OK. I am trying to consume a Kafka topic, so I just want to decode an Avro-encoded topic. How should I create my directStream? With these types it compiles: KafkaUtils.createDirectStream[Object, Object, KafkaAvroDecoder, KafkaAvroDecoder, EvtAct](...)
  • The problem is that in my EvtAct, the constructor takes an Object parameter (new EvtAct(mmd.message)). How do I extract fields from this Object? Should I cast it to GenericRecord?
  • Try using StringDecoder; it will return Strings from your Array[Byte]
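Putting the discussion together, a hedged sketch of what the final call might look like: Object type parameters to satisfy KafkaAvroDecoder's Decoder[Object] bound, and a cast of the decoded payload to Avro's GenericRecord inside the handler. EvtAct is the asker's class (its constructor takes an Object, per the discussion), and this assumes the spark-streaming-kafka and Confluent serializer dependencies are on the classpath; it is a sketch, not a verified implementation:

```scala
import io.confluent.kafka.serializers.KafkaAvroDecoder
import kafka.message.MessageAndMetadata
import org.apache.avro.generic.GenericRecord
import org.apache.spark.streaming.kafka.KafkaUtils

// KafkaAvroDecoder decodes to Object; for Avro-encoded records the runtime
// type is a GenericRecord, so the handler can cast and read fields from it.
val messageHandler: MessageAndMetadata[Object, Object] => EvtAct = mmd => {
  val record = mmd.message.asInstanceOf[GenericRecord] // cast the decoded Object
  new EvtAct(record) // EvtAct's Object-typed constructor accepts the record
}

val messages = KafkaUtils.createDirectStream[Object, Object, KafkaAvroDecoder, KafkaAvroDecoder, EvtAct](
  ssc,
  kafkaParams,
  fromOffsets,
  messageHandler
)
```

Fields would then be read from the GenericRecord via its get(name) accessor rather than from the raw Object.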