【发布时间】:2019-05-23 13:03:40
【问题描述】:
Spark 擅长在从磁盘初始读取时将 JSON 解析为嵌套的 StructType,但是如果我已经在 Dataset 中有一个包含 JSON 的 String 列,并且我想将它映射到 @ 987654329@ 具有 StructType 列,具有将整个数据集考虑在内的架构推断,同时充分利用并行状态并避免减少操作?
我知道函数 schema_of_json 和 from_json 显然是为了一起使用来完成此任务或类似的,但我很难找到实际工作的代码示例,尤其是在 Java 中。
我将接受任何提供 Java 示例并满足完整模式推断和完整非简化并行操作目标的答案。或者,如果这不可能,最接近的解决方法。
我目前使用的是 Spark 2.4.0。
我研究了以下相关问题:
Implicit schema discovery on a JSON-formatted Spark DataFrame column
这个问题与我的类似,但对于 Scala。没有公认的答案。 OP 在评论中宣布他们找到了一个“hacky”解决方案来让from_schema 工作。除了“hackiness”之外,该解决方案的问题在于它仅从数据帧的第一行推断模式,因此类型可能受到过于严格的限制:
val jsonSchema: String = df.select(schema_of_json(df.select(col("custom")).first.getString(0))).as[String].first
编辑:我尝试了here 所示的解决方案,如下面的 cmets 所述。这是实现:
SparkSession spark = SparkSession
.builder()
.appName("example")
.master("local[*]")
.getOrCreate();
Dataset<Row> df = spark.read().text(conf.getSourcePath());
df.cache();
String schema = df.select(schema_of_json(col("value")))
.as(Encoders.STRING())
.first();
df.withColumn("parsedJson", from_json(col("value"), schema, new HashMap<String, String>()))
.drop("value")
.write()
.mode("append")
.parquet(conf.getDestinationPath());
从这段代码中我得到了一个错误:
AnalysisException: cannot resolve 'schemaofjson(`value`)' due to data type mismatch: The input json should be a string literal and not null; however, got `value`.;;
'Project [schemaofjson(value#0) AS schemaOfjson(value)#20]
+- Relation[value#0] text
这个错误导致我收到以下 Spark 拉取请求: https://github.com/apache/spark/pull/22775
这似乎表明schema_of_json 从未打算应用于整个表以便对整个事物进行模式推断,而是从使用@987654338 直接传入的单个文字 JSON 样本推断模式@。在这种情况下,我不知道 Spark 提供了任何解决方案来从整个表上的 JSON 进行完整模式推断。除非这里有人可以更正我对此拉取请求的阅读或提供替代方法??
【问题讨论】:
-
对不起,有人可以就否决票告诉我吗?谢谢。
-
@user10958683 感谢您的链接。我看过这两个中的第一个,但没有过多关注它,因为它与我在上面提到的那个链接,我首先找到的。它看起来更有希望,因为它似乎正确使用了
schema_of_json。我毫不费力地将其转换为 Java。会试试的。 -
@user10958683 是的,模式推断本质上显然是简化的。我正在考虑 spark.read.json 的耦合性质,因为它允许在幕后更有效地逐步推断模式。不确定如何在集群上实现。显然,司机必须从所有工人那里收集结果......
-
@user10958683 上面的第一个示例导致我现在在问题中记录的错误。第二个不适用,因为它使用带有文字 JSON 字符串而不是列名的
schema_of_json,因此不能应用于整个表。
标签: java apache-spark apache-spark-sql