【Question Title】: How to batch insert data into Google BigQuery from a Java service?
【Posted】: 2021-02-05 01:16:04
【Question】:

I have read several similar questions on SO and in the GCP docs, but haven't found a clear answer...

Is there a way to batch insert data from my Java service directly into BigQuery, without using intermediate files, Pub/Sub, or other Google services?

The key here is "batch" mode: I don't want to use the streaming API because of its cost. I know there are other ways to do batch inserts using Dataflow, Google Cloud Storage, etc. I'm not interested in those; I need to do batch inserts programmatically for my use case.

I was hoping to use the REST batch API, but it now appears to be deprecated: https://cloud.google.com/bigquery/batch

The alternative the documentation points to is:

Create a BatchRequest object from this Google API client instance.

Sample usage:

 client.batch(httpRequestInitializer)
 .queue(...)
 .queue(...)
 .execute();

Is it correct that this API uses batch mode rather than streaming mode?

Thanks!

【Comments】:

    Tags: java google-bigquery


    【Solution 1】:

    The "batch" way of writing data is called a "load job" in the Java client library. The bigquery.writer method creates an object that can be used to write data bytes as a batch load job. Set the format options according to the file type you want to serialize.

    import com.google.cloud.bigquery.BigQuery;
    import com.google.cloud.bigquery.BigQueryException;
    import com.google.cloud.bigquery.BigQueryOptions;
    import com.google.cloud.bigquery.FormatOptions;
    import com.google.cloud.bigquery.Job;
    import com.google.cloud.bigquery.JobId;
    import com.google.cloud.bigquery.JobStatistics.LoadStatistics;
    import com.google.cloud.bigquery.TableDataWriteChannel;
    import com.google.cloud.bigquery.TableId;
    import com.google.cloud.bigquery.WriteChannelConfiguration;
    import java.io.IOException;
    import java.io.OutputStream;
    import java.nio.channels.Channels;
    import java.nio.file.FileSystems;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.util.UUID;
    
    public class LoadLocalFile {
    
      public static void main(String[] args) throws IOException, InterruptedException {
        String datasetName = "MY_DATASET_NAME";
        String tableName = "MY_TABLE_NAME";
        Path csvPath = FileSystems.getDefault().getPath(".", "my-data.csv");
        loadLocalFile(datasetName, tableName, csvPath, FormatOptions.csv());
      }
    
      public static void loadLocalFile(
          String datasetName, String tableName, Path csvPath, FormatOptions formatOptions)
          throws IOException, InterruptedException {
        try {
          // Initialize client that will be used to send requests. This client only needs to be created
          // once, and can be reused for multiple requests.
          BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
          TableId tableId = TableId.of(datasetName, tableName);
    
          WriteChannelConfiguration writeChannelConfiguration =
              WriteChannelConfiguration.newBuilder(tableId).setFormatOptions(formatOptions).build();
    
          // The location and JobName must be specified; other fields can be auto-detected.
          String jobName = "jobId_" + UUID.randomUUID().toString();
          JobId jobId = JobId.newBuilder().setLocation("us").setJob(jobName).build();
    
          // Imports a local file into a table.
          try (TableDataWriteChannel writer = bigquery.writer(jobId, writeChannelConfiguration);
              OutputStream stream = Channels.newOutputStream(writer)) {
    
            // This example writes CSV data from a local file,
            // but bytes can also be written in batch from memory.
            // In addition to CSV, other formats such as
            // Newline-Delimited JSON (https://jsonlines.org/) are
            // supported.
            Files.copy(csvPath, stream);
    
          }
    
          // Get the Job created by the TableDataWriteChannel and wait for it to complete.
          Job job = bigquery.getJob(jobId);
          Job completedJob = job.waitFor();
          if (completedJob == null) {
            System.out.println("Job not executed since it no longer exists.");
            return;
          } else if (completedJob.getStatus().getError() != null) {
            System.out.println(
                "BigQuery was unable to load local file to the table due to an error: \n"
                    + completedJob.getStatus().getError());
            return;
          }
    
          // Get output status
          LoadStatistics stats = completedJob.getStatistics();
          System.out.printf("Successfully loaded %d rows. \n", stats.getOutputRows());
        } catch (BigQueryException e) {
          System.out.println("Local file not loaded. \n" + e.toString());
        }
      }
    }
    


    【Discussion】:

    • Thank you, Tim. Do you happen to have an example of writing from memory? Do you mean something like this: StringBuffer sb = new StringBuffer(); sb.append("field1Value, field2Value, field3Value"); // add more lines == rows; stream.write(sb.toString().getBytes()); ?? With FormatOptions.csv()? Thanks!!
    • @Marina Yes, something like that. FormatOptions.csv() will build the right thing. googleapis.dev/java/google-cloud-bigquery/latest/com/google/… For more complex types you may want to use JSON or Avro instead. This system test shows writing JSON from a string: github.com/googleapis/java-bigquery/blob/…
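
    To make the in-memory variant discussed in the comments concrete, here is a minimal, illustrative sketch. It exercises the same OutputStream-over-channel pattern as the answer's code, but substitutes a plain in-memory channel for the TableDataWriteChannel returned by bigquery.writer(...), so it runs without any GCP credentials; the CSV text it produces is what a load job configured with FormatOptions.csv() would parse. The class and method names (InMemoryCsvWrite, toCsv) are made up for this example.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.channels.Channels;
import java.nio.charset.StandardCharsets;

public class InMemoryCsvWrite {

    // Serialize rows to CSV text. With BigQuery, FormatOptions.csv() on the
    // WriteChannelConfiguration tells the load job to parse exactly this format.
    static String toCsv(String[][] rows) {
        StringBuilder sb = new StringBuilder();
        for (String[] row : rows) {
            sb.append(String.join(",", row)).append('\n');
        }
        return sb.toString();
    }

    public static void main(String[] args) throws IOException {
        String[][] rows = {
            {"field1Value", "field2Value", "field3Value"},
            {"a", "b", "c"},
        };

        // Stand-in for Channels.newOutputStream(writer) in the answer's code:
        // here the channel just collects the bytes in memory. With the real
        // client you would pass the TableDataWriteChannel instead.
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        try (OutputStream stream = Channels.newOutputStream(Channels.newChannel(sink))) {
            stream.write(toCsv(rows).getBytes(StandardCharsets.UTF_8));
        }
        System.out.print(sink.toString(StandardCharsets.UTF_8));
    }
}
```

    In the real service, only the last few lines change: replace the in-memory sink with the TableDataWriteChannel from bigquery.writer(jobId, writeChannelConfiguration), then fetch and wait for the job exactly as in the answer above.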