【发布时间】:2017-03-07 14:03:17
【问题描述】:
这里有两个最小的工作示例脚本,它们都在 pyspark 中调用 UDF。 UDF 依赖于广播字典,它使用该字典将列映射到新列。产生正确输出的完整工作示例如下:
# default_sparkjob.py
from pyspark.sql.types import *
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext, DataFrame
import pyspark.sql.functions as F
def _transform_df(sc, df):
global mapping
mapping = {1:'First', 2:'Second', 3:'Third'}
mapping = sc.broadcast(mapping)
udf_implement_map = F.udf(_implement_map, StringType())
df = df.withColumn('Mapped', udf_implement_map('A'))
return df
def _implement_map(column):
return mapping.value[column]
if __name__ == "__main__":
#_____________________________________________________________________________
sc = SparkContext()
sqlContext = SQLContext(sc)
#_____________________________________________________________________________
import pandas as pd
pd_df = pd.DataFrame.from_dict( {'A':[1,2,3], 'B':['a','b','c']} )
sp_df = sqlContext.createDataFrame(pd_df)
sp_df = _transform_df(sc, sp_df)
sp_df.show()
# OUTPUT:
#+---+---+------+
#| A| B|Mapped|
#+---+---+------+
#| 1| a| First|
#| 2| b|Second|
#| 3| c| Third|
#+---+---+------+
但是,如果在单独的脚本中导入并使用该函数,则表示未定义映射:
# calling_sparkjob.py
if __name__ == "__main__":
#_____________________________________________________________________________
from pyspark.sql.types import *
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext, DataFrame
import pyspark.sql.functions as F
sc = SparkContext(pyFiles=['default_sparkjob.py'])
sqlContext = SQLContext(sc)
#_____________________________________________________________________________
from default_sparkjob import _transform_df
import pandas as pd
pd_df = pd.DataFrame.from_dict( {'A':[1,2,3], 'B':['a','b','c']} )
sp_df = sqlContext.createDataFrame(pd_df)
sp_df = _transform_df(sc, sp_df)
sp_df.show()
# File "default_sparkjob.py", line 17, in _implement_map
# return mapping.value[column]
# NameError: global name 'mapping' is not defined
谁能解释一下为什么会这样?这是当前代码的真实版本中的主要障碍,该代码导入了许多依赖于来自外部文件的许多 udf 的函数。是否存在我不理解的命名空间问题?
非常感谢。
【问题讨论】:
-
试试这个
def _implement_map(column): return globals()["mapping"].value[column] -
嗨,Rakesh,感谢您的回复。该更改产生:
return globals()["mapping"].value[column] KeyError: 'mapping'在单独提交时在两个脚本中。 -
好的,你能不能把
return、print dir(), print locals(), print globals()前面的这三个东西都打印出来看看mapping有没有 -
我不确定我是否理解。我无法从 pyspark udf 中打印。你的意思是在调用 UDF 之前?
-
不,你应该试试
def _implement_map(column): print globals() print dir() print locals() return mapping.value[column]这将显示全局映射
标签: apache-spark pyspark nameerror udf spark-submit