【问题标题】:Kafka topics to Spark Streaming DStream, how to get a JsonKafka 主题到 Spark Streaming DStream,如何获取 Json
【发布时间】:2017-09-30 01:33:00
【问题描述】:

我正在尝试使用 Spark Streaming 从 Kafka 主题中获取信息,然后解析我在该主题中获得的 json。 为了在 DStream 中获取主题,我使用 stringReader,然后使用 foreach 从 DStream 中获取每个 RDD:

myRDD.collect().foreach(println)

为了将 myRDD 转换为 json(当我打印 myRDD 时,json 的格式是正确的)并提取我需要的两个字段,我尝试使用 json4s 并尝试了这个:

val jsonEvent = Json.parse(myRDD)
val srcIp = (jsonEvent / "src_ip")
val dstIp =  (jsonEvent / "dst_ip")

我也试过这样使用json4s:

val jsonEvent = parse(myRDD).asInstanceOf[JObject]
val srcIp = jsonEvent / "src_ip"

但它也不能正常工作。

这是输出:

java.lang.NoSuchMethodError: rg.json4s.jackson.JsonMethods$.parse$default$3()Z

这些是我正在使用的版本:

<dependency>
    <groupId>org.json4s</groupId>
    <artifactId>json4s-native_2.10</artifactId>
    <version>3.5.1</version>
</dependency>
<dependency>
    <groupId>org.json4s</groupId>
    <artifactId>json4s-jackson_2.10</artifactId>
    <version>3.5.1</version>
</dependency>

我认为问题在于我不明白如何将 RDD 中的每条记录转换为 json 对象来解析它。有人可以更深入地向我解释一下,以便我了解它是如何工作的吗? 我的代码正确吗?

谢谢。

【问题讨论】:

    标签: json scala apache-kafka spark-streaming dstream


    【解决方案1】:

    Spark 为您提供 API,用于将输入 JSON 读取到数据集/数据帧。

    如果您正在从文件中读取 JSON。可以使用SparkSessionread().json()方法阅读

    SparkSession spark = SparkSession.builder().appName("Test App").master("local[*]")
                    .getOrCreate();
    
    Dataset<Row> inputfileDataset = spark.read().option("multiLine", true).option("mode", "PERMISSIVE")
                .json("C:\path-to-file\inputfile.json"); 
    

    如果您正在阅读 KAFKA Streams。您可以通过将每个 RDD 转换为 ListStrings 来遍历每个 RDD,然后将它们转换为 Dataset/Dataframe

    JavaPairInputDStream<String, String> directKafkaStream = KafkaUtils.createDirectStream(ssc, String.class,
                    String.class, StringDecoder.class, StringDecoder.class, kafkaParams, topics);
    
    JavaDStream<String> inputJsonStream = directKafkaStream.map(rdd -> {
        return rdd._2;
    });
    
    inputJsonStream.foreachRDD(inputRDD -> {
        SparkSession spark = JavaSparkSessionSingleton.getInstance(inputRDD.context().getConf());
        List<String> strings = inputRDD.collect();
        strings.forEach(x -> {
            Dataset<Row> inputDataset = spark.read().option("multiLine",true).option("mode", "PERMISSIVE").json(inputRDD);
            inputDataset.printSchema();
    });
    

    要查询数据集/数据框,您可以在数据集上使用Select() 函数,然后将其转换为您想要的数据类型。

    【讨论】:

      猜你喜欢
      • 2017-09-03
      • 2019-07-12
      • 2018-07-02
      • 1970-01-01
      • 2023-03-25
      • 2019-06-24
      • 2017-08-08
      • 1970-01-01
      • 2020-06-03
      相关资源
      最近更新 更多