【发布时间】:2019-05-23 17:53:09
【问题描述】:
我正在使用数据流模板来运行云数据流
我提供了一些默认值和调用模板。 Dataflow 在数据流管道摘要中正确显示管道选项。但它不采用运行时值。
class Mypipeoptions(PipelineOptions):
@classmethod
def _add_argparse_args(cls, parser):
parser.add_value_provider_argument(
'--preprocess_indir',
help='GCS path of the data to be preprocessed',
required=False,
default='gs://default/dataset/'
)
parser.add_value_provider_argument(
'--output_dir_train',
help='GCS path of the preprocessed train data',
required=False,
default='gs://default/train/'
)
parser.add_value_provider_argument(
'--output_dir_test',
help='GCS path of the preprocessed test data',
required=False,
default='gs://default/test/'
)
parser.add_value_provider_argument(
'--output_dir_validate',
help='GCS path of the preprocessed validate data',
required=False,
default='gs://default/validate/'
)
然后我正在检查值是否可以访问
p = beam.Pipeline(options=args)
if args.preprocess_indir.is_accessible():
input_dir = args.preprocess_indir
else:
input_dir = getValObj(args.preprocess_indir)
if args.output_dir_train.is_accessible():
output_train = args.output_dir_train
else:
output_train = getValObj(args.output_dir_train)
if args.output_dir_test.is_accessible():
output_test = args.output_dir_test
else:
output_test = getValObj(args.output_dir_test)
if args.output_dir_validate.is_accessible():
output_validate = args.output_dir_validate
else:
output_validate = getValObj(args.output_dir_validate)
现在在调用模板时,我可以看到我想要作为 (Mypipeoptions)pipeine 选项参数传递的值,但它没有在实际运行中使用,而是使用给定的默认选项
【问题讨论】:
-
您将选项传递给哪些转换,哪些选项不使用运行时值?
-
无,如图所示,这些选项用于 text.io 读取和 tfrecord.io 下沉。
-
您能否更新部分问题以包括您如何在管道中定义转换?
标签: google-cloud-dataflow apache-beam