【发布时间】:2019-05-25 02:59:00
【问题描述】:
在 spark 批处理作业中,我通常将 JSON 数据源写入文件,并且可以使用 DataFrame 读取器的损坏列功能将损坏的数据写入单独的位置,并使用另一个读取器从同一位置写入有效数据工作。 (数据写成parquet)
但在 Spark Structred Streaming 中,我首先通过 kafka 将流作为字符串读取,然后使用 from_json 获取我的 DataFrame。然后 from_json 使用 JsonToStructs,它在解析器中使用 FailFast 模式,并且不会将未解析的字符串返回到 DataFrame 中的列。 (请参阅参考文献中的注释)那么我如何使用 SSS 将与我的架构和可能无效的 JSON 不匹配的损坏数据写入另一个位置?
最后,在批处理作业中,同一个作业可以写入两个数据帧。但 Spark Structured Streaming 需要对多个接收器进行特殊处理。然后在 Spark 2.3.1(我的当前版本)中,我们应该包含有关如何正确写入损坏和无效流的详细信息...
参考:https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-Expression-JsonToStructs.html
val rawKafkaDataFrame=spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", config.broker)
.option("kafka.ssl.truststore.location", path.toString)
.option("kafka.ssl.truststore.password", config.pass)
.option("kafka.ssl.truststore.type", "JKS")
.option("kafka.security.protocol", "SSL")
.option("subscribe", config.topic)
.option("startingOffsets", "earliest")
.load()
val jsonDataFrame = rawKafkaDataFrame.select(col("value").cast("string"))
// does not provide a corrupt column or way to work with corrupt
jsonDataFrame.select(from_json(col("value"), schema)).select("jsontostructs(value).*")
【问题讨论】:
标签: apache-spark apache-spark-sql spark-structured-streaming