【发布时间】:2020-07-09 16:38:59
【问题描述】:
我正在尝试根据条件将不同的函数应用于 DataFrame 的各个列。当我在循环中执行此操作时,fn1 在第一次迭代中成功应用。但是df 在第二次迭代中变成了None。我想问题是我在循环范围内初始化df 的方式。
df = spark.createDataFrame([(10,4,2,3),(20,1,3,4),(30,7,4,5),(40,2,1,9)], schema=['id','metric_1','metric_2', 'metric_3'])
cols_info = [{'name':'metric_1','apply_func':'True','method':'fn1'},{'name':'metric_2','apply_func':'True','method':'fn2'}, {'name':'metric_3','apply_func':'True','method':'fn3'}]
def fn1(df, col):
return df.withColumn(col, F.pow(df[col], 2))
def fn2(df, col):
return df.withColumn(col, F.hash(df[col]))
def fn3(df, col):
return df.withColumn(col, F.log2(df[col]))
def process_data(df, columns):
for col in columns:
if col["apply_func"] == "True":
if column["method"] == "fn1":
df = fn1(df, col["name"])
if column["method"] == "fn2":
df = fn2(df, col["name"])
if column["method"] == "fn3":
df = fn3(df, col["name"])
return df
使用 Pyspark DataFrame API 应用此类转换的正确方法是什么?
【问题讨论】:
标签: python python-3.x dataframe pyspark