【问题标题】:PubSub to BigQuery - Dataflow/Beam template in Python?PubSub 到 BigQuery - Python 中的数据流/Beam 模板?
【发布时间】:2021-05-23 22:12:08
【问题描述】:

是否有任何 Python 模板/脚本(现有或路线图)可供 Dataflow/Beam 从 PubSub 读取并写入 BigQuery? 根据GCP documentation,只有Java template

谢谢!

【问题讨论】:

    标签: google-cloud-platform google-bigquery google-cloud-dataflow apache-beam google-cloud-pubsub


    【解决方案1】:

    你可以在这里找到一个例子Pub/Sub to BigQuery sampletemplate

    Apache Beam 流式传输管道示例。

    它读取 JSON 编码 来自 Pub/Sub 的消息,转换消息数据,并写入 结果到 BigQuery。

    这是另一个示例,展示了如何将来自 pubsub 的无效消息处理到 Bigquery 中的不同表中:

    class ParseMessage(beam.DoFn):
        OUTPUT_ERROR_TAG = 'error'
        
        def process(self, line):
            """
            Extracts fields from json message
            :param line: pubsub message
            :return: have two outputs:
                - main: parsed data
                - error: error message
            """
            try:
                parsed_row = _ # parse json message to corresponding bgiquery table schema
                yield data_row
            except Exception as error:
                error_row = _ # build you error schema here
                yield pvalue.TaggedOutput(self.OUTPUT_ERROR_TAG, error_row)
            
    
    def run(options, input_subscription, output_table, output_error_table):
        """
        Build and run Pipeline
        :param options: pipeline options
        :param input_subscription: input PubSub subscription
        :param output_table: id of an output BigQuery table
        :param output_error_table: id of an output BigQuery table for error messages
        """
    
        with beam.Pipeline(options=options) as pipeline:
            # Read from PubSub
            rows, error_rows = \
                (pipeline | 'Read from PubSub' >> beam.io.ReadFromPubSub(subscription=input_subscription)
                 # Adapt messages from PubSub to BQ table
                 | 'Parse JSON messages' >> beam.ParDo(ParseMessage()).with_outputs(ParseMessage.OUTPUT_ERROR_TAG,
                                                                                    main='rows')
                 )
    
            _ = (rows | 'Write to BigQuery'
                 >> beam.io.WriteToBigQuery(output_table,
                                            create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER,
                                            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
                                            insert_retry_strategy=RetryStrategy.RETRY_ON_TRANSIENT_ERROR
                                            )
                 )
    
            _ = (error_rows | 'Write errors to BigQuery'
                 >> beam.io.WriteToBigQuery(output_error_table,
                                            create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER,
                                            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
                                            insert_retry_strategy=RetryStrategy.RETRY_ON_TRANSIENT_ERROR
                                            )
                 )
    
    
    if __name__ == '__main__':
        parser = argparse.ArgumentParser()
        parser.add_argument(
            '--input_subscription', required=True,
            help='Input PubSub subscription of the form "/subscriptions/<PROJECT>/<SUBSCRIPTION>".')
        parser.add_argument(
            '--output_table', required=True,
            help='Output BigQuery table for results specified as: PROJECT:DATASET.TABLE or DATASET.TABLE.')
        parser.add_argument(
            '--output_error_table', required=True,
            help='Output BigQuery table for errors specified as: PROJECT:DATASET.TABLE or DATASET.TABLE.')
        known_args, pipeline_args = parser.parse_known_args()
        pipeline_options = PipelineOptions(pipeline_args)
        pipeline_options.view_as(SetupOptions).save_main_session = True
        run(pipeline_options, known_args.input_subscription, known_args.output_table, known_args.output_error_table)
    

    【讨论】:

    • 非常感谢!既然您添加了详细信息.. 请问是否有一种内置的方式来允许模式演变(例如与 insert_retry_strategy 结合使用)?到目前为止发现的示例看起来很手动,但也可能已经过时。很高兴就此提出一个新问题。让我知道。谢谢!
    • @py-r 我对此不确定,但根据我所见,您需要使用自定义逻辑来处理此问题(例如每种模式的不同表目标?)。如果在 SO 上对此一无所知,我认为您可以将其作为另一个问题提出,也许有人已经实现了它。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-11-03
    • 2018-09-05
    • 2018-06-21
    • 2020-06-28
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多