【问题标题】:Dynamically access file in dataflow pipeline动态访问数据流管道中的文件
【发布时间】:2020-10-21 03:20:47
【问题描述】:

我正在使用数据流管道,我希望能够访问其中的文件。

这是我调用方法来获取文件和提取模块的那一行:

def run():
    """ Run pipeline"""
    options: PipelineOptions = PipelineOptions(
        project='production-213911',
        runner='DataflowRunner',
        region='europe-west1',
        streaming=True,
        setup_file='dataflow/setup.py',
        autoscaling_algorith='THROUGHPUT_BASED',
    )
    proto = Container().protobuf()
    test = proto.get_proto_file('data/build')    
    proto.get_obj_from_file(test)

    with beam.Pipeline(options=options) as pipeline:
        ...

这是我想使用我的模块列表的管道步骤:

        status_records = (status | 'Proto to Dict' >> beam.Map(
        lambda x: convert_proto_to_dict(x, proto.protos)))

这是浏览目录以获取文件的代码:

@staticmethod
def get_proto_file(dirname: str = 'python_protocol') -> List[object]:
    """Iterate threw dir w. build .proto
    :param
        dirname = name of the directory to browse
    :return
        List[object] with module
    """
    protos: List[object] = []
    for root, _, mod in os.walk(dirname):
        for name in mod:
            if 'pb2' in name and 'pyc' not in name:
                print(name)
                module = name[:-3]
                if '/' in root:
                    dirname = root.replace('/', '.')
                path = f'{dirname}.{module}'
                imported_module = importlib.import_module(path)
                protos.append(imported_module)
    return protos

但是我的 'proto.protos' 变量总是设置为 None(意思是 test 是 None,我确定问题出在第一步)

我尝试从与我的管道处于同一级别的文件中调用此行并且它有效:

test = proto.get_proto_file('data/build')

所以我猜这是因为我的文件不在数据流中,因为它们在我的项目中.. 知道怎么做吗?

谢谢:)

【问题讨论】:

    标签: python google-cloud-platform google-cloud-dataflow apache-beam


    【解决方案1】:

    这是一个常见的问题。

    • 首先,您必须了解梁的工作原理。当您准备管道时,您在主服务器上,那里有您的所有代码和所有文件。管道已构建(是的,出于效率原因,您的管道是在 Java 中编译/翻译的,Python 太慢了(如果已经部署了新的运行器,则您的代码是用 C++ 编译的,但无论如何,Python 在运行时会消失)并发布使用管道选项将其发送到工作服务器。

    • 然后,您应该了解问题所在:已编译的管道和选项已交付,而不是您的文件!

    如何解决?

    因为管道选项是与您编译的管道一起发送的,所以将您的文件加载到主服务器中(在您的管道启动之前)并将内容存储到管道选项中。

    从你的转换选项中读取它

    【讨论】:

    • 很抱歉,我不明白您所说的“将文件加载到主服务器”是什么意思,我尝试创建一个 tmp 文件夹,但不起作用,尝试加载模块,确实如此也不行..
    • 好的,不清楚。那么,通过这些行,您可以读取您的文件内容,test = proto.get_proto_file('data/build') proto.get_obj_from_file(test) 对吗?如果是这样,就在这些行之后,将文件的内容(原型)存储到 PipelineOption 对象中。然后在您的管道中,从 PipelineOption 获取内容。 (我知道如何在Java中扩展PipelineOptions类型,不是在Python中,但原理是一样的)
    • 哦,困惑来自我,使用我的方法,我只得到模块和类。由于我没有设法做我想做的事,我想我会尝试将我的文件上传到存储桶
    猜你喜欢
    • 2019-05-29
    • 2019-10-21
    • 1970-01-01
    • 2021-03-13
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多