【问题标题】:Execute a process exactly after BigQueryIO.write() operation在 BigQueryIO.write() 操作之后执行一个进程
【发布时间】:2018-03-14 14:18:09
【问题描述】:

我有一个将 BigQuery 表作为接收器的管道。在将数据写入 BigQuery 之后,我需要执行一些步骤。这些步骤包括对该表执行查询、从中读取数据并写入不同的表。

以上如何实现?我是否应该为后者创建一个不同的管道,然后在第一个管道之后调用它,这将是我认为的另一个问题。

如果以上都不起作用,是否可以从正在运行的管道调用另一个数据流作业(模板)。

确实需要一些帮助。

谢谢。

【问题讨论】:

    标签: google-cloud-dataflow apache-beam


    【解决方案1】:

    BigQueryIO 目前未明确支持此功能。唯一的解决方法是使用单独的管道:启动第一个管道,等待它完成(例如,使用pipeline.run().waitUntilFinish()),启动第二个管道(确保为其使用单独的管道对象 - 多次重用同一个对象不支持)。

    【讨论】:

    • 只是添加 - 您不一定必须使用另一个管道来实现这一点。在第一个管道完成 (pipeline.run().waitUntilFinish()) 后,您可以重新使用 BigQuery SDK。我们在管道中经常这样做,并且该模式运行良好。 stackoverflow.com/questions/44315157/…
    • @jkff 如果我正在创建模板,如何使它工作?那么我会有两个管道的单独模板吗?如果我想创建一个同时运行两个管道的模板怎么办?
    • 很遗憾,模板无法做到这一点。
    【解决方案2】:

    我一直在使用 模板 的解决方法是将 IO 操作的结果写入元数据文件到特定的存储桶中,一个云函数(即我的 orchestrator) 被触发,进而触发以下管道。但是,我仅使用 TextIO 操作对其进行了测试。 所以,在你的情况下:

    • 执行 BigQueryIO.write() 操作
    • 将其结果写入文件 (xxx-meta-file) 到 Cloud Storage 存储桶 (xxx-meta-bucket) 中,您只保留 Dataflow 结果 - 这是管道的最后一步
    • 编写一个编排器云函数来监听 xxx-meta-bucket 中创建/修改的对象(请参阅 here
    • 在编排器中,您可能需要一些条件来检查实际创建/修改了什么文件
    • 相应地触发下一个管道(直接在编排器中或通过触发另一个负责启动该特定管道的云函数将其解耦)

    很确定可以使用 PubSub 轻松复制类似的方法,而不是写入存储桶(例如,请参阅 here 了解我列表中的第二步)

    【讨论】:

      猜你喜欢
      • 2022-01-23
      • 1970-01-01
      • 1970-01-01
      • 2014-09-04
      • 2019-12-07
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多