【问题标题】:Spark structured streaming read from kafka json encoding issue从 kafka json 编码问题读取 Spark 结构化流
【发布时间】:2019-08-01 04:12:41
【问题描述】:

我很难使用 Spark 结构化流在 kafka 主题中读取我的 JSON 数据。

上下文:

我正在构建一个简单的管道,在其中我使用 kafka 从 MongoDb(这个数据库经常从另一个应用程序填充)读取数据,然后我想在 Spark 中获取这些数据。

为此,我正在使用 Spark Structured Streaming,它似乎可以工作。

这是我的代码:

import org.apache.spark.rdd
import org.apache.spark.sql.avro._
import org.apache.spark.sql.{Column, SparkSession}
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types.{ArrayType, DataTypes, StructType}
import org.apache.spark.sql.functions.explode
import org.apache.spark.sql.functions.schema_of_json
object KafkaToParquetLbcAutomation extends App {





  val spark = SparkSession
    .builder
    .appName("Kafka-Parquet-Writer")
    .master("local")
    .getOrCreate()
  spark.sparkContext.setLogLevel("ERROR")
  import spark.implicits._

  val kafkaRawDf = spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers",BROKER IP)
    .option("subscribe", "test")
    .option("startingOffsets", "earliest")
    .load()

  val testJsonDf = kafkaRawDf.selectExpr("CAST(value AS STRING)")






  //affichage des data
  val query = testJsonDf
    .writeStream
    .outputMode("append")
    .format("console")
    .queryName("test")
    .start()
    .awaitTermination()
}

阅读完这些 JSON 数据后,我想进行一些转换。

问题从这里开始,由于我无法解码的奇怪编码,我无法解析 JSON 数据。

因此我无法在我的管道上走得更远。

我应该如何获取我的数据:

{
  "field 1" : "value 1 ", 
}

(与许多其他领域)

我实际上是如何获取数据的:

VoituresXhttps://URL.fr/voitures/87478648654.htm�https://img5.url.fr/ad-image/49b7c279087d0cce09123a66557b71d09c01a6d2.jpg�https://img7.url.fr/ad-image/eab7e65419c17542840204fa529b02e64771adbb.jpg�https://img7.urln.fr/ad-image/701b547690e48f11a6e0a1a9e72811cc76fe803e.jpg

问题可能出在分隔符之类的地方。

你能帮帮我吗

谢谢

【问题讨论】:

标签: json apache-spark apache-kafka apache-kafka-connect spark-structured-streaming


【解决方案1】:

问题解决,

这是 kafka 连接器代码中的错误配置。

我只需将此字段添加到连接器:

"key.converter":"org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable":"false",
"value.converter":"org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable":"false",

与 Spark 无关

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2017-04-04
    • 2019-01-29
    • 2019-09-19
    • 1970-01-01
    • 1970-01-01
    • 2020-01-31
    • 2020-07-25
    • 2022-01-01
    相关资源
    最近更新 更多