【问题标题】:Split and count column values in PySpark dataframe在 PySpark 数据框中拆分和计算列值
【发布时间】:2019-09-03 06:45:49
【问题描述】:

我在 hdfs 位置有一个 csv 文件,并已转换为 dataframe,而我的 dataframe 如下所示...

column1,column2,column3
Node1,  block1, 1,4,5
Node1,  block1, null
Node1,  block2, 3,6,7
Node1,  block2, null
Node1,  block1, null

我想解析这个dataframe,我的输出dataframe 应该在下面。

column1,column2,column3
Node1,  block1, counter0:1,counter1:4,counter2:5
Node1,  block1, null
Node1,  block2, counter0:3,counter1:6,counter2:7
Node1,  block2, null
Node1,  block1, null

我遇到了下面提到的一些错误,所以任何人都可以帮助我解决这个错误,或者可以帮助我正确/修改代码吗?谢谢。

import pyspark
from pyspark.sql.functions import *
from pyspark.sql.session import SparkSession
from pyspark.sql.functions import col
import pyspark.sql.types as T
from pyspark.sql.functions import udf

start_value = 2
schema_name = 2
start_key = 0

df = spark.read.csv("hdfs://path/Ccounters/test.csv",header=True)

def dict(x):
    split_col = x.split(",")
    col_nm = df.schema.names[schema_name]
    convert = map(lambda x :col_nm + str(start_key) +":"+str(x) ,split_col)
    con_str = ','.join(convert)
    return con_str
udf_dict = udf(dict, StringType())

df1 =df.withColumn('distance', udf_dict(df.column3))
df1.show()

getting error below:

 File "/opt/data/data11/yarn/local/usercache/cdap/appcache/application_1555606923440_67815/container_e48_1555606923440_67815_01_000001/pyspark.zip/pyspark/cloudpickle.py", line 160, in dump
pickle.PicklingError: Could not serialize object: Py4JError: An error occurred while calling o58.__getnewargs__. Trace:
py4j.Py4JException: Method __getnewargs__([]) does not exist
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
    at py4j.Gateway.invoke(Gateway.java:272)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.lang.Thread.run(Thread.java:745)

