【发布时间】:2020-10-21 03:20:47
【问题描述】:
我正在使用数据流管道,我希望能够访问其中的文件。
这是我调用方法来获取文件和提取模块的那一行:
def run():
""" Run pipeline"""
options: PipelineOptions = PipelineOptions(
project='production-213911',
runner='DataflowRunner',
region='europe-west1',
streaming=True,
setup_file='dataflow/setup.py',
autoscaling_algorith='THROUGHPUT_BASED',
)
proto = Container().protobuf()
test = proto.get_proto_file('data/build')
proto.get_obj_from_file(test)
with beam.Pipeline(options=options) as pipeline:
...
这是我想使用我的模块列表的管道步骤:
status_records = (status | 'Proto to Dict' >> beam.Map(
lambda x: convert_proto_to_dict(x, proto.protos)))
这是浏览目录以获取文件的代码:
@staticmethod
def get_proto_file(dirname: str = 'python_protocol') -> List[object]:
"""Iterate threw dir w. build .proto
:param
dirname = name of the directory to browse
:return
List[object] with module
"""
protos: List[object] = []
for root, _, mod in os.walk(dirname):
for name in mod:
if 'pb2' in name and 'pyc' not in name:
print(name)
module = name[:-3]
if '/' in root:
dirname = root.replace('/', '.')
path = f'{dirname}.{module}'
imported_module = importlib.import_module(path)
protos.append(imported_module)
return protos
但是我的 'proto.protos' 变量总是设置为 None(意思是 test 是 None,我确定问题出在第一步)
我尝试从与我的管道处于同一级别的文件中调用此行并且它有效:
test = proto.get_proto_file('data/build')
所以我猜这是因为我的文件不在数据流中,因为它们在我的项目中.. 知道怎么做吗?
谢谢:)
【问题讨论】:
标签: python google-cloud-platform google-cloud-dataflow apache-beam