【发布时间】:2020-11-08 05:40:19
【问题描述】:
我有一个案例类:
case class clickStream(userid:String, adId :String, timestamp:String)
我希望与 KafkaProducer 一起发送的实例:
val record = new ProducerRecord[String,clickStream](
"clicktream",
"data",
clickStream(Random.shuffle(userIdList).head, Random.shuffle(adList).head, new Date().toString).toString
)
producer.send(record)
在 TOPIC 队列中完全按照预期将记录作为字符串发送:
clickStream(user5,ad2,Sat Jul 18 20:48:53 IST 2020)
但是,问题出在消费者端:
val clickStreamDF = spark.readStream
.format("kafka")
.options(kafkaMap)
.option("subscribe","clicktream")
.load()
clickStreamDF
.select($"value".as("string"))
.as[clickStream] //trying to leverage DataSet APIs conversion
.writeStream
.outputMode(OutputMode.Append())
.format("console")
.option("truncate","false")
.start()
.awaitTermination()
显然使用 .as[clickStream] API 不起作用,因为异常是:
Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve '`userid`' given input columns: [value];
这是 [value] 列包含的内容:
Batch: 2
-------------------------------------------
+----------------------------------------------------+
|value |
+----------------------------------------------------+
|clickStream(user3,ad11,Sat Jul 18 20:59:35 IST 2020)|
+----------------------------------------------------+
我尝试使用 Custom Serializer 作为 value.serializer 和 value.deserializer
但在我的目录结构中遇到 ClassNotFoundException 的不同问题。
我有 3 个问题:
Kafka如何在这里使用Custom Deserializer类来解析对象?
我不完全理解编码器的概念以及在这种情况下如何使用它?
使用 Kafka 发送/接收自定义案例类对象的最佳方法是什么?
【问题讨论】:
标签: apache-spark apache-kafka kafka-consumer-api kafka-producer-api spark-structured-streaming