【问题标题】:Write data into Google Firestore in pipeline在管道中将数据写入 Google Firestore
【发布时间】:2018-10-04 01:52:55
【问题描述】:

我想通过带有 Apache Beam 的 Dataflow runner 将数据从 Cloud BigQuery 读取到 Cloud Datastore。从 documentation 开始,Firestore 尚不受支持。我写了自己的课来做这件事。

class UpdateIntoFireStore(beam.DoFn):

    def process(self, element):
        try:
            cred = credentials.Certificate({
              "..."
            })

            firebase_admin.initialize_app(cred, {
            'projectId': '...',
            })
        except ValueError:
            pass
        db = firestore.client()
        doc_ref = db.collection(u'poi')
        doc_ref.add(element)

管道如下:

job = ( p  | 'Read from BigQuery' >> Read(BigQuerySource(query="SELECT * FROM ...", use_standard_sql=True))
           | 'Update to Firestore' >> beam.ParDo(UpdateIntoFireStore()))

这种方法好吗?我担心并行处理对 Cloud Firestore 上的这些写入操作的影响。

【问题讨论】:

    标签: google-cloud-firestore apache-beam


    【解决方案1】:

    使用start_bundle 定义您的客户。

    start_bundle - 在工作人员处理一组元素之前调用。 要处理的元素被拆分成包并分发 给工人。在工作人员在第一个元素上调用 process() 之前 它的包,它调用这个方法。

    更好的方法:

    class FirestoreDoFn(beam.DoFn):
    
    def __init__(self):
        super(FirestoreDoFn, self).__init__()
    
    def start_bundle(self):
        self.firestore_client = GoogleServices(
            crendential_path="<cred-path-in-here>"
        ).init_firestore(
            project_id="<your-project-id>",
            collection_id="<collection-id>"
        )
    
    def process(self, element, *args, **kwargs):
        logging.info(element)
        # response = self.firestore_client.save()
        # logging.info("response: {}".format(response))
        return {"status":"ok"}
    

    【讨论】:

      【解决方案2】:

      这与从数据流进行外部调用完全一样。从技术上讲,这将起作用。但是有几点需要注意。

      1. 无法保证单个元素将被处理多少次,因此您可能会在 firestore 中获得同一元素的多个条目。
      2. 您将对 Firestore 的每个元素进行单独调用,并且 Firestore 客户端/连接没有缓存。

      【讨论】:

      • 在等待支持 FirestoreIO 之前有什么改进现有代码的建议吗?谢谢
      猜你喜欢
      • 2012-01-28
      • 2015-12-14
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2023-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多