[Question Title]: Read Avro File and Write it into BigQuery table
[Posted]: 2019-06-28 22:55:07
[Question]:

My goal is to read Avro file data from Cloud Storage and write it to a BigQuery table using Java. It would be great if someone could provide a code snippet or ideas for reading Avro-format data and writing it to a BigQuery table with Cloud Dataflow.

[Question Comments]:

    Tags: google-bigquery google-cloud-storage google-cloud-dataflow apache-beam


    [Solution 1]:

    I see two possible approaches:

    1. Using Dataflow:
        // Key imports used below (Beam Java SDK):
        //   org.apache.avro.Schema, org.apache.avro.generic.GenericRecord, org.apache.avro.util.Utf8
        //   org.apache.beam.sdk.Pipeline, org.apache.beam.sdk.io.AvroIO,
        //   org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO, org.apache.beam.sdk.transforms.MapElements
        //   com.google.api.services.bigquery.model.TableRow, TableSchema, TableFieldSchema, TableReference
        PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();
        Pipeline p = Pipeline.create(options);
    
        // Read an AVRO file.
        // Alternatively, read the schema from a file.
        // https://beam.apache.org/releases/javadoc/2.11.0/index.html?org/apache/beam/sdk/io/AvroIO.html
        Schema avroSchema = new Schema.Parser().parse(
            "{\"type\": \"record\", "
                + "\"name\": \"quote\", "
                + "\"fields\": ["
                + "{\"name\": \"source\", \"type\": \"string\"},"
                + "{\"name\": \"quote\", \"type\": \"string\"}"
                + "]}");
        PCollection<GenericRecord> avroRecords = p.apply(
            AvroIO.readGenericRecords(avroSchema).from("gs://bucket/quotes.avro"));
    
        // Convert Avro GenericRecords to BigQuery TableRows.
        // It's probably better to use Avro-generated classes instead of manually casting types.
        // https://beam.apache.org/documentation/io/built-in/google-bigquery/#writing-to-bigquery
        PCollection<TableRow> bigQueryRows = avroRecords.apply(
            MapElements.into(TypeDescriptor.of(TableRow.class))
                .via(
                    (GenericRecord elem) ->
                        new TableRow()
                            .set("source", ((Utf8) elem.get("source")).toString())
                            .set("quote", ((Utf8) elem.get("quote")).toString())));
    
        // https://cloud.google.com/bigquery/docs/schemas
        TableSchema bigQuerySchema =
            new TableSchema()
                .setFields(
                    ImmutableList.of(
                        new TableFieldSchema()
                            .setName("source")
                            .setType("STRING"),
                        new TableFieldSchema()
                            .setName("quote")
                            .setType("STRING")));
    
        bigQueryRows.apply(BigQueryIO.writeTableRows()
            .to(new TableReference()
                .setProjectId("project_id")
                .setDatasetId("dataset_id")
                .setTableId("avro_source"))
            .withSchema(bigQuerySchema)
            .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
            .withWriteDisposition(WriteDisposition.WRITE_TRUNCATE));
    
        p.run().waitUntilFinish();
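    In the Dataflow snippet above, the Avro schema and the BigQuery `TableSchema` are both written out by hand and must be kept in sync. For flat record schemas, the BigQuery fields can instead be derived from the Avro schema. A minimal Python sketch of that mapping (the type table and helper name are illustrative, not part of any library; logical types such as `date` or `timestamp-micros` would need extra handling):

```python
import json

# Minimal mapping from Avro primitive types to BigQuery column types.
AVRO_TO_BQ = {
    "string": "STRING",
    "bytes": "BYTES",
    "int": "INTEGER",
    "long": "INTEGER",
    "float": "FLOAT",
    "double": "FLOAT",
    "boolean": "BOOLEAN",
}

def avro_to_bigquery_fields(avro_schema_json):
    """Derive a BigQuery fields list from a flat Avro record schema (JSON string)."""
    schema = json.loads(avro_schema_json)
    if schema.get("type") != "record":
        raise ValueError("only flat record schemas are handled in this sketch")
    fields = []
    for field in schema["fields"]:
        avro_type = field["type"]
        if isinstance(avro_type, list):
            # A union such as ["null", "string"] maps to a NULLABLE column.
            non_null = next(t for t in avro_type if t != "null")
            fields.append({"name": field["name"],
                           "type": AVRO_TO_BQ[non_null],
                           "mode": "NULLABLE"})
        else:
            fields.append({"name": field["name"],
                           "type": AVRO_TO_BQ[avro_type],
                           "mode": "REQUIRED"})
    return fields

# The "quote" schema from the Java snippet above.
QUOTE_SCHEMA = ('{"type": "record", "name": "quote", "fields": ['
                '{"name": "source", "type": "string"},'
                '{"name": "quote", "type": "string"}]}')

print(avro_to_bigquery_fields(QUOTE_SCHEMA))
# [{'name': 'source', 'type': 'STRING', 'mode': 'REQUIRED'},
#  {'name': 'quote', 'type': 'STRING', 'mode': 'REQUIRED'}]
```

    The same loop, ported to Java, would build the `ImmutableList` of `TableFieldSchema` objects instead of hand-writing them, so the schema is defined in exactly one place.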
    
    2. Load the data into BigQuery directly, without Dataflow. See this documentation: https://cloud.google.com/bigquery/docs/loading-data-cloud-storage-avro
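    Since Avro files are self-describing, the direct-load approach does not require writing a schema at all. A minimal sketch using the `bq` command-line tool (the dataset, table, and bucket names are placeholders):

```shell
# Load an Avro file from Cloud Storage into a BigQuery table.
# No --schema flag is needed: BigQuery infers it from the Avro file.
# dataset_id, avro_source, and the gs:// path are placeholders.
bq load \
  --source_format=AVRO \
  dataset_id.avro_source \
  gs://bucket/quotes.avro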

    [Comments]:

      [Solution 2]:

      You can try using the following Python script for this:

      import apache_beam as beam
      import sys
      
      PROJECT='YOUR_PROJECT'
      BUCKET='YOUR_BUCKET'
      
      def run():
         argv = [
            '--project={0}'.format(PROJECT),
            '--staging_location=gs://{0}/staging/'.format(BUCKET),
            '--temp_location=gs://{0}/staging/'.format(BUCKET),
            '--runner=DataflowRunner'
         ]
      
         p = beam.Pipeline(argv=argv)
      
         (p
            | 'ReadAvroFromGCS' >> beam.io.avroio.ReadFromAvro('gs://{0}/file.avro'.format(BUCKET))
            | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:dataset.avrotable'.format(PROJECT))
         )
      
         p.run()
      
      if __name__ == '__main__':
         run()
      

      Hope this helps.

      [Comments]:

      • Thanks for your reply. However, I only know Java. Let me edit my question.