【问题标题】:Managing Dependencies - pipeline code spans multiple files管理依赖 - 管道代码跨越多个文件
【发布时间】:2020-04-09 06:09:31
【问题描述】:

在将“主管道代码”和“自定义转换代码”分离到多个文件后,我在 DataFlowRunner 上运行流管道时遇到问题,如下所述:Multiple File Dependencies - 没有元素(发布消息)读入管道。 (新)Dataflow UI 中的作业日志、工人日志、作业错误报告选项卡都不会报告任何错误。 Job ID:2020-04-06_15_23_52-4004061030939218807如果有人想看看...

管道最小代码(BEFORE): pipeline.py

row = p | "read_sub" >> pubsub.ReadFromPubSub(subscription=SUB,with_attributes=True,) \
        | "add_timestamps" >> beam.Map(add_timestamps)

add_timestamps 是我的自定义转换

def add_timestamps(e):
    payload = e.data.decode()
    return {"message":payload}

add_timestamps 和管道代码在同一个文件 pipeline.py 中时,一切正常。

之后我将文件重组如下:

root_dir/
   pipeline.py
   setup.py
   my_transforms/
      __init__py.py
      transforms.py

在哪里,setup.py

import setuptools
setuptools.setup(
   name='my-custom-transforms-package',
   version='1.0',
   install_requires=["datetime"],
   packages= ['my_transforms'] #setuptools.find_packages(),
)

将所有add_timestamps 转换代码移至transforms.py(在my_transforms 包目录下)

在我的 pipeline.py 中,我现在按如下方式导入和使用转换:

from my_transforms.transforms import add_timestamps
row = p | "read_sub" >> pubsub.ReadFromPubSub(subscription=SUB,with_attributes=True,) \
        | "add_timestamps" >> beam.Map(add_timestamps)

在启动管道时,我确实设置了标志:--setup_file=./setup.py

但是没有一个元素被读入管道(如您所见,数据水印仍然卡住并且添加的元素(近似)没有报告任何内容)

【问题讨论】:

    标签: python google-cloud-dataflow apache-beam python-packaging


    【解决方案1】:

    我已经在 Dataflow 中测试了多个文件依赖项选项,对我来说效果很好。我从Medium复制了示例。

    您的目录结构正确。您是否在 transforms.py 文件中添加了任何导入?

    我建议您在setup.py 中进行一些更改:

    import setuptools
    
    REQUIRED_PACKAGES = [
        ‘datetime’
    ]
    
    PACKAGE_NAME = 'my_transforms'
    PACKAGE_VERSION = '0.0.1'
    
    setuptools.setup(
       name=PACKAGE_NAME,
       version=PACKAGE_VERSION,
       description='My transforms package',
       install_requires=REQUIRED_PACKAGES,
       packages=setuptools.find_packages()
    )
    

    运行管道时,请注意在PipelineOptions 中设置以下字段:job_nameprojectrunnerstaging_locationtemp_location。您必须至少指定 temp_locationstaging_location 之一才能在 Google 云上运行您的管道。如果您使用 Apache Beam SDK for Python 2.15.0 或更高版本,您还必须指定区域。记住要指定setup.py 的完整路径。

    它看起来类似于那个命令:

    python3 pipeline.py \
    --job_name <JOB_NAME>
    --project <PROJECT_NAME> \
    --runner DataflowRunner \
    --region <REGION> \
    --temp_location gs://<BUCKET_NAME>/temp \
    --setup_file /<FULL_PATH>/setup.py
    

    希望对你有帮助。

    【讨论】:

    • 感谢您的评论。所有这些都设置好了..试图为--setup_file 提供 //setup.py 但这也无济于事。
    • 我找到了根本原因 - 上面已回答。如果至少在 Job/Worker 日志中显示了一些错误消息,那就太好了!花了大约 2-3 天的时间来解决问题:=)
    【解决方案2】:

    我找到了根本原因...我正在设置标志--no_use_public_ips 并在setup.py 中有install_requires=["datetime"]..

    当然,如果没有外部 IP,worker 无法与 python 包管理器服务器通信以安装 datetime。通过不设置标志--no_use_public_ips 来解决问题(稍后我将查看解决方案如何为工作人员禁用外部 IP 并且仍然能够成功运行)。如果至少在 Job/Worker 日志中显示了一些错误消息,那就太好了!花了大约 2-3 天的时间来解决问题:=)

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2012-06-30
      • 2013-09-03
      • 2013-06-22
      • 2015-04-06
      • 2021-11-18
      • 2021-09-09
      • 2011-03-13
      相关资源
      最近更新 更多