【发布时间】:2020-06-03 03:53:18
【问题描述】:
我的 spark 版本是 1.6.2,我的 kafka 版本是 0.10.1.0。我想发送一个自定义对象作为 kafka 值类型,并尝试将此自定义对象推送到 kafka 主题中。并使用火花流来读取数据。我正在使用直接方法。以下是我的代码:
import com.xxxxx.kafka.{KafkaJsonDeserializer, KafkaObjectDecoder, pharmacyData}
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
object sparkReadKafka {
val sparkConf = new SparkConf().setAppName("SparkReadKafka")
val sc = new SparkContext(sparkConf)
val ssc = new StreamingContext(sc, Seconds(1))
def main(args: Array[String]): Unit = {
val kafkaParams = Map[String, Object] (
"bootstrap.servers" -> "kafka.kafka-cluster-shared.non-prod-5-az-scus.prod.us.xxxxx.net:9092",
//"key.deserializer" -> classOf[StringDeserializer],
//"value.deserializer" -> classOf[KafkaJsonDeserializer],
"group.id" -> "consumer-group-2",
"auto.offset.reset" -> "earliest",
"auto.commit.interval.ms" -> "1000",
"enable.auto.commit" -> (false: java.lang.Boolean),
"session.timeout.ms" -> "30000"
)
val topic = "hw_insights"
val stream = KafkaUtils.createDirectStream[String, pharmacyData, StringDecoder, KafkaObjectDecoder](ssc, kafkaParams, Set(topic))
}
}
我得到的错误与此类似(出于安全目的,我必须删除某些部分):
错误:(29, 47) 使用替代方法重载了方法值 createDirectStream: (jssc:org.apache.spark.streaming.api.java.JavaStreamingContext,keyClass:Class[String],valueClass:Class[com.xxxxxxx.kafka.pharmacyData],keyDecoderClass:Class[kafka.serializer.StringDecoder],valueDecoderClass: Class[com.xxxxxxx.kafka.KafkaObjectDecoder],kafkaParams: java.util.Map[String,String],topics: java.util.Set[String])org.apache.spark.streaming.api.java.JavaPairInputDStream[String] ,com.xxxxxxx.kafka.pharmacyData] (ssc:org.apache.spark.streaming.StreamingContext,kafkaParams:scala.collection.immutable.Map[String,String],topics:scala.collection.immutable.Set[String])(隐含证据$19:scala.reflect。 ClassTag[String],隐含证据$20:scala.reflect.ClassTag[com.xxxxxxx.kafka.pharmacyData],隐含证据$21:scala.reflect.ClassTag[kafka.serializer.StringDecoder],隐含证据$22:scala.reflect.ClassTag [com.xxxxxxx.kafka.KafkaObjectDecoder])org.apache.spark.streaming.dstream.InputDStream[(String, com.xxxxxxx.kafka.pharmacyData)] 不能应用于 (org.apache.spark.streaming.StreamingContext, scala.collection.immutable.Map[String,Object], scala.collection.immutable.Set[String]) val 流 = KafkaUtils.createDirectStream[String, pharmacyData, StringDecoder, KafkaObjectDecoder](ssc, kafkaParams, Set(topic)) 下面是我的客户解码器类:
import kafka.serializer.Decoder
import org.codehaus.jackson.map.ObjectMapper
class KafkaObjectDecoder extends Decoder[pharmacyData] {
override def fromBytes(bytes: Array[Byte]): pharmacyData = {
val mapper = new ObjectMapper()
val pdata = mapper.readValue(bytes, classOf[pharmacyData])
pdata
}
}
有人可以帮我解决问题吗?谢谢!
【问题讨论】:
-
1) 请显示完整的错误。看起来像一个编译问题 2) Kafka 已经带有 json 反序列化器。 3)但你也应该升级 Spark
-
嗨,我只是添加了完整的错误。你能告诉我如何添加kafka json反序列化器吗?如果有例子就更好了。而且升级火花不在我的控制之下。谢谢!
-
为什么它不在你的控制范围内?您可以更新您的 Maven 依赖项并将您自己的 Spark 分发 tarball 上传到包含较新版本的 HDFS
-
我需要将项目提交给 Spark 集群。 Spark 集群在 spark 1.6.2 上运行。
标签: scala apache-spark apache-kafka spark-streaming spark-streaming-kafka