【Question Title】: BigQuery Connector for Apache Spark - Update a Partitioned Table
【Posted】: 2019-02-01 13:22:53
【Question】:

I'm writing a Spark job in Scala on Google Dataproc that runs daily and processes records, each tagged with a transaction time. The records are grouped by year-month, and each group is written to a separate monthly Parquet file in GCS (e.g. 2018-07-file.parquet, 2018-08-file.parquet, etc.). Note that these files go back about five years and form a very large dataset (~1 TB).

I would like to write these files into BigQuery and have the job update only the monthly records that changed in the current run. For simplicity, I want to delete the existing records for any month with updated records and then load the data from that month's Parquet file.

I'm trying to use the BigQuery Connector for Dataproc, but it appears to only support updating of an entire table, not a batch of records filtered by a date field.

What's the best way to do this? I tried including the full BigQuery library JAR in my project and using a DML query to delete the existing monthly records, like this:

import java.util.UUID

import com.google.cloud.bigquery.{BigQuery, BigQueryOptions, Job, JobId, JobInfo, QueryJobConfiguration}
import org.apache.commons.lang3.time.DateUtils
import org.apache.spark.SparkContext
import org.apache.spark.sql.Dataset

def writeDataset(sparkContext: SparkContext, monthYear: String, ds: Dataset[TargetOrder]) = {
    val dtMonthYear = FeedWriter.parquetDateFormat.parse(monthYear)
    val bigquery: BigQuery = BigQueryOptions.getDefaultInstance.getService
    // DML query: delete every record whose trans_time falls inside the month being reloaded
    val queryConfig: QueryJobConfiguration =
      QueryJobConfiguration.newBuilder("DELETE FROM `" + getBQTableName(monthYear) + "` " +
        "WHERE header.trans_time BETWEEN PARSE_DATETIME('" + FeedWriter.parquetDateFormat.toPattern + "', '" + monthYear + "') " +
        "AND PARSE_DATETIME('" + FeedWriter.parquetDateFormat.toPattern + "', '" +
        FeedWriter.parquetDateFormat.format(DateUtils.addMonths(dtMonthYear, 1)) + "')")
      .setUseLegacySql(false)
      .build()

    val jobId: JobId = JobId.of(UUID.randomUUID().toString)
    val queryJob: Job = bigquery.create(JobInfo.newBuilder(queryConfig).setJobId(jobId).build()).waitFor()
}

But I get the following error (I assume including the full BQ client JAR in a Dataproc job isn't supported, or perhaps it conflicts with the BQ connector):

java.lang.NoSuchMethodError: com.google.api.services.bigquery.model.JobReference.setLocation(Ljava/lang/String;)Lcom/google/api/services/bigquery/model/JobReference;
  at com.google.cloud.bigquery.JobId.toPb(JobId.java:114)
  at com.google.cloud.bigquery.JobInfo.toPb(JobInfo.java:370)
  at com.google.cloud.bigquery.BigQueryImpl.create(BigQueryImpl.java:198)
  at com.google.cloud.bigquery.BigQueryImpl.create(BigQueryImpl.java:187)
  at ca.mycompany.myproject.output.BigQueryWriter$.writeDataset(BigQueryWriter.scala:39)

【Question Discussion】:

    Tags: scala apache-spark google-bigquery google-cloud-dataproc


    【Solution 1】:

    I found that including the full client JAR in a Dataproc job doesn't seem to work (which is presumably why they provide separate connector extensions for BQ and other services). So instead, I ended up having my Dataproc job publish messages to a Pub/Sub topic indicating which monthly Parquet files were updated, and then created a Cloud Function to monitor the Pub/Sub topic and spawn a BigQuery load job for only the changed monthly files.

    I was able to update only specific months by using table partition decorators (e.g. MyTable$20180101) and grouping all of a month's records into the same day. (Currently, BQ only supports partitioning tables by DAY, not by month, so I had to create a separate field on each record that is set to, e.g., 2018-01-01 for every record in 2018-01-xx.)
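
    For reference, here is a minimal sketch of how that per-record partition-date field can be derived in Spark before the Parquet files are written. The column name bigQueryPartition matches the field used by the load job below, and header.trans_time comes from the question's DELETE query; the helper itself is hypothetical:

    import org.apache.spark.sql.{DataFrame, Dataset}
    import org.apache.spark.sql.functions.{col, trunc}

    // Hypothetical helper: every record in, e.g., 2018-01-xx gets bigQueryPartition = 2018-01-01,
    // so all of a month's rows land in the same daily partition (MyTable$20180101).
    def withPartitionDate(ds: Dataset[TargetOrder]): DataFrame =
      ds.withColumn("bigQueryPartition", trunc(col("header.trans_time"), "month"))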

    Sample Scala code from the Dataproc job for writing to the Pub/Sub topic:

    import java.text.SimpleDateFormat
    import java.util.{Date, TimeZone, UUID}
    
    import ca.my.company.config.ConfigOptions
    import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport
    import com.google.api.client.json.jackson2.JacksonFactory
    import com.google.api.services.pubsub.Pubsub
    import com.google.api.services.pubsub.model.{PublishRequest, PubsubMessage}
    import com.google.cloud.hadoop.util.RetryHttpInitializer
    import org.apache.spark.streaming.pubsub.SparkGCPCredentials
    
    import scala.collection.mutable
    
    case class MyPubSubMessage(jobId: UUID, processedDate: Date, fileDate: Date, updatedFilePath: String)
    
    object PubSubWriter {
      private val PUBSUB_APP_NAME: String = "MyPubSubWriter"
      private val messages: mutable.ListBuffer[PubsubMessage] = mutable.ListBuffer()
      private val publishRequest = new PublishRequest()
      private lazy val projectId: String = ConfigOptions().pubsubConfig.projectId
      private lazy val topicId: String = ConfigOptions().pubsubConfig.topicId
    
      private lazy val client = new Pubsub.Builder(
        GoogleNetHttpTransport.newTrustedTransport(),
        JacksonFactory.getDefaultInstance(),
        new RetryHttpInitializer(
          SparkGCPCredentials.builder.build().provider,
          PUBSUB_APP_NAME
        ))
        .setApplicationName(PUBSUB_APP_NAME)
        .build()
    
      /** Queue a notification for one updated monthly Parquet file (attributes plus the GCS path as payload). */
      def queueMessage(message: MyPubSubMessage): Unit = {
        if (message == null) return
        val targetFileDateFormat = new SimpleDateFormat("yyyyMMdd")
        val isoDateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'")
        isoDateFormat.setTimeZone(TimeZone.getTimeZone("UTC"))
    
        import scala.collection.JavaConversions._
        val pubSubMessage = new PubsubMessage()
          .setAttributes(Map("msgType" -> "t-log-notification", "jobId" -> message.jobId.toString, "processedDate" -> isoDateFormat.format(message.processedDate), "fileDate" -> targetFileDateFormat.format(message.fileDate)))
    
        messages.synchronized {
          messages.append(pubSubMessage.encodeData(message.updatedFilePath.getBytes))
        }
      }
    
      def publishMessages(): Unit = {
        import scala.collection.JavaConversions._
        publishRequest.setMessages(messages)
        client.projects().topics()
          .publish(s"projects/$projectId/topics/$topicId", publishRequest)
          .execute()
    
        println(s"Update notifications: successfully sent ${messages.length} message(s) for topic '${topicId}' to Pub/Sub")
      }
    }
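
    A typical call sequence from the Dataproc job, assuming one message per updated monthly Parquet folder (the date format and GCS path below are illustrative, not taken from the original code):

    import java.text.SimpleDateFormat
    import java.util.{Date, UUID}

    // Queue one notification per updated monthly file, then publish them in a single batch.
    val msg = MyPubSubMessage(
      jobId = UUID.randomUUID(),
      processedDate = new Date(),
      fileDate = new SimpleDateFormat("yyyyMMdd").parse("20180701"),
      updatedFilePath = "gs://my-bucket/feeds/2018-07")
    PubSubWriter.queueMessage(msg)
    PubSubWriter.publishMessages()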
    

    Sample Python code for the Cloud Function that consumes from the queue and spawns the BQ load job:

    def update_bigquery(data, context):
        import base64
        from google.cloud import bigquery
        from google.cloud.bigquery.table import TimePartitioning
        from google.api_core.exceptions import GoogleAPICallError
    
        dataset_id = 'mydatasetname'
        table_id_base = 'mytablename'
    
        # The data field looks like this:
        # {'@type': 'type.googleapis.com/google.pubsub.v1.PubsubMessage', 'attributes': {'fileDate': '20171201',
        # 'jobId': '69f6307e-28a1-40fc-bb6d-572c0bea9346', 'msgType': 't-log-notification',
        # 'processedDate': '2018-09-08T02:51:54Z'}, 'data': 'Z3M6Ly9nY3MtbGRsLWRzLWRhdGE...=='}
    
        # Retrieve file path (filter out SUCCESS file in the folder path) and build the partition name
        attributes = data['attributes']
        file_path = base64.b64decode(data['data']).decode('utf-8') + "/part*"
        partition_name = attributes['fileDate']
        table_partition = table_id_base + "$" + partition_name
    
        # Instantiate BQ client
        client = bigquery.Client()
    
        # Get reference to dataset and table
        dataset_ref = client.dataset(dataset_id)
        table_ref = dataset_ref.table(table_partition)
    
        try:
            # This only deletes the table partition and not the entire table
            client.delete_table(table_ref)  # API request
            print('Table {}:{} deleted.'.format(dataset_id, table_partition))
    
        except GoogleAPICallError as e:
            print('Error deleting table ' + table_partition + ": " + str(e))
    
        # Create BigQuery loading job
        job_config = bigquery.LoadJobConfig()
        job_config.source_format = bigquery.SourceFormat.PARQUET
        job_config.time_partitioning = TimePartitioning(field='bigQueryPartition')
    
        try:
            load_job = client.load_table_from_uri(
                file_path,
                dataset_ref.table(table_partition),
                job_config=job_config)  # API request
    
            print('Starting job {}'.format(load_job.job_id))
    
            # This can be commented-out to allow the job to run purely asynchronously
            # though if it fails, I'm not sure how I could be notified
            # For now, I will set this function to the max timeout (9 mins) and see if the BQ load job can consistently complete in time
            load_job.result()  # Waits for table load to complete.
            print('Job finished.')
    
        except GoogleAPICallError as e:
            print("Error running BQ load job: " + str(e))
            raise e
    
        return 'Success'
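
    To sanity-check the function outside Cloud Functions, it can be invoked directly with a synthetic Pub/Sub event shaped like the message documented in the comment above (a sketch; the GCS path is illustrative):

    import base64

    # Synthetic event mirroring the documented Pub/Sub message shape.
    fake_event = {
        'attributes': {
            'fileDate': '20171201',
            'jobId': '69f6307e-28a1-40fc-bb6d-572c0bea9346',
            'msgType': 't-log-notification',
            'processedDate': '2018-09-08T02:51:54Z',
        },
        'data': base64.b64encode(b'gs://my-bucket/feeds/2017-12').decode('utf-8'),
    }
    update_bigquery(fake_event, context=None)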
    

    【Discussion】:

      【Solution 2】:

      What about bigquery4s?

      It's a Scala wrapper around the BQ Java client. I ran into the same problem and it worked for me.

      【Discussion】:

      • Thanks so much for the suggestion, though I ended up going a different route (see my other answer).