【问题标题】:Streaming Pipeline in Dataflow to Bigtable Python将 Dataflow 中的流式管道流式传输到 Bigtable Python
【发布时间】:2020-11-12 02:07:25
【问题描述】:

我想阅读 pubsub 主题并使用 Python 编写的数据流代码将数据写入 BigTable。我可以在 JAVA 中找到示例代码,但在 Python 中找不到。 我们如何将pubsub的一行中的列分配到不同的列族并将数据写入Bigtable?

【问题讨论】:

  • 嗨@Priya Agarwal。如果以下任何答案解决了您的问题,请单击复选标记考虑accepting it。这向更广泛的社区表明您已经找到了解决方案,并为回答者和您自己提供了一些声誉。没有义务这样做。如果给出的答案仍然不能解决您的问题,请告诉我们。谢谢。

标签: python-3.x google-cloud-dataflow google-cloud-bigtable


【解决方案1】:

要在 Dataflow 管道中写入 Bigtable,您需要创建直接行并将它们传递给 WriteToBigTable doFn。这是一个简单的例子,它只是传入行键并为每个键添加一个单元格,没什么太花哨的:

import datetime
import apache_beam as beam

from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io.gcp.bigtableio import WriteToBigTable
from google.cloud.bigtable import row


class MyOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_argument(
            '--bigtable-project',
            help='The Bigtable project ID, this can be different than your '
                 'Dataflow project',
            default='bigtable-project')
        parser.add_argument(
            '--bigtable-instance',
            help='The Bigtable instance ID',
            default='bigtable-instance')
        parser.add_argument(
            '--bigtable-table',
            help='The Bigtable table ID in the instance.',
            default='bigtable-table')


class CreateRowFn(beam.DoFn):
    def process(self, key):
        direct_row = row.DirectRow(row_key=key)
        direct_row.set_cell(
            "stats_summary",
            b"os_build",
            b"android",
            datetime.datetime.now())
        return [direct_row]


def run(argv=None):
    """Build and run the pipeline."""
    options = MyOptions(argv)
    with beam.Pipeline(options=options) as p:
        p | beam.Create(["phone#4c410523#20190501",
                         "phone#4c410523#20190502"]) | beam.ParDo(
            CreateRowFn()) | WriteToBigTable(
            project_id=options.bigtable_project,
            instance_id=options.bigtable_instance,
            table_id=options.bigtable_table)


if __name__ == '__main__':
    run()

我现在才刚刚开始探索,一旦完成,我可以链接到 GitHub 上更完善的版本。希望这可以帮助您入门。

【讨论】:

  • 我希望数据流管道从 pubsub 读取并写入 bigtable。您提供的代码不是从 pubsub 读取的,也不是数据流代码。您直接使用 Python 客户端库向 Bigtable 插入数据。
  • 嘿 Priya,抱歉耽搁了,但我终于开始尝试这个并整理了一些应该有帮助的代码。让我知道这是否有效或您有其他问题。
  • 谢谢比利。我会看一下并发布更新。
  • 如果这对您有帮助,请标记为已接受,如果有问题请告诉我,谢谢!
  • 抱歉让您失望了,Java 可能会更好。这是在排队等待添加到官方文档中,然后可以测试和维护代码示例,以确保您将来不必面对此类问题。
【解决方案2】:

在建议的基础上添加 PubSub,这是一个工作版本..

先决条件

  • 已创建 GCS 存储桶(用于 Dataflow 临时/暂存文件)
  • PubSub 主题已创建
  • PubSub 订阅已创建
  • BigTable 实例已创建
  • BigTable 表已创建
  • BigTable 必须创建列族(否则没有可见错误!)

cbt为后者的示例:

cbt -instance test-instance createfamily test-table cf1

代码

定义并运行数据流管道。

# Packages

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io.gcp.bigtableio import WriteToBigTable
from google.cloud import pubsub_v1

# Classes
 
class CreateRowFn(beam.DoFn):

    def __init__(self, pipeline_options):
        
        self.instance_id = pipeline_options.bigtable_instance
        self.table_id = pipeline_options.bigtable_table
  
    def process(self, key):
        
        from google.cloud.bigtable import row
        import datetime

        direct_row = row.DirectRow(row_key=key)
        direct_row.set_cell(
            'cf1',
            'field1',
            'value1',
            timestamp=datetime.datetime.now())
        
        yield direct_row

# Options

class XyzOptions(PipelineOptions):
    
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_argument('--bigtable_project', default='nested'),
        parser.add_argument('--bigtable_instance', default='instance'),
        parser.add_argument('--bigtable_table', default='table')

pipeline_options = XyzOptions(
    save_main_session=True, streaming=True,
    runner='DataflowRunner',
    project=PROJECT,
    region=REGION,
    temp_location=TEMP_LOCATION,
    staging_location=STAGING_LOCATION,
    requirements_file=REQUIREMENTS_FILE,
    bigtable_project=PROJECT,
    bigtable_instance=INSTANCE,
    bigtable_table=TABLE)

# Pipeline

def run (argv=None):
    
    with beam.Pipeline(options=pipeline_options) as p:
       
       input_subscription=f"projects/{PROJECT}/subscriptions/{SUBSCRIPTION}"

        _ = (p
                | 'Read from Pub/Sub' >> beam.io.ReadFromPubSub(subscription=input_subscription).with_output_types(bytes)
                | 'Conversion UTF-8 bytes to string' >> beam.Map(lambda msg: msg.decode('utf-8'))
                | 'Conversion string to row object' >> beam.ParDo(CreateRowFn(pipeline_options)) 
                | 'Writing row object to BigTable' >> WriteToBigTable(project_id=pipeline_options.bigtable_project,
                                  instance_id=pipeline_options.bigtable_instance,
                                  table_id=pipeline_options.bigtable_table))

if __name__ == '__main__':
    run()

向 PubSub 主题发布消息 b"phone#1111"(例如,使用 Python PublisherClient())。

表格内容(使用happybase)

b'phone#1111': {b'cf1:field1': b'value1'}
Row length: 1

【讨论】:

  • 在我的管道中,我有一个 CreateRowFn 函数,它和你的函数一样,但是在 row.set_cell(...) 方法上设置单元格数据时,我不断收到以下错误:_pickle.PicklingError: Can't pickle <class 'Mutation'>: attribute lookup Mutation on __main__ failed [while running 'Transform to BigTable row']
  • 实际上,当从CreateRowFn 函数yielding DirectRow 对象时,我似乎遇到了上述错误。
  • @James B:你找到解决问题的方法了吗?
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2019-05-03
  • 1970-01-01
  • 2018-06-21
  • 1970-01-01
  • 2012-02-08
相关资源
最近更新 更多