【Question Title】: Spacy Breaks Serialization in ParDo - Apache Beam
【Posted】: 2021-10-20 16:18:54
【Question】:

I'm building a Dataflow pipeline that was working fine, nothing fancy. After I introduced spacy, it started failing with the following error:

    return _create_pardo_operation(
  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1589, in _create_pardo_operation
    dofn_data = pickler.loads(serialized_fn)
  File "/usr/local/lib/python3.8/site-packages/apache_beam/internal/pickler.py", line 289, in loads
    return dill.loads(s)
  File "/usr/local/lib/python3.8/site-packages/dill/_dill.py", line 275, in loads
    return load(file, ignore, **kwds)
  File "/usr/local/lib/python3.8/site-packages/dill/_dill.py", line 270, in load
    return Unpickler(file, ignore=ignore, **kwds).load()
  File "/usr/local/lib/python3.8/site-packages/dill/_dill.py", line 472, in load
    obj = StockUnpickler.load(self)
  File "/usr/local/lib/python3.8/site-packages/dill/_dill.py", line 577, in _load_type
    return _reverse_typemap[name]
KeyError: 'ClassType'

The ParDo code is here:


@beam.typehints.with_input_types(PubsubMessage)
@beam.typehints.with_output_types(beam.pvalue.TaggedOutput)
class PayloadOutput(beam.DoFn):

    def process(self, element):
        yield beam.pvalue.TaggedOutput(element.attributes['payload'], element)

splitme = (pipeline
            | "Read from Pub/Sub"
            >> beam.io.ReadFromPubSub(
                subscription=input_subscription,
                with_attributes=True
            )
            | 'Split Payload' >> beam.ParDo(PayloadOutput()).with_outputs('message', 'rtbf')
          )

The code using spacy:


import spacy

def remove_PII(message, language_code, found_product_names):
    """De-identify text by masking PII such as people's names, email addresses and phone/credit card numbers."""

    # Mask people's names
    lang = language_code[:2].lower()  # Get language
    # Dictionary of spacy models for different languages
    spacy_keys = {'en': 'en_core_web_sm', 'fr': 'fr_core_news_sm', 'nl': 'nl_core_news_sm',
                  'da': 'da_core_news_sm', 'pt': 'pt_core_news_sm', 'es': 'es_core_news_sm'}

    nlp = spacy.load(spacy_keys[lang])  # load spacy model
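As an aside on this class of error: it typically appears when a loaded spaCy pipeline (or anything that captures it) ends up in a DoFn's pickled state. A common workaround in Beam, not taken from the original post, is to keep only the model *name* on the DoFn and call `spacy.load` in `setup()`, which Beam runs on each worker after deserialization. A stdlib-only sketch of the pattern, with a placeholder standing in for `spacy.load`:

```python
import pickle

class LazyModelDoFn:
    """In the real pipeline this would subclass beam.DoFn and setup()
    would call spacy.load(self.model_name); only the name is pickled."""

    def __init__(self, model_name):
        self.model_name = model_name  # plain string: always picklable
        self._nlp = None              # heavy model: created per worker

    def setup(self):
        # Beam calls setup() once per DoFn instance on the worker,
        # after deserialization, so spacy.load would run here.
        if self._nlp is None:
            self._nlp = '<model:%s>' % self.model_name  # placeholder for spacy.load

    def process(self, element):
        yield (self.model_name, element)

# The DoFn round-trips through pickle because no spaCy object is attached yet.
fn = pickle.loads(pickle.dumps(LazyModelDoFn('en_core_web_sm')))
fn.setup()
```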

I tried searching for related issues and found this GitHub bug, but I don't know how to solve the problem:

https://github.com/uqfoundation/dill/issues/217

【Discussion】:

Tags: python-3.x serialization apache-beam spacy dataflow


【Solution 1】:

You need to `import dill` in both file A and file B.

In file B, you need to run the following line:

dill._dill._reverse_typemap['ClassType'] = type

This is how you use those lines of code. You can see this documentation.

# File A
import dill

# File B
import dill
dill._dill._reverse_typemap['ClassType'] = type

# do deserialization
dill.loads(some_serialized_string)

You can see another solution in this sample code. Here is the documentation.

import sys
import os
import pickle

def pickle_dumps_without_main_refs(obj):
    """
    Yeah this is horrible, but it allows you to pickle an object in the main module so that it can be reloaded in another
    module.
    :param obj:
    :return:
    """
    currently_run_file = sys.argv[0]
    module_path = file_path_to_absolute_module(currently_run_file)
    pickle_str = pickle.dumps(obj, protocol=0)
    pickle_str = pickle_str.replace(b'__main__', module_path.encode())  # Hack!
    return pickle_str


def pickle_dump_without_main_refs(obj, file_obj):
    string = pickle_dumps_without_main_refs(obj)
    file_obj.write(string)


def file_path_to_absolute_module(file_path):
    """
    Given a file path, return an import path.
    :param file_path: A file path.
    :return:
    """
    assert os.path.exists(file_path)
    file_loc, ext = os.path.splitext(file_path)
    assert ext in ('.py', '.pyc')
    directory, module = os.path.split(file_loc)
    module_path = [module]
    while True:
        if os.path.exists(os.path.join(directory, '__init__.py')):
            directory, package = os.path.split(directory)
            module_path.append(package)
        else:
            break
    path = '.'.join(module_path[::-1])
    return path
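To make the helper above concrete, the path-to-module walk can be exercised against a throwaway package; the function body is restated verbatim so the demo is self-contained (the directory names are invented for the demo):

```python
import os
import tempfile

def file_path_to_absolute_module(file_path):
    # Same walk as above: climb while an __init__.py marks a package.
    assert os.path.exists(file_path)
    file_loc, ext = os.path.splitext(file_path)
    assert ext in ('.py', '.pyc')
    directory, module = os.path.split(file_loc)
    module_path = [module]
    while os.path.exists(os.path.join(directory, '__init__.py')):
        directory, package = os.path.split(directory)
        module_path.append(package)
    return '.'.join(module_path[::-1])

# Build a tiny throwaway package: <tmp>/mypkg/__init__.py and mod.py
root = tempfile.mkdtemp()
pkg = os.path.join(root, 'mypkg')
os.makedirs(pkg)
open(os.path.join(pkg, '__init__.py'), 'w').close()
mod = os.path.join(pkg, 'mod.py')
open(mod, 'w').close()

print(file_path_to_absolute_module(mod))  # → mypkg.mod
```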

Now, I can simply change dill_pickle_script_1.py to say:

import time
from artemis.remote.child_processes import pickle_dump_without_main_refs

def my_func(a, b):
    time.sleep(0.1)
    return a+b

if __name__ == '__main__':
    with open('testfile.pkl', 'wb') as f:
        pickle_dump_without_main_refs(my_func, f)

And then dill_pickle_script_2.py.
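The answer stops before showing dill_pickle_script_2.py; presumably it just unpickles testfile.pkl and calls the function. The textual `__main__`-replacement trick it relies on can be shown self-contained (the module name below is illustrative):

```python
import pickle
import time

def my_func(a, b):
    time.sleep(0.1)
    return a + b

# Protocol 0 produces ASCII bytes, which is what makes the textual
# module-name substitution in pickle_dumps_without_main_refs possible.
raw = pickle.dumps(my_func, protocol=0)
patched = raw.replace(b'__main__', b'dill_pickle_script_1')

# A second script that can `import dill_pickle_script_1` could now do:
#     my_func = pickle.loads(patched)
#     my_func(1, 2)
# whereas unpickling `raw` there would fail looking up __main__.my_func.
```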

【Discussion】:
