【问题标题】:spark dataset encoder for kafka avro decoder message用于 kafka avro 解码器消息的 spark 数据集编码器
【发布时间】:2018-04-18 09:34:19
【问题描述】:

导入 spark.implicits._ val ds1 = 火花 .readStream .format(“卡夫卡”) .option("kafka.bootstrap.servers", "localhost:9092") .option("订阅", "测试") .option("startingOffsets", "latest") 。加载() .as[KafkaMessage] .select($"value".as[Array[Byte]]) .map(msg=>{

  val byteArrayInputStream = new ByteArrayInputStream(msg)
  val datumReader:DatumReader[GenericRecord] = new SpecificDatumReader[GenericRecord](messageSchema)
  val dataFileReader:DataFileStream[GenericRecord]  = new DataFileStream[GenericRecord](byteArrayInputStream, datumReader)

  while(dataFileReader.hasNext) {
    val userData1: GenericRecord = dataFileReader.next()

    userData1.asInstanceOf[org.apache.avro.util.Utf8].toString
  }
})

错误: 错误:(49, 9) 无法找到存储在数据集中的类型的编码器。通过导入 spark.implicits 支持原始类型(Int、String 等)和产品类型(案例类)。未来版本中将添加对序列化其他类型的支持。 .map(msg=>{

【问题讨论】:

    标签: apache-kafka spark-streaming avro


    【解决方案1】:

    每当您尝试在结构化流式传输中对数据集进行映射/转换时,都需要与适当的编码器相关联。

    Tor 原始数据类型,隐式编码器由 spark 提供:

    import spark.implicits._
    

    其他类型需要手动提供。

    所以在这里你可以使用隐式编码器:

    import scala.reflect.ClassTag
    
    implicit def kryoEncoder[A](implicit ct: ClassTag[A]) = org.apache.spark.sql.Encoders.kryo[A](ct)
    

    ...或者您可以定义与地图函数中正在处理的数据相关联的编码器。

    【讨论】:

      猜你喜欢
      • 2012-01-08
      • 1970-01-01
      • 1970-01-01
      • 2020-02-16
      • 1970-01-01
      • 2016-12-07
      • 2021-01-19
      • 2016-08-28
      • 1970-01-01
      相关资源
      最近更新 更多