【问题标题】:How to convert a column of StringType (json string) to an array of StructType如何将一列StringType(json字符串)转换为StructType数组
【发布时间】:2019-05-19 19:23:59
【问题描述】:

我意识到我可能需要添加更多细节。想象一下,我在数据框中有 2 列。两者都是字符串,一个是ID,另一个是json字符串。

这可以构造如下:

>>> a1 = [{"a": 1, "b": "[{\"h\": 3, \"i\": 5} ,{\"h\": 4, \"i\": 6}]" },
...       {"a": 1, "b": "[{\"h\": 6, \"i\": 10},{\"h\": 8, \"i\": 12}]"}]
>>> df1 = sqlContext.read.json(sc.parallelize(a1))
>>> df1.show()
+---+--------------------+
|  a|                   b|
+---+--------------------+
|  1|[{"h": 3, "i": 5}...|
|  1|[{"h": 6, "i": 10...|
+---+--------------------+
>>> df1.printSchema()
root
 |-- a: long (nullable = true)
 |-- b: string (nullable = true)

请注意,json 代码是 StringType。我想编写一个函数来创建新列,将数据存储为嵌套表,如下所示:

root
 |-- a: long (nullable = true)
 |-- b: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- h: long (nullable = true)
 |    |    |-- i: long (nullable = true)

我使用的是 1.6,因此我没有 to_json 转换函数。我已经尝试过这样做

>>> df1.withColumn('new', get_json_object(df1.b,'$')).show()
+---+--------------------+--------------------+
|  a|                   b|                 new|
+---+--------------------+--------------------+
|  1|[{"h": 3, "i": 5}...|[{"h":3,"i":5},{"...|
|  1|[{"h": 6, "i": 10...|[{"h":6,"i":10},{...|
+---+--------------------+--------------------+

问题是创建的新列仍然是一个字符串。 :(

【问题讨论】:

  • 更多信息,这是将 XML 字符串字段转换为嵌套表。我已经使用特定列的映射将 XML 解析为 json,并使用了 sqlContext.read.json(rdd),它确实有效。但是,我不想这样做,我想在数据框上使用 withColumn 并使用这些嵌套值创建一个新列。
  • 您想修改列表“a”以便 Spark 可以推断出您需要的架构吗?或者您不想更改您的列表“a”并处理应用于 rdd 或 df 的修改?

标签: json string casting pyspark


【解决方案1】:

我可以使用地图功能解决问题:

a1 = [{"a": 1, "b": "[{\"h\": 3, \"i\": 5} ,{\"h\": 4, \"i\": 6}]"},{"a": 1, "b": "[{\"h\": 6, \"i\": 10},{\"h\": 8, \"i\": 12}]"}]
df1 = sqlContext.read.json(sc.parallelize(a1))
rdd = df1.map(lambda x: x.b)
df2 = sqlContext.read.json(rdd)

>>> df2.printSchema()
root
 |-- h: long (nullable = true)
 |-- i: long (nullable = true)

问题是我丢失了其他列:

+---+---+
|  h|  i|
+---+---+
|  3|  5|
|  4|  6|
|  6| 10|
|  8| 12|
+---+---+

所以我尝试使用 withColumn 数据框函数,创建一个 udf 以将其显式转换为 json。这就是问题所在,withColumn 似乎不能与 json 对象一起使用。

我的替代方法是编写一个函数来组合前 2 列,如下所示:

# This is a 2.7 workaroud, all string read from configuration file for some reason are converted
# to unicode. This issue does not appear to impact v3.6 and above
def convert_dict(mydict):
return {k.encode('ascii', 'ignore'): str(v).encode('ascii','ignore') for k, v in mydict.iteritems()}

rdd = df1.map(lambda x: {'a': x.a, 'b': [convert_dict(y) for y in json.loads(x.b)]})
df2 = sqlContext.read.json(rdd)

>>> df2.printSchema()
root
|-- a: long (nullable = true)
|-- b: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- h: string (nullable = true)
| | |-- i: string (nullable = true)

【讨论】:

    猜你喜欢
    • 2017-06-03
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2017-07-20
    • 2015-04-05
    • 2011-11-22
    相关资源
    最近更新 更多