【问题标题】:Constructing a pipline with ValueProivder.RuntimeProvider使用 ValueProvider.Runtime Provider 构建管道
【发布时间】:2017-10-02 23:44:55
【问题描述】:

我有一个使用库版本 1.9.1 作业的 Google Dataflow 作业,该作业采用运行时参数。我们使用了 TextIO.read().from().withoutValidation()。由于我们迁移到 google dataflow 2.0.0 ,因此在 2.0.0 中删除了 withoutValidation。发行说明页面没有讨论这个https://cloud.google.com/dataflow/release-notes/release-notes-java-2

我们尝试将输入作为 ValueProvider.RuntimeProvider 传递。但是在管道构建过程中,我们得到以下错误。如果将其作为 ValueProvider 传递,则管道创建将尝试验证值提供者。如何在 google cloud dataflow 2.0.0 中为 TextIO 输入提供运行时值提供程序?

java.lang.RuntimeException:方法 getInputFile 不应具有返回类型 RuntimeValueProvider,请改用 ValueProvider。 在 org.apache.beam.sdk.options.ProxyInvocationHandler.getDefault(ProxyInvocationHandler.java:505)

【问题讨论】:

  • RuntimeValueProvider 是一个极其底层的内部实现细节类,用户永远无法使用;它具有公共可见性这一事实是 Java 可见性系统的不幸事故。您所说的“管道创建正在尝试验证价值提供者”是什么意思?我认为这不应该发生。您能否包含您的代码和您遇到的错误的完整打印输出?
  • 正如@jkff 所说,请发布您的代码。我假设您使用的是模板化管道?我正在使用带有参数的2.1.0 模板没有任何问题。

标签: google-cloud-dataflow


【解决方案1】:

我将假设您正在使用模板化管道,并且您的管道正在使用运行时参数。这是一个使用 Cloud Dataflow SDK 2.1.0 版的工作示例。它从 GCS 读取文件(在运行时传递给模板),将每一行转换为 TableRow 并写入 BigQuery。这是一个简单的例子,但它适用于2.1.0

程序参数如下:

 --project=<your_project_id>
 --runner=DataflowRunner
 --templateLocation=gs://<your_bucket>/dataflow_pipeline
 --stagingLocation=gs://<your_bucket>/jars
 --tempLocation=gs://<your_bucket>/tmp

程序代码如下:

public class TemplatePipeline {
    public static void main(String[] args) {
        PipelineOptionsFactory.register(TemplateOptions.class);
        TemplateOptions options = PipelineOptionsFactory
                .fromArgs(args)
                .withValidation()
                .as(TemplateOptions.class);
        Pipeline pipeline = Pipeline.create(options);
        pipeline.apply("READ", TextIO.read().from(options.getInputFile()).withCompressionType(TextIO.CompressionType.GZIP))
                .apply("TRANSFORM", ParDo.of(new WikiParDo()))
                .apply("WRITE", BigQueryIO.writeTableRows()
                        .to(String.format("%s:dataset_name.wiki_demo", options.getProject()))
                        .withCreateDisposition(CREATE_IF_NEEDED)
                        .withWriteDisposition(WRITE_TRUNCATE)
                        .withSchema(getTableSchema()));
        pipeline.run();
    }

    private static TableSchema getTableSchema() {
        List<TableFieldSchema> fields = new ArrayList<>();
        fields.add(new TableFieldSchema().setName("year").setType("INTEGER"));
        fields.add(new TableFieldSchema().setName("month").setType("INTEGER"));
        fields.add(new TableFieldSchema().setName("day").setType("INTEGER"));
        fields.add(new TableFieldSchema().setName("wikimedia_project").setType("STRING"));
        fields.add(new TableFieldSchema().setName("language").setType("STRING"));
        fields.add(new TableFieldSchema().setName("title").setType("STRING"));
        fields.add(new TableFieldSchema().setName("views").setType("INTEGER"));
        return new TableSchema().setFields(fields);
    }

    public interface TemplateOptions extends DataflowPipelineOptions {
        @Description("GCS path of the file to read from")
        ValueProvider<String> getInputFile();

        void setInputFile(ValueProvider<String> value);
    }

    private static class WikiParDo extends DoFn<String, TableRow> {
        @ProcessElement
        public void processElement(ProcessContext c) throws Exception {
            String[] split = c.element().split(",");
            TableRow row = new TableRow();
            for (int i = 0; i < split.length; i++) {
                TableFieldSchema col = getTableSchema().getFields().get(i);
                row.set(col.getName(), split[i]);
            }
            c.output(row);
        }
    }
}

【讨论】:

    猜你喜欢
    • 2023-03-17
    • 1970-01-01
    • 2021-04-20
    • 1970-01-01
    • 2019-12-05
    • 1970-01-01
    • 1970-01-01
    • 2020-01-07
    • 2013-10-27
    相关资源
    最近更新 更多