【发布时间】:2017-03-30 13:06:33
【问题描述】:
我正在尝试在 Spark 中使用 KafkaUtils.createDirectStream 消费 Kafka 主题。我不确定这是 Scala 的问题还是 KafkaUtils/Spark 的问题。
这是我对 createDirectStream 的调用:
// Explicit type arguments are [K, V, KD, VD, R]. Every createDirectStream overload requires
// KD <: Decoder[K] and VD <: Decoder[V], so with K = V = String, KafkaAvroDecoder must be a
// Decoder[String]. messageHandler is declared as MessageAndMetadata[String, String] => EvtAct,
// which is what fixes K = V = String here.
// NOTE(review): the compiler rejects these bounds, while [Object, Object, ...] compiles —
// verify which Decoder[T] KafkaAvroDecoder actually implements (it looks like Decoder[Object]).
val messages = KafkaUtils.createDirectStream[String, String, KafkaAvroDecoder, KafkaAvroDecoder, EvtAct](
ssc,
kafkaParams,
fromOffsets,
messageHandler
)
当我(通过 Maven)编译代码时,出现以下错误:
[ERROR] C:\...\reader\Main.scala:60: error: type arguments [String,String,io.confluent.kafka.serializers.KafkaAvroDecoder,io.confluent.kafka.serializers.KafkaAvroDecoder,fr.sihm.reader.EvtAct] conform to the bounds of none of the overloaded alternatives of
[INFO] value createDirectStream: [K, V, KD <: kafka.serializer.Decoder[K], VD <: kafka.serializer.Decoder[V], R](jssc: org.apache.spark.streaming.api.java.JavaStreamingContext, keyClass: Class[K], valueClass: Class[V], keyDecoderClass: Class[KD], valueDecoderClass: Class[VD], recordClass: Class[R], kafkaParams: java.util.Map[String,String], fromOffsets: java.util.Map[kafka.common.TopicAndPartition,Long], messageHandler: org.apache.spark.api.java.function.Function[kafka.message.MessageAndMetadata[K,V],R])org.apache.spark.streaming.api.java.JavaInputDStream[R] <and> [K, V, KD <: kafka.serializer.Decoder[K], VD <: kafka.serializer.Decoder[V], R](ssc: org.apache.spark.streaming.StreamingContext, kafkaParams: Map[String,String], fromOffsets: Map[kafka.common.TopicAndPartition,Long], messageHandler: kafka.message.MessageAndMetadata[K,V] => R)(implicit evidence$14: scala.reflect.ClassTag[K], implicit evidence$15: scala.reflect.ClassTag[V], implicit evidence$16: scala.reflect.ClassTag[KD], implicit evidence$17: scala.reflect.ClassTag[VD], implicit evidence$18: scala.reflect.ClassTag[R])org.apache.spark.streaming.dstream.InputDStream[R]
[INFO] val messages = KafkaUtils.createDirectStream[String, String, KafkaAvroDecoder, KafkaAvroDecoder, EvtAct](
[INFO] ^
[ERROR] one error found
createDirectStream 有 4 个重载声明:
/**
 * Creates a DirectKafkaInputDStream whose elements are produced by applying
 * `messageHandler` to each MessageAndMetadata[K, V].
 *
 * @param ssc            streaming context the new stream belongs to
 * @param kafkaParams    Kafka settings forwarded unchanged to the stream
 * @param fromOffsets    per-TopicAndPartition offsets forwarded to the stream
 *                       (presumably the positions reading starts from)
 * @param messageHandler maps each MessageAndMetadata[K, V] to an element of type R
 * @return the resulting InputDStream[R]
 */
def createDirectStream[
  K: ClassTag,
  V: ClassTag,
  KD <: Decoder[K]: ClassTag,
  VD <: Decoder[V]: ClassTag,
  R: ClassTag] (
  ssc: StreamingContext,
  kafkaParams: Map[String, String],
  fromOffsets: Map[TopicAndPartition, Long],
  messageHandler: MessageAndMetadata[K, V] => R
): InputDStream[R] =
  // The handler goes through ssc.sc.clean before the stream receives it
  // (Spark's closure cleaning, ahead of serialization to tasks).
  new DirectKafkaInputDStream[K, V, KD, VD, R](
    ssc,
    kafkaParams,
    fromOffsets,
    ssc.sc.clean(messageHandler))
/**
 * Creates a DirectKafkaInputDStream of (key, message) pairs for the given topics.
 *
 * The offsets handed to the stream come from getFromOffsets, queried through a
 * KafkaCluster built from `kafkaParams`. Each record is reduced to the tuple
 * (key, message).
 *
 * @param ssc         streaming context the new stream belongs to
 * @param kafkaParams Kafka settings, used both for the KafkaCluster and the stream
 * @param topics      topics whose offsets are looked up
 * @return an InputDStream of (K, V) tuples
 */
def createDirectStream[
  K: ClassTag,
  V: ClassTag,
  KD <: Decoder[K]: ClassTag,
  VD <: Decoder[V]: ClassTag] (
  ssc: StreamingContext,
  kafkaParams: Map[String, String],
  topics: Set[String]
): InputDStream[(K, V)] = {
  val cluster = new KafkaCluster(kafkaParams)
  val startOffsets = getFromOffsets(cluster, kafkaParams, topics)
  val toKeyAndMessage = (meta: MessageAndMetadata[K, V]) => (meta.key, meta.message)
  new DirectKafkaInputDStream[K, V, KD, VD, (K, V)](
    ssc, kafkaParams, startOffsets, toKeyAndMessage)
}
/**
 * Java-facing counterpart of the (offsets, messageHandler) overload.
 *
 * The Class arguments become implicit ClassTags, which satisfy the context bounds of the
 * Scala overload. The Java maps are copied into immutable Scala maps (JLong offsets are
 * unboxed to Long), and the Java function is adapted via its `call` method.
 *
 * @return the Scala overload's InputDStream[R]. Returning it as JavaInputDStream[R] relies on
 *         an implicit conversion that is not visible here (presumably
 *         JavaInputDStream.fromInputDStream, which needs the ClassTag[R] defined below).
 */
def createDirectStream[K, V, KD <: Decoder[K], VD <: Decoder[V], R](
  jssc: JavaStreamingContext,
  keyClass: Class[K],
  valueClass: Class[V],
  keyDecoderClass: Class[KD],
  valueDecoderClass: Class[VD],
  recordClass: Class[R],
  kafkaParams: JMap[String, String],
  fromOffsets: JMap[TopicAndPartition, JLong],
  messageHandler: JFunction[MessageAndMetadata[K, V], R]
): JavaInputDStream[R] = {
  // Evidence demanded by the ClassTag context bounds of the Scala overload.
  implicit val kTag: ClassTag[K] = ClassTag(keyClass)
  implicit val vTag: ClassTag[V] = ClassTag(valueClass)
  implicit val kdTag: ClassTag[KD] = ClassTag(keyDecoderClass)
  implicit val vdTag: ClassTag[VD] = ClassTag(valueDecoderClass)
  implicit val rTag: ClassTag[R] = ClassTag(recordClass)

  val handler = jssc.sparkContext.clean(messageHandler.call _)
  val scalaParams = kafkaParams.asScala.toMap
  val scalaOffsets =
    fromOffsets.asScala.map { case (tp, offset) => tp -> offset.longValue() }.toMap

  createDirectStream[K, V, KD, VD, R](jssc.ssc, scalaParams, scalaOffsets, handler)
}
/**
 * Java-facing counterpart of the topic-set overload, yielding (key, message) pairs.
 *
 * The Class arguments become implicit ClassTags for the Scala overload's context bounds.
 * The Java map and set are copied into immutable Scala collections.
 *
 * @return the Scala overload's InputDStream[(K, V)]. Returning it as
 *         JavaPairInputDStream[K, V] relies on an implicit conversion not visible here
 *         (presumably JavaPairInputDStream.fromInputDStream, which needs the K/V ClassTags below).
 */
def createDirectStream[K, V, KD <: Decoder[K], VD <: Decoder[V]](
  jssc: JavaStreamingContext,
  keyClass: Class[K],
  valueClass: Class[V],
  keyDecoderClass: Class[KD],
  valueDecoderClass: Class[VD],
  kafkaParams: JMap[String, String],
  topics: JSet[String]
): JavaPairInputDStream[K, V] = {
  implicit val kTag: ClassTag[K] = ClassTag(keyClass)
  implicit val vTag: ClassTag[V] = ClassTag(valueClass)
  implicit val kdTag: ClassTag[KD] = ClassTag(keyDecoderClass)
  implicit val vdTag: ClassTag[VD] = ClassTag(valueDecoderClass)

  val scalaParams = kafkaParams.asScala.toMap
  val scalaTopics = topics.asScala.toSet

  createDirectStream[K, V, KD, VD](jssc.ssc, scalaParams, scalaTopics)
}
}
请注意,当我把前两个类型参数 [String, String, ...] 替换为 [Object, Object, ...] 时,代码可以正常编译。
有什么想法吗?
谢谢
【问题讨论】:
-
messageHandler是如何定义的? -
messageHandler定义如下:val messageHandler: MessageAndMetadata[String, String] => EvtAct = mmd => new EvtAct(mmd.message) -
KafkaAvroDecoder来自哪里? -
KafkaAvroDecoder 是由 Confluent 编写、通过 Maven 构件提供的类(全限定类名为 io.confluent.kafka.serializers.KafkaAvroDecoder)。这是一个 Java 类。 -
你使用的 Kafka 是什么版本?
标签: scala apache-spark apache-kafka