【问题标题】:How to select Case Class Object as DataFrame in Kafka-Spark Structured Streaming如何在 Kafka-Spark 结构化流中选择案例类对象作为 DataFrame
【发布时间】:2020-11-08 05:40:19
【问题描述】:

我有一个案例类:

case class clickStream(userid:String, adId :String, timestamp:String)

我希望与 KafkaProducer 一起发送的实例:

val record = new ProducerRecord[String,clickStream](
  "clicktream",
  "data",
  clickStream(Random.shuffle(userIdList).head, Random.shuffle(adList).head, new Date().toString).toString
)
producer.send(record)

在 TOPIC 队列中完全按照预期将记录作为字符串发送:

clickStream(user5,ad2,Sat Jul 18 20:48:53 IST 2020)

但是,问题出在消费者端:

val clickStreamDF = spark.readStream
.format("kafka")
.options(kafkaMap)
.option("subscribe","clicktream")
.load()
 

clickStreamDF 
.select($"value".as("string"))
.as[clickStream]       //trying to leverage DataSet APIs conversion
.writeStream
.outputMode(OutputMode.Append())
.format("console")
.option("truncate","false")
.start()
.awaitTermination()

显然使用 .as[clickStream] API 不起作用,因为异常是:

Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve '`userid`' given input columns: [value];

这是 [value] 列包含的内容:

    Batch: 2
-------------------------------------------
+----------------------------------------------------+
|value                                               |
+----------------------------------------------------+
|clickStream(user3,ad11,Sat Jul 18 20:59:35 IST 2020)|
+----------------------------------------------------+

我尝试使用 Custom Serializer 作为 value.serializervalue.deserializer

但在我的目录结构中遇到 ClassNotFoundException 的不同问题。

我有 3 个问题:

Kafka如何在这里使用Custom Deserializer类来解析对象?

我不完全理解编码器的概念以及在这种情况下如何使用它?

使用 Kafka 发送/接收自定义案例类对象的最佳方法是什么?

【问题讨论】:

    标签: apache-spark apache-kafka kafka-consumer-api kafka-producer-api spark-structured-streaming


    【解决方案1】:

    当您将 clickStream 对象数据作为 string 传递给 kafka 时,spark 将读取相同的字符串,在 spark 中您必须从 clickStream(user3,ad11,Sat Jul 18 20:59:35 IST 2020) 解析和提取所需字段

    检查下面的代码。

    clickStreamDF 
    .select(split(regexp_extract($"value","\\(([^)]+)\\)",1),"\\,").as("value"))
    .select($"value"(0).as("userid"),$"value"(1).as("adId"),$"value"(2).as("timestamp"))
    .as[clickStream] # Extract all fields from the value string & then use .as[clickStream] option. I think this line is not required as data already parsed to required format. 
    .writeStream
    .outputMode(OutputMode.Append())
    .format("console")
    .option("truncate","false")
    .start()
    .awaitTermination()
    

    示例如何解析clickStream 字符串数据。

    scala> df.show(false)
    +---------------------------------------------------+
    |value                                              |
    +---------------------------------------------------+
    |clickStream(user5,ad2,Sat Jul 18 20:48:53 IST 2020)|
    +---------------------------------------------------+
    
    scala> df
    .select(split(regexp_extract($"value","\\(([^)]+)\\)",1),"\\,").as("value"))
    .select($"value"(0).as("userid"),$"value"(1).as("adId"),$"value"(2).as("timestamp"))
    .as[clickStream]
    .show(false)
    
    +------+----+----------------------------+
    |userid|adId|timestamp                   |
    +------+----+----------------------------+
    |user5 |ad2 |Sat Jul 18 20:48:53 IST 2020|
    +------+----+----------------------------+
    

    使用 Kafka 发送/接收自定义案例类对象的最佳方法是什么?

    尝试将您的案例类转换为 jsonavrocsv 然后将消息发送到 kafka 并使用 spark 阅读相同的消息。

    【讨论】:

      猜你喜欢
      • 2021-07-01
      • 1970-01-01
      • 2020-01-31
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2020-10-29
      相关资源
      最近更新 更多