[Question title]: Google Dataflow streaming inserts to BigQuery hitting rate limits
[Posted]: 2020-06-09 12:31:37
[Question description]:

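I'm trying to insert records into BigQuery from a streaming Dataflow pipeline written in Python. The pipeline reads PubSub notifications about changed files in a bucket, then reads each file, transforms it, and inserts it into BigQuery.

However, once the pipeline is processing about 100 to 200 elements/sec, I get errors like the one below, saying I've exceeded rate limits and linking to this page. Sometimes the errors mention the tabledata.list quota, which is 500/sec.

I don't understand why I'm seeing messages about these quotas at all, since BigQuery's streaming insert quota is 1,000,000/sec.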

> [while running 'generatedPtransform-52321']

        java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
        java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
        org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:57)
        org.apache.beam.runners.dataflow.worker.fn.control.RegisterAndProcessBundleOperation.finish(RegisterAndProcessBundleOperation.java:332)
        org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:85)
        org.apache.beam.runners.dataflow.worker.fn.control.BeamFnMapTaskExecutor.execute(BeamFnMapTaskExecutor.java:125)
        org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1350)
        org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1100(StreamingDataflowWorker.java:152)
        org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$7.run(StreamingDataflowWorker.java:1073)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: Error received from SDK harness for instruction -52327: Traceback (most recent call last):
  File "apache_beam/runners/common.py", line 883, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 498, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/gcp/bigquery.py", line 1024, in process
    schema)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/gcp/bigquery.py", line 1009, in _create_table_if_needed
    additional_create_parameters=self.additional_bq_parameters)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/utils/retry.py", line 226, in wrapper
    return fun(*args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/gcp/bigquery_tools.py", line 712, in get_or_create_table
    found_table = self.get_table(project_id, dataset_id, table_id)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/utils/retry.py", line 226, in wrapper
    return fun(*args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/gcp/bigquery_tools.py", line 480, in get_table
    response = self.client.tables.Get(request)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/gcp/internal/clients/bigquery/bigquery_v2_client.py", line 581, in Get
    config, request, global_params=global_params)
  File "/usr/local/lib/python3.6/site-packages/apitools/base/py/base_api.py", line 731, in _RunMethod
    return self.ProcessHttpResponse(method_config, http_response, request)
  File "/usr/local/lib/python3.6/site-packages/apitools/base/py/base_api.py", line 737, in ProcessHttpResponse
    self.__ProcessHttpResponse(method_config, http_response, request))
  File "/usr/local/lib/python3.6/site-packages/apitools/base/py/base_api.py", line 604, in __ProcessHttpResponse
    http_response, method_config=method_config, request=request)
apitools.base.py.exceptions.HttpForbiddenError: HttpError accessing <https://www.googleapis.com/bigquery/v2/projects/bought-by-many/datasets/mongo_landing_zone/tables/service_user_users_users?alt=json>: response: <{'vary': 'Origin, X-Origin, Referer', 'content-type': 'application/json; charset=UTF-8', 'date': 'Tue, 25 Feb 2020 16:49:25 GMT', 'server': 'ESF', 'cache-control': 'private', 'x-xss-protection': '0', 'x-frame-options': 'SAMEORIGIN', 'x-content-type-options': 'nosniff', 'transfer-encoding': 'chunked', 'status': '403', 'content-length': '560', '-content-encoding': 'gzip'}>, content <{
  "error": {
    "code": 403,
    "message": "Exceeded rate limits: too many api requests per user per method for this user_method. For more information, see https://cloud.google.com/bigquery/troubleshooting-errors",
    "errors": [
      {
        "message": "Exceeded rate limits: too many api requests per user per method for this user_method. For more information, see https://cloud.google.com/bigquery/troubleshooting-errors",
        "domain": "usageLimits",
        "reason": "rateLimitExceeded"
      }
    ],
    "status": "PERMISSION_DENIED"
  }
}
>

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 167, in _execute
    response = task()
  File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 223, in <lambda>
    lambda: self.create_worker().do_instruction(request), request)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 352, in do_instruction
    request.instruction_id)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 386, in process_bundle
    bundle_processor.process_bundle(instruction_id))
  File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 812, in process_bundle
    data.transform_id].process_encoded(data.data)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 205, in process_encoded
    self.output(decoded_value)
  File "apache_beam/runners/worker/operations.py", line 302, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 304, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 178, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 657, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 658, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 878, in apache_beam.runners.common.DoFnRunner.receive
  File "apache_beam/runners/common.py", line 885, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 941, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 883, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 497, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 1028, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "apache_beam/runners/worker/operations.py", line 178, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 657, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 658, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 878, in apache_beam.runners.common.DoFnRunner.receive
  File "apache_beam/runners/common.py", line 885, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 941, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 883, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 497, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 1028, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "apache_beam/runners/worker/operations.py", line 178, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 657, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 658, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 878, in apache_beam.runners.common.DoFnRunner.receive
  File "apache_beam/runners/common.py", line 885, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 956, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "/usr/local/lib/python3.6/site-packages/future/utils/__init__.py", line 421, in raise_with_traceback
    raise exc.with_traceback(traceback)
  File "apache_beam/runners/common.py", line 883, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 498, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/gcp/bigquery.py", line 1024, in process
    schema)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/gcp/bigquery.py", line 1009, in _create_table_if_needed
    additional_create_parameters=self.additional_bq_parameters)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/utils/retry.py", line 226, in wrapper
    return fun(*args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/gcp/bigquery_tools.py", line 712, in get_or_create_table
    found_table = self.get_table(project_id, dataset_id, table_id)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/utils/retry.py", line 226, in wrapper
    return fun(*args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/gcp/bigquery_tools.py", line 480, in get_table
    response = self.client.tables.Get(request)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/gcp/internal/clients/bigquery/bigquery_v2_client.py", line 581, in Get
    config, request, global_params=global_params)
  File "/usr/local/lib/python3.6/site-packages/apitools/base/py/base_api.py", line 731, in _RunMethod
    return self.ProcessHttpResponse(method_config, http_response, request)
  File "/usr/local/lib/python3.6/site-packages/apitools/base/py/base_api.py", line 737, in ProcessHttpResponse
    self.__ProcessHttpResponse(method_config, http_response, request))
  File "/usr/local/lib/python3.6/site-packages/apitools/base/py/base_api.py", line 604, in __ProcessHttpResponse
    http_response, method_config=method_config, request=request)
RuntimeError: apitools.base.py.exceptions.HttpForbiddenError: HttpError accessing <https://www.googleapis.com/bigquery/v2/projects/bought-by-many/datasets/mongo_landing_zone/tables/service_user_users_users?alt=json>: response: <{'vary': 'Origin, X-Origin, Referer', 'content-type': 'application/json; charset=UTF-8', 'date': 'Tue, 25 Feb 2020 16:49:25 GMT', 'server': 'ESF', 'cache-control': 'private', 'x-xss-protection': '0', 'x-frame-options': 'SAMEORIGIN', 'x-content-type-options': 'nosniff', 'transfer-encoding': 'chunked', 'status': '403', 'content-length': '560', '-content-encoding': 'gzip'}>, content <{
  "error": {
    "code": 403,
    "message": "Exceeded rate limits: too many api requests per user per method for this user_method. For more information, see https://cloud.google.com/bigquery/troubleshooting-errors",
    "errors": [
      {
        "message": "Exceeded rate limits: too many api requests per user per method for this user_method. For more information, see https://cloud.google.com/bigquery/troubleshooting-errors",
        "domain": "usageLimits",
        "reason": "rateLimitExceeded"
      }
    ],
    "status": "PERMISSION_DENIED"
  }
}

The code I'm using is as follows:

    files = (
        p
        | "read PubSub"
        >> beam.io.ReadFromPubSub(
            topic=known_args.input_topic, with_attributes=True, id_label=id_label
        )
        | "decode message" >> beam.Map(lambda pubsub_msg: json.loads(pubsub_msg.data))
        | "filter buckets with unknown encodings"
        >> beam.Filter(no_encoding_bucket_filter, encodings)
        | "get file from bucket" >> beam.ParDo(GetFileFromBucket())
    )

    policies = (
        files
        | "filter for policies"
        >> beam.Filter(lambda msg: 'policies' in msg["bucket"])
        | "encode policies"
        >> beam.Map(apply_encoding, encodings['policies'], 'policies')
        | "filter out policies that failed to encode"
        >> beam.Filter(lambda item: item is not None)
        | "insert policies to BigQuery"
        >> beam.io.WriteToBigQuery(
            project=project_id,
            table="service_policy_policies",
            dataset="mongo_landing_zone",
            insert_retry_strategy="RETRY_ON_TRANSIENT_ERROR",
        )
    )

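beam.io.WriteToBigQuery() does work with streaming data, but from the errors I suspect it is initialising or fetching the BigQuery table as an object for every element processed, rather than just inserting a row. Am I using it incorrectly in some way?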
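The traceback is consistent with this suspicion: the 403 comes from tables.Get, reached from _create_table_if_needed inside process, so the limit being hit appears to be on table metadata lookups rather than on row inserts. As a toy model of the difference (not Beam's actual code), the sketch below counts calls made with and without caching the lookup. CountingClient and both write functions are hypothetical stand-ins introduced only for illustration.

```python
class CountingClient:
    """Hypothetical stand-in for a BigQuery client; it only counts calls."""

    def __init__(self):
        self.get_table_calls = 0
        self.insert_calls = 0

    def get_table(self, table_id):
        # Stands in for a tables.get request, which has its own rate limit.
        self.get_table_calls += 1
        return table_id

    def insert(self, table, row):
        # Stands in for a streaming insert request.
        self.insert_calls += 1


def write_with_lookup_per_element(client, table_id, rows):
    """Looks the table up again for every row (the suspected behaviour)."""
    for row in rows:
        table = client.get_table(table_id)
        client.insert(table, row)


def write_with_cached_lookup(client, table_id, rows):
    """Looks the table up once and reuses the result for later rows."""
    table = None
    for row in rows:
        if table is None:
            table = client.get_table(table_id)
        client.insert(table, row)
```

In this model, 100 rows cost 100 metadata lookups in the first function and one in the second, while the number of inserts is the same in both.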




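Update, 11 March 2020

I managed to improve things, but not to solve the problem. I switched from using beam.io.WriteToBigQuery to writing a custom class called WriteToBigQueryCustom that does the same thing. I still get the errors, but now only at a throughput of 500/sec or higher.

Updated code: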

import logging

import apache_beam as beam
from google.cloud import bigquery


class WriteToBigQueryCustom(beam.DoFn):
    """
    Stream insert records into a BigQuery table. Intended to work the same way you'd
    expect beam.io.WriteToBigQuery to work for streaming.

    Even though beam.io.WriteToBigQuery supports streaming, it seemed to be
    initialising the BigQuery connection for every element processed. Was
    getting throttled and causing errors about hitting BQ api limits at throughput of
    100 elements/sec when the streaming inserts limit is 1,000,000/sec.
    """

    def __init__(self, project_id, dataset, table_name):
        self.project_id = project_id
        self.dataset = dataset
        self.table_name = table_name
        self.table_id = f"{project_id}.{dataset}.{table_name}"

    def start_bundle(self):
        self.bq_client = bigquery.Client()
        self.table = self.bq_client.get_table(self.table_id)

    def process(self, dict_to_insert):
        """Insert a dict into the class's BigQuery table"""
        errors = self.bq_client.insert_rows(self.table, [dict_to_insert])
        if errors:
            logging.error(
                f"Hit error uploading row to bigquery table {self.table_id}: "
                f"{errors}. Was trying to insert dict: {dict_to_insert}"
            )
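One possible refinement of the class above is to send rows in batches instead of one request per element. The following is a minimal sketch under stated assumptions, not a tested fix: BufferedInserter, insert_fn, and the default batch size of 500 are all hypothetical choices made for illustration. insert_fn is assumed to take a list of dict rows and return a list of errors (empty on success), which is the shape returned by google.cloud.bigquery.Client.insert_rows_json.

```python
import logging


class BufferedInserter:
    """Collects rows and sends them in batches through `insert_fn`.

    `insert_fn` is a hypothetical callable: it receives a list of dict rows
    and returns a list of errors, empty when every row was accepted.
    """

    def __init__(self, insert_fn, max_rows=500):
        self._insert_fn = insert_fn
        self._max_rows = max_rows  # assumed batch size, tune as needed
        self._rows = []

    def add(self, row):
        """Buffers one row and flushes once the batch is full."""
        self._rows.append(row)
        if len(self._rows) >= self._max_rows:
            self.flush()

    def flush(self):
        """Sends all buffered rows in one call; does nothing if empty."""
        if not self._rows:
            return
        rows, self._rows = self._rows, []
        errors = self._insert_fn(rows)
        if errors:
            logging.error("Failed to insert %d row(s): %s", len(errors), errors)
```

Inside a DoFn, the client and the buffer could be created once in setup(), with add() called from process() and flush() called from finish_bundle(). Because insert_rows_json accepts a table ID string, this approach should not need the get_table call that the class above makes in start_bundle.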

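[Comments]:

  • You might want to ask this on user@beam.apache.org. People there can better answer why the "tabledata.list" API is being used.
  • Thanks, I've emailed them.
  • Are you launching the Dataflow job with user account credentials (the ones you get on your machine by running gcloud auth application-default login)? When you launch a job this way, the console shows a message saying this is not good practice, because you may hit quota limits quickly. If so, you should launch the job with a service account instead of a user account.
  • I'm using a service account @MattWelke
  • @MarkM I have a similar custom class. Don't you think start_bundle gets called for every message? I mean, self.bq_client = bigquery.Client() would be called on every message, and I'm not sure that's a good idea.

Tags: python google-cloud-dataflow apache-beam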


[Solution 1]:

I ran into the same trouble when running a similar pipeline. There seems to be some kind of bug in the Python/Beam SDK.

https://issues.apache.org/jira/browse/BEAM-6831

Adding create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER helped me.

Regards, Michael
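Applied to the sink from the question, the suggestion in this answer might look like the sketch below. It reuses the table, dataset, and retry settings from the question's code. The helper name build_policies_sink is hypothetical, and the sketch assumes the target table already exists, since CREATE_NEVER tells the sink not to create it.

```python
def build_policies_sink(project_id):
    """Builds the BigQuery sink from the question with CREATE_NEVER set."""
    # Imported lazily so this module can be imported without Beam installed.
    import apache_beam as beam

    return beam.io.WriteToBigQuery(
        project=project_id,
        dataset="mongo_landing_zone",
        table="service_policy_policies",
        # The table must already exist; the sink will not try to create it.
        create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER,
        insert_retry_strategy="RETRY_ON_TRANSIENT_ERROR",
    )
```

The returned transform would take the place of the beam.io.WriteToBigQuery(...) call at the end of the policies branch in the question.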
