【问题标题】:Apply Side input to BigQueryIO.read operation in Apache Beam将 Side input 应用于 Apache Beam 中的 BigQueryIO.read 操作
【发布时间】:2017-07-31 15:35:18
【问题描述】:

有没有办法将侧输入应用到 Apache Beam 中的 BigQueryIO.read() 操作。

例如,我在 PCollection 中有一个值,我想在查询中使用它来从 BigQuery 表中获取数据。这可以使用侧面输入吗?还是应该在这种情况下使用其他东西?

我在类似的情况下使用了 NestedValueProvider,但我想我们只能在某个值取决于我的运行时值时使用它。或者我可以在这里使用同样的东西吗?如果我错了,请纠正我。

我试过的代码:

Bigquery bigQueryClient = start_pipeline.newBigQueryClient(options.as(BigQueryOptions.class)).build();
    Tabledata tableRequest = bigQueryClient.tabledata();

PCollection<TableRow> existingData = readData.apply("Read existing data",ParDo.of(new DoFn<String,TableRow>(){
    @ProcessElement
    public void processElement(ProcessContext c) throws IOException
    {
        List<TableRow> list = c.sideInput(bqDataView);
        String tableName = list.get(0).get("table").toString();
        TableDataList table = tableRequest.list("projectID","DatasetID",tableName).execute();

        for(TableRow row:table.getRows())
        {
            c.output(row);
        }
    }
    }).withSideInputs(bqDataView));

我得到的错误是:

Exception in thread "main" java.lang.IllegalArgumentException: unable to serialize BeamTest.StarterPipeline$1@86b455
    at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:53)
    at org.apache.beam.sdk.util.SerializableUtils.clone(SerializableUtils.java:90)
    at org.apache.beam.sdk.transforms.ParDo$SingleOutput.<init>(ParDo.java:569)
    at org.apache.beam.sdk.transforms.ParDo.of(ParDo.java:434)
    at BeamTest.StarterPipeline.main(StarterPipeline.java:158)
Caused by: java.io.NotSerializableException: com.google.api.services.bigquery.Bigquery$Tabledata
    at java.io.ObjectOutputStream.writeObject0(Unknown Source)
    at java.io.ObjectOutputStream.defaultWriteFields(Unknown Source)
    at java.io.ObjectOutputStream.writeSerialData(Unknown Source)
    at java.io.ObjectOutputStream.writeOrdinaryObject(Unknown Source)
    at java.io.ObjectOutputStream.writeObject0(Unknown Source)
    at java.io.ObjectOutputStream.writeObject(Unknown Source)
    at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:49)
    ... 4 more

【问题讨论】:

    标签: google-cloud-dataflow apache-beam


    【解决方案1】:

    Beam 模型目前还不能很好地支持这种依赖数据的操作。

    一种方法是编写您自己的DoFn,它接收侧面输入并直接连接到 BQ。不幸的是,这不会给您任何并行性,因为DoFn 将完全在同一个线程上运行。

    一旦 Beam 支持 Splittable DoFns,这将是另一回事。


    在当前的世界状态下,您需要使用 BQ client library 添加代码来查询 BQ,就好像您不在 Beam 管道中一样。

    鉴于您问题中的代码,关于如何实现此功能的粗略想法如下:

    class ReadDataDoFn extends DoFn<String,TableRow>(){
        private Tabledata tableRequest;
    
        private Bigquery bigQueryClient;
    
        private Bigquery createBigQueryClientWithinDoFn() {
            // I'm not sure how you'd implement this, but you had the right idea
        }
    
        @Setup
        public void setup() {
            bigQueryClient = createBigQueryClientWithinDoFn(); 
            tableRequest = bigQueryClient.tabledata();
        }
        @ProcessElement
        public void processElement(ProcessContext c) throws IOException
        {
            List<TableRow> list = c.sideInput(bqDataView);
            String tableName = list.get(0).get("table").toString();
            TableDataList table = tableRequest.list("projectID","DatasetID",tableName).execute();
    
            for(TableRow row:table.getRows())
            {
                c.output(row);
            }
        }
    }
    
    PCollection<TableRow> existingData = readData.apply("Read existing data",ParDo.of(new ReadDataDoFn()));
    

    【讨论】:

    • 好的@pablo。您能否提供一个将 DoFn 连接到 BigQuery 的示例。这真的很有帮助。
    • 我的回答是,您需要使用 BQ 客户端库 (cloud.google.com/bigquery/docs/reference/…) 添加可以查询 BQ 的代码,就像您不在 Beam 管道中一样。这有意义吗?
    • 是的...我会试一试。谢谢@pablo
    • 我已经用我尝试过的代码更新了帖子。但我收到一条错误消息,提示“无法序列化”。这种方法好吗?
    • 您的想法是对的,但您需要在工作人员内部而不是在创建管道时连接到 bigquery。我已经更新了我的答案以反映这一点。另外,请注意,这是一种非常尴尬的 Beam 使用形式,但它是目前提供您正在寻找的功能的唯一方法。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-06-14
    • 1970-01-01
    • 2020-04-04
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多