【发布时间】:2021-03-02 11:02:17
【问题描述】:
尝试在 pyspark 中执行以下 scala 代码:
val maxJsonParts = 3 // whatever that number is...
val jsonElements = (0 until maxJsonParts)
.map(i => get_json_object($"Payment", s"$$[$i]"))
val newDF = dataframe
.withColumn("Payment", explode(array(jsonElements: _*)))
.where(!isnull($"Payment"))
例如,我正在尝试制作一个嵌套列,例如在下面的付款列中:
| id | name | payment |
|---|---|---|
| 1 | James | [ {"@id": 1, "currency":"GBP"},{"@id": 2, "currency": "USD"} ] |
变成:
| id | name | payment |
|---|---|---|
| 1 | James | {"@id": 1, "currency":"GBP"} |
| 1 | James | {"@id":2, "currency":"USD"} |
表架构:
root
|-- id: integer (nullable = true)
|-- Name: string (nullable = true)
|-- Payment: string (nullable = true)
我尝试在 Pyspark 中编写此代码,但它只是将嵌套列(付款)变为 null:
lst = [range(0,10)]
jsonElem = [F.get_json_object(F.col("payment"), f"$[{i}]") for i in lst]
bronzeDF = bronzeDF.withColumn("payment2", F.explode(F.array(*jsonElem)))
bronzeDF.show()
非常感谢任何帮助。
【问题讨论】:
-
pyspark 的界面和scala 的很相似。如果您有任何具体问题,请尝试自己重写并来这里。 “为我写这个,但用 Python”不是 SO 的问题。
标签: python python-3.x scala apache-spark pyspark