【发布时间】:2020-04-09 06:09:31
【问题描述】:
在将“主管道代码”和“自定义转换代码”分离到多个文件后,我在 DataFlowRunner 上运行流管道时遇到问题,如下所述:Multiple File Dependencies - 没有元素(发布消息)读入管道。 (新)Dataflow UI 中的作业日志、工人日志、作业错误报告选项卡都不会报告任何错误。 Job ID:2020-04-06_15_23_52-4004061030939218807如果有人想看看...
管道最小代码(BEFORE): pipeline.py
row = p | "read_sub" >> pubsub.ReadFromPubSub(subscription=SUB,with_attributes=True,) \
| "add_timestamps" >> beam.Map(add_timestamps)
add_timestamps 是我的自定义转换
def add_timestamps(e):
payload = e.data.decode()
return {"message":payload}
当add_timestamps 和管道代码在同一个文件 pipeline.py 中时,一切正常。
之后我将文件重组如下:
root_dir/
pipeline.py
setup.py
my_transforms/
__init__py.py
transforms.py
在哪里,setup.py
import setuptools
setuptools.setup(
name='my-custom-transforms-package',
version='1.0',
install_requires=["datetime"],
packages= ['my_transforms'] #setuptools.find_packages(),
)
将所有add_timestamps 转换代码移至transforms.py(在my_transforms 包目录下)
在我的 pipeline.py 中,我现在按如下方式导入和使用转换:
from my_transforms.transforms import add_timestamps
row = p | "read_sub" >> pubsub.ReadFromPubSub(subscription=SUB,with_attributes=True,) \
| "add_timestamps" >> beam.Map(add_timestamps)
在启动管道时,我确实设置了标志:--setup_file=./setup.py。
但是没有一个元素被读入管道(如您所见,数据水印仍然卡住并且添加的元素(近似)没有报告任何内容)
【问题讨论】:
标签: python google-cloud-dataflow apache-beam python-packaging