【问题标题】:Kafka Consumer for Spark written in Scala for Kafka API 0.10: custom AVRO deserializerKafka Consumer for Spark 用 Scala 为 Kafka API 0.10 编写:自定义 AVRO 反序列化器
【发布时间】:2017-12-14 05:29:47
【问题描述】:

我正在将我的 Spark Scala App Kafka API 升级到 v. 0.10。我曾经创建自定义方法来反序列化以字节字符串格式出现的消息。

我已经意识到有一种方法可以将 StringDeserializer 或 ByteArrayDeserializer 作为参数传递给键或值。

但是,我找不到有关如何创建自定义 Avro 模式反序列化器的任何信息,因此我的 kafkaStream 可以在我创建 DirectStream 并使用来自 Kafka 的数据时使用它。

有可能吗?

【问题讨论】:

    标签: scala apache-spark apache-kafka


    【解决方案1】:

    这是可能的。您需要覆盖org.apache.kafka.common.serialization 中定义的Deserializer<T> 接口,并且需要通过包含Kafka 参数的ConsumerStrategy[K, V] 类将key.deserializervalue.deserializer 指向您的自定义类。例如:

    import org.apache.kafka.common.serialization.Deserializer
    
    class AvroDeserializer extends Deserializer[Array[Byte]] {
      override def configure(map: util.Map[String, _], b: Boolean): Unit = ???
      override def close(): Unit = ???
      override def deserialize(s: String, bytes: Array[Byte]): Array[Byte] = ???
    }
    

    然后:

    import org.apache.kafka.clients.consumer.ConsumerRecord
    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.spark.streaming.kafka010._
    import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
    import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
    import my.location.with.AvroDeserializer
    
    val ssc: StreamingContext = ???
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "localhost:9092,anotherhost:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[AvroDeserializer],
      "group.id" -> "use_a_separate_group_id_for_each_stream",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )
    
    val topics = Array("sometopic")
    val stream = KafkaUtils.createDirectStream[String, MyTypeWithAvroDeserializer](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    )
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2015-08-01
      • 2014-07-08
      • 2019-11-18
      • 1970-01-01
      • 2021-10-20
      • 2018-02-19
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多