【问题讨论】:

    标签: pyspark


    【解决方案1】:

    我发现您不能在 UDF 中使用 spark 对象(例如“map”函数),这是有道理的 (https://stackoverflow.com/a/57230637)。执行所需操作的替代方法是在 UDF 中使用 for 循环。


    第一次编辑

    添加了一个可以轻松将此UDF应用于多个列的部分,基于此问题的答案:how to get the name of column with maximum value in pyspark dataframe

    df = spark.createDataFrame([('Node1', 'block1', '1,4,5', None), ('Node1', 'block1', None, '1,2,3'), ('Node1', 'block2', '3,6,7', None), ('Node1', 'block2', None, '4,5,6'), ('Node1', 'block1', None, '7,8,9')], ['column1', 'column2', 'column3', 'column4'])
    
    #     df.show()
    #     +-------+-------+-------+-------+
    #     |column1|column2|column3|column4|
    #     +-------+-------+-------+-------+
    #     |  Node1| block1|  1,4,5|   null|
    #     |  Node1| block1|   null|  1,2,3|
    #     |  Node1| block2|  3,6,7|   null|
    #     |  Node1| block2|   null|  4,5,6|
    #     |  Node1| block1|   null|  7,8,9|
    #     +-------+-------+-------+-------+
    
    def columnfill(x):
    # if x is empty, return x
    if x == None: 
        return x
    else:
        split = x.split(',')
        y = []
        z = 0
        for i in split:
            y.append('counter'+str(z)+':'+str(i))
            z += 1
        return ','.join(y)
    
    udf_columnfill = udf(columnfill, StringType())
    
    ### Apply UDF to a single column:
    # df_result1 = df.withColumn('distance', udf_columnfill(df.column3))
    
    ### Code for applying UDF to multiple columns
    
    # Define columns that should be transformed
    columnnames = ['column3', 'column4']
    # Create a condition that joins multiple string parts, containing column operations
    cond = "df.withColumn" + ".withColumn".join(["('" + str(c) + "_new', udf_columnfill(df." + str(c) + ")).drop('"+ str(c) +"')" for c in (columnnames)])
    
    #     # Print condition to see which transformations are executed
    #     print(cond)
    #     df.withColumn('column3_new', udf_columnfill(df.column3)).drop('column3').withColumn('column4_new', udf_columnfill(df.column4)).drop('column4')   
    
    # Create the new dataframe that evaluates the defined condition
    df_result2 = eval(cond)
    
    #     df_result2.show()
    #     +-------+-------+--------------------------------+--------------------------------+
    #     |column1|column2|column3_new                     |column4_new                     |
    #     +-------+-------+--------------------------------+--------------------------------+
    #     |Node1  |block1 |counter0:1,counter1:4,counter2:5|null                            |
    #     |Node1  |block1 |null                            |counter0:1,counter1:2,counter2:3|
    #     |Node1  |block2 |counter0:3,counter1:6,counter2:7|null                            |
    #     |Node1  |block2 |null                            |counter0:4,counter1:5,counter2:6|
    #     |Node1  |block1 |null                            |counter0:7,counter1:8,counter2:9|
    #     +-------+-------+--------------------------------+--------------------------------+   
    

    第二次编辑

    在插入列名的位置添加了一个额外的 UDF 输入值,作为列值的前缀:

    # Updated UDF
    def columnfill(cinput, cname):
        # if x is empty, return x
        if cinput == None: 
            return cinput
    
        else:
            values = cinput.split(',')
            output = []
            count = 0
            for value in values:
                output.append(str(cname)+str(count)+":"+str(value))
                count += 1
            return ','.join(output)
    
    udf_columnfill = udf(columnfill, StringType())
    
    # Define columns that should be transformed
    columnnames = ['column3', 'column4']
    # Create a condition that joins multiple string parts, containing column operations
    cond2 = "df.withColumn" + ".withColumn".join(["('" + str(c) + "_new', udf_columnfill(df." + str(c) + ", f.lit('" + str(c) + "_new'))).drop('"+ str(c) +"')" for c in (columnnames)])
    
    df_result3 = eval(cond2)
    # +-------+-------+--------------------------------------------+--------------------------------------------+
    # |column1|column2|column3_new                                 |column4_new                                 |
    # +-------+-------+--------------------------------------------+--------------------------------------------+
    # |Node1  |block1 |column3_new0:1,column3_new1:4,column3_new2:5|null                                        |
    # |Node1  |block1 |null                                        |column4_new0:1,column4_new1:2,column4_new2:3|
    # |Node1  |block2 |column3_new0:3,column3_new1:6,column3_new2:7|null                                        |
    # |Node1  |block2 |null                                        |column4_new0:4,column4_new1:5,column4_new2:6|
    # |Node1  |block1 |null                                        |column4_new0:7,column4_new1:8,column4_new2:9|
    # +-------+-------+--------------------------------------------+--------------------------------------------+
    
    print(cond)
    # df.withColumn('column3_new', udf_columnfill(df.column3, f.lit('column3_new'))).drop('column3').withColumn('column4_new', udf_columnfill(df.column4, f.lit('column4_new'))).drop('column4')
    

    【讨论】:

    • 非常感谢。这行得通。然而,这仅适用于一列。假设我们在 df 中有多个列,我们需要对其进行转换,然后我们可以使用 for 循环它将如何创建许多需要再次加入的 dfs?我试图避免加入 dfs 以重新洗牌 df 中的数据,这可能导致最后不正确的 df 所以想知道其他方法吗?
    • 我认为使用 UDF 一次评估多个列或连接多个数据框并不是一个理想的解决方案。我在答案中添加了一些新代码,它将 UDF 应用于必须转换的每一列。为了保持简洁,在应用后它也会删除此列,但如果愿意,可以跳过这部分。
    • 是的,我按照你上面提到的方法做了。非常感谢您的帮助。
    • 很好用。您能否接受答案以表明它为您的问题提供了解决方案?
    • @RajeshMeher 我认为您的问题现在变得过于具体,无法在其他情况下进行一般使用。我的建议是坚持您的第一个问题,并为您添加的每个附加项提出一个新问题(即“添加列名作为列值的前缀”。)这样,共享知识也更适用于其他用户。尽管如此,我在解决方案中添加了一个编辑,该解决方案包含一个额外的 UDF 输入,可以在其中定义列值前缀。希望它能按预期工作。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2023-03-05
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2017-10-13
    • 2023-03-17
    相关资源
    最近更新 更多