【发布时间】:2020-12-28 21:44:53
【问题描述】:
我是 Apache Beam 的新手,所以我在以下情况下有点挣扎:
- 使用 Stream 模式的 Pub/Sub 主题
- 转换取出customerId
- 带有 Transform/ParDo 的并行 PCollection,它根据 Pub/Sub 主题中收到的“customerId”从 Firestore 获取数据(使用侧输入)
- ...
尝试获取 Firestore 数据的 ParDo 转换根本不运行。如果使用“customerId”固定值,一切都按预期工作......虽然没有使用从 Firestore 正确获取(简单的 ParDo),但它可以工作。我在做不应该做的事情吗? 包括我的代码如下:
class getFirestoreUsers(beam.DoFn):
def process(self, element, customerId):
print(f'Getting Users from Firestore, ID: {customerId}')
# Call function to initialize Database
db = intializeFirebase()
""" # get customer information from the database
doc = db.document(f'Customers/{customerId}').get()
customer = doc.to_dict() """
usersList = {}
# Get Optin Users
try:
docs = db.collection(
f'Customers/{customerId}/DevicesWiFi_v3').where(u'optIn', u'==', True).stream()
usersList = {user.id: user.to_dict() for user in docs}
except Exception as err:
print(f"Error: couldn't retrieve OPTIN users from DevicesWiFi")
print(err)
return([usersList])
主要代码
def run(argv=None):
"""Build and run the pipeline."""
parser = argparse.ArgumentParser()
parser.add_argument(
'--topic',
type=str,
help='Pub/Sub topic to read from')
parser.add_argument(
'--output',
help=('Output local filename'))
args, pipeline_args = parser.parse_known_args(argv)
options = PipelineOptions(pipeline_args)
options.view_as(SetupOptions).save_main_session = True
options.view_as(StandardOptions).streaming = True
p = beam.Pipeline(options=options)
users = (p | 'Create chars' >> beam.Create([
{
"clientMac": "7c:d9:5c:b8:6f:38",
"username": "Louis"
},
{
"clientMac": "48:fd:8e:b0:6f:38",
"username": "Paul"
}
]))
# Get Dictionary from Pub/Sub
data = (p | 'Read from PubSub' >> beam.io.ReadFromPubSub(topic=args.topic)
| 'Parse JSON to Dict' >> beam.Map(lambda e: json.loads(e))
)
# Get customerId from Pub/Sub information
PcustomerId = (data | 'get customerId from Firestore' >>
beam.ParDo(lambda x: [x.get('customerId')]))
PcustomerId | 'print customerId' >> beam.Map(print)
# Get Users from Firestore
custUsers = (users | 'Read from Firestore' >> beam.ParDo(
getFirestoreUsers(), customerId=beam.pvalue.AsSingleton(PcustomerId)))
custUsers | 'print Users from Firestore' >> beam.Map(print)
为了避免运行该函数时出错,我必须初始化“用户”字典,之后我完全忽略了它。 我想我这里有几个错误,非常感谢您的帮助。
【问题讨论】:
-
我假设 PcustomerId | 'print customerId' >> beam.Map(print) 将导致一个空的 pcollection 并且您从 Firestore 读取的侧面输入可能会永远等待侧面输入。我很好奇为什么将 customerId 作为侧面输入而不是主要输入输入到您的“从 Firestore 中读取”中。请注意,如果您在没有显式窗口的情况下使用流模式,则默认值为全局窗口,除非您有适当的触发器,否则它不会产生任何输出。
-
您好,易驰,感谢您的回复。我对这个问题做了很多不同的方法,以至于我不记得确切的输出了。我面临的问题是,由于 Beam 的并行处理,当使用来自 Pub/Sub 的原始 JSON 数据运行“主”PCollection 时,所需的数据(首先是客户 ID,然后是客户数据)尚未准备好。我需要一个“等待”选项,我认为它在 python 中尚不可用......还有其他方法吗?
标签: python-3.x google-cloud-firestore apache-beam dataflow