【问题标题】:writing corrupt data from kafka / json datasource in spark structured streaming在 Spark 结构化流中从 kafka / json 数据源写入损坏的数据
【发布时间】:2019-05-25 02:59:00
【问题描述】:

在 spark 批处理作业中,我通常将 JSON 数据源写入文件,并且可以使用 DataFrame 读取器的损坏列功能将损坏的数据写入单独的位置,并使用另一个读取器从同一位置写入有效数据工作。 (数据写成parquet)

但在 Spark Structred Streaming 中,我首先通过 kafka 将流作为字符串读取,然后使用 from_json 获取我的 DataFrame。然后 from_json 使用 JsonToStructs,它在解析器中使用 FailFast 模式,并且不会将未解析的字符串返回到 DataFrame 中的列。 (请参阅参考文献中的注释)那么我如何使用 SSS 将与我的架构和可能无效的 JSON 不匹配的损坏数据写入另一个位置?

最后,在批处理作业中,同一个作业可以写入两个数据帧。但 Spark Structured Streaming 需要对多个接收器进行特殊处理。然后在 Spark 2.3.1(我的当前版本)中,我们应该包含有关如何正确写入损坏和无效流的详细信息...

参考:https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-Expression-JsonToStructs.html

val rawKafkaDataFrame=spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", config.broker)
  .option("kafka.ssl.truststore.location", path.toString)
  .option("kafka.ssl.truststore.password", config.pass)
  .option("kafka.ssl.truststore.type", "JKS")
  .option("kafka.security.protocol", "SSL")
  .option("subscribe", config.topic)
  .option("startingOffsets", "earliest")

  .load()

val jsonDataFrame = rawKafkaDataFrame.select(col("value").cast("string"))

// does not provide a corrupt column or way to work with corrupt
jsonDataFrame.select(from_json(col("value"), schema)).select("jsontostructs(value).*")

【问题讨论】:

    标签: apache-spark apache-spark-sql spark-structured-streaming


    【解决方案1】:

    当你从字符串转换为 json 时,如果它无法使用提供的模式进行解析,它将返回 null。您可以过滤空值并选择字符串。像这样。

    val jsonDF =  jsonDataFrame.withColumn("json", from_json(col("value"), schema))
    val invalidJsonDF = jsonDF.filter(col("json").isNull).select("value")
    

    【讨论】:

    • 我已经走神了。那么这看起来确实是一个正确的方法,我不得不问自己为什么我没有考虑过。但是,我确实询问了有关编写损坏的数据框的问题。然后因为这需要写入损坏和有效的数据帧,所以我们似乎需要使用多个接收器。您能否针对火花流 2.3.1 进行适当处理?
    • @discord 你是什么意思 spark 2.4 让这更合理?
    【解决方案2】:

    我只是想弄清楚结构化流的 _corrupt_record 等效项。这就是我想出的;希望它能让你更接近你正在寻找的东西:

    // add a status column to partition our output by
    // optional: only keep the unparsed json if it was corrupt
    // writes up to 2 subdirs: 'out.par/status=OK' and 'out.par/status=CORRUPT'
    // additional status codes for validation of nested fields could be added in similar fashion
    
    df.withColumn("struct", from_json($"value", schema))
      .withColumn("status", when($"struct".isNull, lit("CORRUPT")).otherwise(lit("OK")))
      .withColumn("value", when($"status" <=> lit("CORRUPT"), $"value"))
      .write
      .partitionBy("status")
      .parquet("out.par")
    

    【讨论】:

    • 嘿,我现在才意识到 2018 年 12 月实际上不是上个月......
    猜你喜欢
    • 2019-08-16
    • 2018-10-06
    • 1970-01-01
    • 2018-08-20
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多