【问题标题】:How to read json data using scala from kafka topic in apache spark如何在apache spark中使用来自kafka主题的scala读取json数据
【发布时间】:2016-05-27 06:28:20
【问题描述】:

我是新火花,请告诉我如何使用 scala 从 apache spark 中的 kafka 主题读取 json 数据。

谢谢。

【问题讨论】:

  • 你已经尝试过了吗?
  • 我尝试使用以下代码:case class MyClass(id:String, endtime:String, host:String, starttime:String, appservername:String, appname:String, classname:String, method:String , eventdate:String, executiontime:String, threadid:String)
  • implicit val myClassFormat = Json.format[MyClass] // val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap val lines = KafkaUtils.createStream (ssc, zkQuorum, group, topic) .map(m => Json.parse(m._2).as[MyClass])
  • 你有什么问题?为您的问题添加详细信息,以获得更好的答案。
  • @Bhaskar 如果您认为我的回答有效,请将其标记为已接受,否则请告诉我任何问题。

标签: scala apache-spark apache-kafka spark-streaming


【解决方案1】:

我使用Play Framework's library for Json。您可以将其作为standalone module 添加到您的项目中。用法如下:

import play.api.libs.json._
import org.apache.spark.streaming.kafka.KafkaUtils

case class MyClass(field1: String,
                   field2: Int)

implicit val myClassFormat = Json.format[MyClass]

val kafkaParams = Map[String, String](...here are your params...)    
KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Set("myTopicName"))
  .map(m => Json.parse(m._2).as[MyClass])

【讨论】:

    【解决方案2】:

    最简单的方法是利用 Spark 附带的 DataFrame 抽象。

    val sqlContext = new SQLContext(sc)
    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
                      ssc, kafkaParams, Set("myTopicName"))
    
    stream.foreachRDD(
      rdd => {
         val dataFrame = sqlContext.read.json(rdd.map(_._2)) //converts json to DF
         //do your operations on this DF. You won't even require a model class.
            })
    

    【讨论】:

      猜你喜欢
      • 2021-06-11
      • 2023-03-19
      • 2018-01-09
      • 2021-07-03
      • 2020-09-18
      • 2021-12-21
      • 2019-04-23
      • 2018-10-06
      • 2017-03-28
      相关资源
      最近更新 更多