【问题标题】:Usage of custom Python object in Pyspark UDFPyspark UDF 中自定义 Python 对象的使用
【发布时间】:2017-10-11 15:38:45
【问题描述】:

运行以下 PySpark 代码时:

nlp = NLPFunctions()

def parse_ingredients(ingredient_lines):
    parsed_ingredients = nlp.getingredients_bulk(ingredient_lines)[0]
    return list(chain.from_iterable(parsed_ingredients))


udf_parse_ingredients = UserDefinedFunction(parse_ingredients, ArrayType(StringType()))

我收到以下错误: _pickle.PicklingError: Could not serialize object: TypeError: can't pickle _thread.lock objects

我想这是因为 PySpark 无法序列化这个自定义类。但是如何避免在每次运行 parse_ingredients_line 函数时实例化这个昂贵的对象的开销?

【问题讨论】:

    标签: python apache-spark pyspark apache-spark-sql


    【解决方案1】:

    假设您想使用这样定义的Identity 类 (identity.py):

    class Identity(object):                   
        def __getstate__(self):
            raise NotImplementedError("Not serializable")
    
        def identity(self, x):
            return x
    

    例如,您可以使用可调用对象 (f.py) 并将 Identity 实例存储为类成员:

    from identity import Identity
    
    class F(object):                          
        identity = None
    
        def __call__(self, x):
            if not F.identity:
                F.identity = Identity()
            return F.identity.identity(x)
    

    并如下所示使用这些:

    from pyspark.sql.functions import udf
    import f
    
    sc.addPyFile("identity.py")
    sc.addPyFile("f.py")
    
    f_ = udf(f.F())
    
    spark.range(3).select(f_("id")).show()
    
    +-----+
    |F(id)|
    +-----+
    |    0|
    |    1|
    |    2|
    +-----+
    

    或独立函数和闭包:

    from pyspark.sql.functions import udf
    import identity
    
    sc.addPyFile("identity.py")
    
    def f(): 
        dict_ = {}                 
        @udf()              
        def f_(x):                 
            if "identity" not in dict_:
                dict_["identity"] = identity.Identity()
            return dict_["identity"].identity(x)
        return f_
    
    
    spark.range(3).select(f()("id")).show()
    
    +------+
    |f_(id)|
    +------+
    |     0|
    |     1|
    |     2|
    +------+
    

    【讨论】:

    • 我不太明白这个例子。您在哪里表明您能够在 udf 的执行之间保持状态?
    • @Vitaliy 这是标准的 Python 代码——在这两种情况下,我们都将感兴趣的对象保留在外部范围内,因此它的生命周期不限于范围本身。如果您愿意,可以使用nonlocal 代替可变的dict。显然它不能比你无法控制的父解释器活得更久。否则,您可以轻松添加日志记录并使用调试器查看初始化仅在第一次调用时应用。
    • 这很好用!!超快 - 这就是我们使用 spark 的原因 :)
    • user6910411 - 你确定你的代码没有创建 3 个 Identity 类的实例吗?我检查了你的“独立函数和闭包”示例代码,这就是发生在我身上的事情。
    • @PawełBatko 此代码将创建与 Spark 生成的执行器解释器一样多的 Identity 实例(请记住,这里没有共享内存,每个执行器“线程”实际上是 PySpark 中的一个进程). 所以实际数量将取决于被重用的执行程序的数量 - 上限是任务总数(包括重新启动的任务)。有更复杂的策略,但这些策略超出了这个特定的范围回答。
    【解决方案2】:

    我基于 (https://github.com/scikit-learn/scikit-learn/issues/6975) 通过使 NLPFunctions 类的所有依赖项可序列化来解决它。

    【讨论】:

      【解决方案3】:

      编辑:这个答案是错误的。对象在广播的时候还是先序列化再反序列化,所以不能避免序列化。 (Tips for properly using large broadcast variables?)


      尝试使用broadcast variable

      sc = SparkContext()
      nlp_broadcast = sc.broadcast(nlp) # Stores nlp in de-serialized format.
      
      def parse_ingredients(ingredient_lines):
          parsed_ingredients = nlp_broadcast.value.getingredients_bulk(ingredient_lines)[0]
          return list(chain.from_iterable(parsed_ingredients))
      

      【讨论】:

        猜你喜欢
        • 2019-09-05
        • 2022-12-29
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2022-11-24
        相关资源
        最近更新 更多