【问题标题】:Apache Beam / Dataflow pub/sub side input with pythonApache Beam / Dataflow pub/sub 端输入与 python
【发布时间】:2020-12-28 21:44:53
【问题描述】:

我是 Apache Beam 的新手,所以我在以下情况下有点挣扎:

  • 使用 Stream 模式的 Pub/Sub 主题
  • 转换取出customerId
  • 带有 Transform/ParDo 的并行 PCollection,它根据 Pub/Sub 主题中收到的“customerId”从 Firestore 获取数据(使用侧输入)
  • ...

尝试获取 Firestore 数据的 ParDo 转换根本不运行。如果使用“customerId”固定值,一切都按预期工作......虽然没有使用从 Firestore 正确获取(简单的 ParDo),但它可以工作。我在做不应该做的事情吗? 包括我的代码如下:

class getFirestoreUsers(beam.DoFn):
    def process(self, element, customerId):

        print(f'Getting Users from Firestore, ID: {customerId}')

        # Call function to initialize Database
        db = intializeFirebase()

        """ # get customer information from the database
        doc = db.document(f'Customers/{customerId}').get()
        customer = doc.to_dict() """
        usersList = {}

        # Get Optin Users
        try:
            docs = db.collection(
                f'Customers/{customerId}/DevicesWiFi_v3').where(u'optIn', u'==', True).stream()
            usersList = {user.id: user.to_dict() for user in docs}
        except Exception as err:
            print(f"Error: couldn't retrieve OPTIN users from DevicesWiFi")
            print(err)

        return([usersList])

主要代码

def run(argv=None):
    """Build and run the pipeline."""
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--topic',
        type=str,
        help='Pub/Sub topic to read from')
    parser.add_argument(
        '--output',
        help=('Output local filename'))

    args, pipeline_args = parser.parse_known_args(argv)

    options = PipelineOptions(pipeline_args)
    options.view_as(SetupOptions).save_main_session = True
    options.view_as(StandardOptions).streaming = True

    p = beam.Pipeline(options=options)

    users = (p | 'Create chars' >> beam.Create([
        {
             "clientMac": "7c:d9:5c:b8:6f:38",
             "username": "Louis"
             },
        {
            "clientMac": "48:fd:8e:b0:6f:38",
            "username": "Paul"
        }
    ]))


    # Get Dictionary from Pub/Sub
    data = (p | 'Read from PubSub' >> beam.io.ReadFromPubSub(topic=args.topic)
            | 'Parse JSON to Dict' >> beam.Map(lambda e: json.loads(e))
            )

    # Get customerId from Pub/Sub information
    PcustomerId = (data | 'get customerId from Firestore' >>
                   beam.ParDo(lambda x: [x.get('customerId')]))
    PcustomerId | 'print customerId' >> beam.Map(print)

    # Get Users from Firestore
    custUsers = (users | 'Read from Firestore' >> beam.ParDo(
        getFirestoreUsers(), customerId=beam.pvalue.AsSingleton(PcustomerId)))
    custUsers | 'print Users from Firestore' >> beam.Map(print)

为了避免运行该函数时出错,我必须初始化“用户”字典,之后我完全忽略了它。 我想我这里有几个错误,非常感谢您的帮助。

【问题讨论】:

  • 我假设 PcustomerId | 'print customerId' >> beam.Map(print) 将导致一个空的 pcollection 并且您从 Firestore 读取的侧面输入可能会永远等待侧面输入。我很好奇为什么将 customerId 作为侧面输入而不是主要输入输入到您的“从 Firestore 中读取”中。请注意,如果您在没有显式窗口的情况下使用流模式,则默认值为全局窗口,除非您有适当的触发器,否则它不会产生任何输出。
  • 您好,易驰,感谢您的回复。我对这个问题做了很多不同的方法,以至于我不记得确切的输出了。我面临的问题是,由于 Beam 的并行处理,当使用来自 Pub/Sub 的原始 JSON 数据运行“主”PCollection 时,所需的数据(首先是客户 ID,然后是客户数据)尚未准备好。我需要一个“等待”选项,我认为它在 python 中尚不可用......还有其他方法吗?

标签: python-3.x google-cloud-firestore apache-beam dataflow


【解决方案1】:

我不清楚示例代码中如何使用usersPCollection(因为element 未在process 定义中处理)。我用窗口重新排列了代码,并使用customer_id 作为主要输入。

class GetFirestoreUsers(beam.DoFn):
  def setup(self):
    # Call function to initialize Database
    self.db = intializeFirebase()

  def process(self, element):
    print(f'Getting Users from Firestore, ID: {element}')

    """ # get customer information from the database
    doc = self.db.document(f'Customers/{element}').get()
    customer = doc.to_dict() """
    usersList = {}

    # Get Optin Users
    try:
        docs = self.db.collection(
            f'Customers/{element}/DevicesWiFi_v3').where(u'optIn', u'==', True).stream()
        usersList = {user.id: user.to_dict() for user in docs}
    except Exception as err:
        print(f"Error: couldn't retrieve OPTIN users from DevicesWiFi")
        print(err)

    return([usersList])



data = (p | 'Read from PubSub' >> beam.io.ReadFromPubSub(topic=args.topic)
          | beam.WindowInto(window.FixedWindow(60))
          | 'Parse JSON to Dict' >> beam.Map(lambda e: json.loads(e)))

# Get customerId from Pub/Sub information
customer_id = (data | 'get customerId from Firestore' >>
               beam.Map(lambda x: x.get('customerId')))
customer_id | 'print customerId' >> beam.Map(print)

# Get Users from Firestore
custUsers = (cutomer_id | 'Read from Firestore' >> beam.ParDo(
    GetFirestoreUsers())
custUsers | 'print Users from Firestore' >> beam.Map(print)

来自您的评论:

在使用来自 Pub/Sub 的原始 JSON 数据运行“主”PCollection 时,所需的数据(首先是客户 ID,然后是客户数据)尚未准备好

您的意思是阅读 Pub/Sub 主题时,firestore 中的数据还没有准备好?

您始终可以在主函数中将逻辑拆分为 2 个管道,然后依次运行它们。

【讨论】:

    猜你喜欢
    • 2020-06-22
    • 2018-08-24
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-04-26
    • 2017-10-15
    • 2018-05-11
    • 2019-07-23
    相关资源
    最近更新 更多