【Question Title】: PySpark; Split a column of lists into multiple columns
【Posted】: 2021-07-07 02:18:24
【Question Description】:

This question is similar to one already asked for Pandas here. I am using a Google Cloud Dataproc cluster to execute the functions, so I cannot convert them to pandas.

I want to transform the following:

+----+----------------------------------+-----+---------+------+--------------------+-------------+
| key|                             value|topic|partition|offset|           timestamp|timestampType|
+----+----------------------------------+-----+---------+------+--------------------+-------------+
|null|["sepal_length","sepal_width",...]| iris|        0|   289|2021-04-11 22:32:...|            0|
|null|["5.0","3.5","1.3","0.3","setosa"]| iris|        0|   290|2021-04-11 22:32:...|            0|
|null|["4.5","2.3","1.3","0.3","setosa"]| iris|        0|   291|2021-04-11 22:32:...|            0|
|null|["4.4","3.2","1.3","0.2","setosa"]| iris|        0|   292|2021-04-11 22:32:...|            0|
|null|["5.0","3.5","1.6","0.6","setosa"]| iris|        0|   293|2021-04-11 22:32:...|            0|
|null|["5.1","3.8","1.9","0.4","setosa"]| iris|        0|   294|2021-04-11 22:32:...|            0|
|null|["4.8","3.0","1.4","0.3","setosa"]| iris|        0|   295|2021-04-11 22:32:...|            0|
+----+----------------------------------+-----+---------+------+--------------------+-------------+

into this:

+--------------+-------------+--------------+-------------+-------+
| sepal_length | sepal_width | petal_length | petal_width | class |
+--------------+-------------+--------------+-------------+-------+
| 5.0          | 3.5         | 1.3          | 0.3         | setosa| 
| 4.5          | 2.3         | 1.3          | 0.3         | setosa| 
| 4.4          | 3.2         | 1.3          | 0.2         | setosa| 
| 5.0          | 3.5         | 1.6          | 0.6         | setosa| 
| 5.1          | 3.8         | 1.9          | 0.4         | setosa| 
| 4.8          | 3.0         | 1.4          | 0.3         | setosa| 
+--------------+-------------+--------------+-------------+-------+

How can I do this? Any help would be greatly appreciated!

【Question Comments】:

Tags: python pandas google-cloud-platform pyspark


【Solution 1】:

I took the long way round, since I'm relatively new to PySpark. I'd be glad to hear if there is a shorter way.

  1. Recreate your dataframe in pandas:

    df = pd.DataFrame({"value":['["sepal_length","sepal_width","petal_length","petal_width","class"]','["5.0","3.5","1.3","0.3","setosa"]','["4.5","2.3","1.3","0.3","setosa"]','["4.4","3.2","1.3","0.2","setosa"]']})

  2. Convert the pandas dataframe to a Spark dataframe:

    sdf = spark.createDataFrame(df)

  3. Strip out the square brackets and " characters:

    sdf = sdf.withColumn('value', regexp_replace(col('value'), '[\\[\\"\\]]', ""))
    sdf.show(truncate=False)

  4. Split the dataframe on ",":

    df_split = sdf.select(f.split(sdf.value, ",")).rdd.flatMap(lambda x: x).toDF(schema=["sepal_length","sepal_width","petal_length","petal_width","class"])

  5. Filter out the non-numeric header row:

    df_split = df_split.filter(df_split.sepal_length != "sepal_length")
    df_split.show()


+------------+-----------+------------+-----------+------+
|sepal_length|sepal_width|petal_length|petal_width| class|
+------------+-----------+------------+-----------+------+
|         5.0|        3.5|         1.3|        0.3|setosa|
|         4.5|        2.3|         1.3|        0.3|setosa|
|         4.4|        3.2|         1.3|        0.2|setosa|
+------------+-----------+------------+-----------+------+
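For reference, the per-row string handling done by the regexp_replace, split, and filter steps above can be checked outside Spark. Here is a plain-Python sketch of the same logic (the helper name is mine, not part of the answer):

```python
import re

def split_row(raw):
    """Mimic the Spark steps: strip [ ] and " characters, then split on commas."""
    cleaned = re.sub(r'[\[\"\]]', "", raw)  # same character class as the regexp_replace call
    return cleaned.split(",")

header = split_row('["sepal_length","sepal_width","petal_length","petal_width","class"]')
data = split_row('["5.0","3.5","1.3","0.3","setosa"]')

# The final filter drops the header row by checking the first field:
rows = [r for r in (header, data) if r[0] != "sepal_length"]
```

This mirrors what Spark does per row on the executors; the real work in the answer is of course distributed.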

【Comments】:

  • Thanks for your answer! This works when I try it on my local machine, but it throws an error on the Dataproc cluster: Queries with streaming sources must be executed with writeStream.start(); kafka. Any clue how to resolve this?
  • I really don't know. As I said, I'm still fairly new to PySpark, so I took the safer route within my limited knowledge.
【Solution 2】:

After a lot of searching, I finally wrote code that solves it the "Dataproc" way. The code is below:

from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import split, explode, col, regexp_replace, udf
from pyspark.sql import functions as f

spark = SparkSession \
        .builder \
        .appName("appName") \
        .getOrCreate()

spark.sparkContext.setLogLevel("WARN")

df = spark \
     .readStream \
     .format("kafka") \
     .option("kafka.bootstrap.servers", "ip:port") \
     .option("subscribe", "topic-name") \
     .load()

data = df.select([c for c in df.columns if c in ["value", "offset"]])

def convertType(val):
    # Kafka delivers value as bytes: decode, then split on commas
    arr = val.decode("utf-8").split(",")
    # debug output (shows up in the executor logs)
    print(arr[0], arr[1], arr[2], arr[3])
    print("="*50)
    # [2:-1] strips the leading junk characters and the trailing quote
    # from each numeric element (see the note on the data format below)
    arr[0], arr[1], arr[2], arr[3] = float(arr[0][2:-1]), float(arr[1][2:-1]), float(arr[2][2:-1]), float(arr[3][2:-1])
    arr[4] = arr[4][:-1]
    return arr

def get_sepal_length(arr):
    val = arr[0]
    return val

def get_sepal_width(arr):
    val = arr[1]
    return val

def get_petal_length(arr):
    val = arr[2]
    return val

def get_petal_width(arr):
    val = arr[3]
    return val

def get_classes(arr):
    val = arr[4][2:-1]
    return val    

convertUDF = udf(lambda z: convertType(z)) 
getSL = udf(lambda z: get_sepal_length(z))
getSW = udf(lambda z: get_sepal_width(z))
getPL = udf(lambda z: get_petal_length(z))
getPW = udf(lambda z: get_petal_width(z))
getC = udf(lambda z: get_classes(z))

df_new = data.select(col("offset"), \
    convertUDF(col("value")).alias("value"))

df_new = df_new.withColumn("sepal_length", getSL(col("value")).cast("float"))
df_new = df_new.withColumn("sepal_width", getSW(col("value")).cast("float"))
df_new = df_new.withColumn("petal_length", getPL(col("value")).cast("float"))
df_new = df_new.withColumn("petal_width", getPW(col("value")).cast("float"))
df_new = df_new.withColumn("classes", getC(col("value")))

query = df_new\
        .writeStream \
        .format("console") \
        .start()

query.awaitTermination()

Note that the arr[i][2:-1], ... slicing is due to the format of the data in df.value; in my case each element looked like '"2.56". Dataproc is highly restrictive, and the lengthy udf approach was the best one I could find :).
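The slicing inside convertType can be sanity-checked without a Spark session. Below is a plain-Python mirror of it; the payload is a synthetic guess shaped like the '"2.56" element format described in the note, so treat it as illustrative only:

```python
def convert_type(val):
    """Plain-Python mirror of the answer's convertType (minus the debug prints)."""
    arr = val.decode("utf-8").split(",")
    # each numeric element looks like '"5.0" : [2:-1] drops the two leading
    # junk characters and the trailing quote, leaving 5.0
    arr[0:4] = [float(x[2:-1]) for x in arr[0:4]]
    arr[4] = arr[4][:-1]  # trim the trailing quote from the class element
    return arr

# synthetic payload, elements shaped like '"2.56" per the note above
payload = ",".join(["'\"5.0\"", "'\"3.5\"", "'\"1.3\"", "'\"0.3\"", "'\"setosa\""]).encode("utf-8")
parsed = convert_type(payload)  # numeric fields parse to [5.0, 3.5, 1.3, 0.3]
```

If the real feed uses a different quoting style, the slice offsets would need to change accordingly.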

【Comments】:
