【问题标题】:overloaded method value createDirectStream with alternatives具有替代方法的重载方法值 createDirectStream
【发布时间】:2020-06-03 03:53:18
【问题描述】:

我的 spark 版本是 1.6.2,我的 kafka 版本是 0.10.1.0。我想发送一个自定义对象作为 kafka 值类型,并尝试将此自定义对象推送到 kafka 主题中。并使用火花流来读取数据。我正在使用直接方法。以下是我的代码:

import com.xxxxx.kafka.{KafkaJsonDeserializer, KafkaObjectDecoder, pharmacyData}
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object sparkReadKafka {
  val sparkConf = new SparkConf().setAppName("SparkReadKafka")
  val sc = new SparkContext(sparkConf)
  val ssc = new StreamingContext(sc, Seconds(1))

  def main(args: Array[String]): Unit = {
    val kafkaParams = Map[String, Object] (
      "bootstrap.servers" -> "kafka.kafka-cluster-shared.non-prod-5-az-scus.prod.us.xxxxx.net:9092",
      //"key.deserializer" -> classOf[StringDeserializer],
      //"value.deserializer" -> classOf[KafkaJsonDeserializer],
      "group.id" -> "consumer-group-2",
      "auto.offset.reset" -> "earliest",
      "auto.commit.interval.ms" -> "1000",
      "enable.auto.commit" -> (false: java.lang.Boolean),
      "session.timeout.ms" -> "30000"
    )

    val topic = "hw_insights"

    val stream = KafkaUtils.createDirectStream[String, pharmacyData, StringDecoder, KafkaObjectDecoder](ssc, kafkaParams, Set(topic))
  }
}

我得到的错误与此类似(出于安全目的,我必须删除某些部分):

错误:(29, 47) 使用替代方法重载了方法值 createDirectStream: (jssc:org.apache.spark.streaming.api.java.JavaStreamingContext,keyClass:Class[String],valueClass:Class[com.xxxxxxx.kafka.pharmacyData],keyDecoderClass:Class[kafka.serializer.StringDecoder],valueDecoderClass: Class[com.xxxxxxx.kafka.KafkaObjectDecoder],kafkaParams: java.util.Map[String,String],topics: java.util.Set[String])org.apache.spark.streaming.api.java.JavaPairInputDStream[String] ,com.xxxxxxx.kafka.pharmacyData] (ssc:org.apache.spark.streaming.StreamingContext,kafkaParams:scala.collection.immutable.Map[String,String],topics:scala.collection.immutable.Set[String])(隐含证据$19:scala.reflect。 ClassTag[String],隐含证据$20:scala.reflect.ClassTag[com.xxxxxxx.kafka.pharmacyData],隐含证据$21:scala.reflect.ClassTag[kafka.serializer.StringDecoder],隐含证据$22:scala.reflect.ClassTag [com.xxxxxxx.kafka.KafkaObjectDecoder])org.apache.spark.streaming.dstream.InputDStream[(String, com.xxxxxxx.kafka.pharmacyData)] 不能应用于 (org.apache.spark.streaming.StreamingContext, scala.collection.immutable.Map[String,Object], scala.collection.immutable.Set[String]) val 流 = KafkaUtils.createDirectStream[String, pharmacyData, StringDecoder, KafkaObjectDecoder](ssc, kafkaParams, Set(topic)) 下面是我的客户解码器类:

import kafka.serializer.Decoder
import org.codehaus.jackson.map.ObjectMapper

class KafkaObjectDecoder extends Decoder[pharmacyData] {
  override def fromBytes(bytes: Array[Byte]): pharmacyData = {
    val mapper = new ObjectMapper()
    val pdata = mapper.readValue(bytes, classOf[pharmacyData])
    pdata
  }
}

有人可以帮我解决问题吗?谢谢!

【问题讨论】:

  • 1) 请显示完整的错误。看起来像一个编译问题 2) Kafka 已经带有 json 反序列化器。 3)但你也应该升级 Spark
  • 嗨,我只是添加了完整的错误。你能告诉我如何添加kafka json反序列化器吗?如果有例子就更好了。而且升级火花不在我的控制之下。谢谢!
  • 为什么它不在你的控制范围内?您可以更新您的 Maven 依赖项并将您自己的 Spark 分发 tarball 上传到包含较新版本的 HDFS
  • 我需要将项目提交给 Spark 集群。 Spark 集群在 spark 1.6.2 上运行。

标签: scala apache-spark apache-kafka spark-streaming spark-streaming-kafka


【解决方案1】:

错误是说你的参数不正确

不能应用于(org.apache.spark.streaming.StreamingContext, scala.collection.immutable.Map[String,Object], scala.collection.immutable.Set[String])

它认为你想要的closest method

(jssc: org.apache.spark.streaming.api.java.JavaStreamingContext,keyClass: Class[String],valueClass: Class[com.xxxxxxx.kafka.pharmacyData],keyDecoderClass: Class[kafka.serializer.StringDecoder],valueDecoderClass: Class[com.xxxxxxx.kafka.KafkaObjectDecoder],kafkaParams: java.util.Map[String,String],topics: java.util.Set[String])

【讨论】:

  • 对不起,我不明白,我做的哪一部分不正确。你可以说得更详细点吗。我真的是火花流和卡夫卡的新手。非常感谢!
  • 这是一个 Scala 问题,不是 Kafka 或 Streaming。您从哪里复制此代码?你是用IDE来写的吗?错误告诉你问题存在于第 29 行
  • 是的,我正在使用 IntelliJ IDEA
  • 并且createDirectStream方法上没有显示错误?
  • 不,错误在第 29 行,这正是我使用 createDirectStream 方法的那一行。因此,我认为错误是因为我没有正确使用此方法。
猜你喜欢
  • 2014-04-19
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2018-02-17
  • 1970-01-01
  • 1970-01-01
  • 2018-05-17
  • 1970-01-01
相关资源
最近更新 更多