【问题标题】:Google dataflow: AvroIO read from file in google storage passed as runtime parameter谷歌数据流:从谷歌存储中的文件读取的 AvroIO 作为运行时参数传递
【发布时间】:2018-04-30 03:13:05
【问题描述】:

我想使用 java SDK 2 在我的数据流中读取 Avro 文件
我已经使用基于上传到存储桶的文件触发的云功能来安排我的数据流。

以下是选项代码:

ValueProvider <String> getInputFile();
void setInputFile(ValueProvider<String> value);

我正在尝试使用以下代码读取此输入文件:

PCollection<user> records = p.apply(
    AvroIO.read(user.class)
    .from(String.valueOf(options.getInputFile())));

运行管道时出现以下错误:

java.lang.IllegalArgumentException: Unable to find any files matching RuntimeValueProvider{propertyName=inputFile, default=gs://test_bucket/user.avro, value=null}

同样的代码在 TextIO 的情况下也能正常工作。
我们如何读取上传的 Avro 文件以触发触发数据流管道的云功能?

【问题讨论】:

    标签: google-cloud-dataflow google-cloud-functions avro apache-beam gcp


    【解决方案1】:

    请尝试...from(options.getInputFile())),不要将其转换为字符串。

    为简单起见,您甚至可以将选项定义为简单字符串:

       String getInputFile();
       void setInputFile(String value);
    

    【讨论】:

    • 不,我不能在这里使用简单的字符串。我想在运行时传递输入文件。当我使用 options.getInputFile() 时出现不兼容类型错误。 from 期望输入为字符串。
    【解决方案2】:

    您只需使用from(options.getInputFile())AvroIO 明确支持从ValueProvider 读取。

    当前代码采用options.getInputFile(),这是一个ValueProvider,在其上调用JavatoString()函数,它提供了一个人类可读的调试字符串"RuntimeValueProvider{propertyName=inputFile, default=gs://test_bucket/user.avro, value=null}"并将其作为文件名传递给AvroIO以读取,当然这个字符串不是一个有效的文件名,这就是代码当前不起作用的原因。

    还请注意,ValueProvider 的全部意义在于它是一个占位符,用于在构造管道时未知并将稍后提供(可能管道将执行多次,提供不同的值) - 因此在管道施工时提取ValueProvider 的值在设计上是不可能的,因为没有价值。但是在运行时(例如在 DoFn 中),您可以通过调用 .get() 来提取值。

    【讨论】:

    • 您好,我尝试使用 options.getInputFile(),但出现以下错误。不兼容的类型:org.apache.beam.sdk.options.ValueProvider 无法转换为 java.lang.String 以下是代码:PCollection&lt;user&gt; records = p.apply( AvroIO.read(user.class) .from(options.getInputFile()));
    • 哦,确实,似乎 from(ValueProvider) 版本仅在 2.2.0 中添加,目前正在发布过程中。您可以使用版本 2.2.0-SNAPSHOT,或者作为一种解决方法,同时您可以使用通常不应该直接使用的内部类:p.apply(Read.from(AvroSource.from(options.getInputFile()))。 withSchema(user.class)))
    • p.apply(Read.from(AvroSource.from(options.getInputFile()).wi‌​thSchema(user.class)‌​)) 这样也会出现相同的不兼容类型错误。这是来自 Avro 源代码:public static AvroSource&lt;GenericRecord&gt; from(String fileNameOrPattern)
    • 抱歉,我的错误 - 查看了错误的 github 选项卡...是的,您将需要使用 2.2.0-SNAPSHOT。
    • 现在可以试试from(options.getInputFile().get())吗?
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-02-24
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多