【问题标题】:Avro format deserialization in Spark structured streamSpark结构化流中的Avro格式反序列化
【发布时间】:2018-07-20 08:33:56
【问题描述】:

我正在使用 Spark Structured Streaming,如上所述 this page.

我从 Kafka 主题收到正确的消息,但值为 Avro 格式。有什么方法可以反序列化 Avro 记录(类似于 KafkaAvroDeserializer 方法)?

【问题讨论】:

标签: apache-spark apache-kafka avro spark-structured-streaming


【解决方案1】:

火花 >= 2.4

您可以使用spark-avro 库中的from_avro 函数。

import org.apache.spark.sql.avro._

val schema: String = ???
df.withColumn("value", from_avro($"value", schema))

火花

  • 定义一个接受Array[Byte](序列化对象)的函数:

    import scala.reflect.runtime.universe.TypeTag
    
    def decode[T : TypeTag](bytes: Array[Byte]): T = ???
    

    这将反序列化 Avro 数据并创建可以存储在 Dataset 中的对象。

  • 根据函数创建udf

    val decodeUdf  = udf(decode _)
    
  • 拨打udfvalue

    val df = spark
      .readStream
      .format("kafka")
      ...
      .load()
    
    df.withColumn("value", decodeUdf($"value"))
    

【讨论】:

  • 你知道推荐我一些反序列化器,女巫对这个问题有好处吗?
  • 这些解决方案在使用 Confluent Schema Registry 时不起作用
  • from_avro 要求架构始终与实际记录架构匹配,并且不能采用进化架构。因此,此解决方案可能仅在使用模式注册表时适用。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2020-03-04
  • 2020-07-01
  • 2021-09-30
  • 2017-08-23
  • 2018-08-11
  • 2015-08-01
  • 2019-11-25
相关资源
最近更新 更多