【发布时间】:2018-04-18 09:34:19
【问题描述】:
导入 spark.implicits._ val ds1 = 火花 .readStream .format(“卡夫卡”) .option("kafka.bootstrap.servers", "localhost:9092") .option("订阅", "测试") .option("startingOffsets", "latest") 。加载() .as[KafkaMessage] .select($"value".as[Array[Byte]]) .map(msg=>{
val byteArrayInputStream = new ByteArrayInputStream(msg)
val datumReader:DatumReader[GenericRecord] = new SpecificDatumReader[GenericRecord](messageSchema)
val dataFileReader:DataFileStream[GenericRecord] = new DataFileStream[GenericRecord](byteArrayInputStream, datumReader)
while(dataFileReader.hasNext) {
val userData1: GenericRecord = dataFileReader.next()
userData1.asInstanceOf[org.apache.avro.util.Utf8].toString
}
})
错误: 错误:(49, 9) 无法找到存储在数据集中的类型的编码器。通过导入 spark.implicits 支持原始类型(Int、String 等)和产品类型(案例类)。未来版本中将添加对序列化其他类型的支持。 .map(msg=>{
【问题讨论】:
标签: apache-kafka spark-streaming avro