【问题标题】:Add tags per element in PCollection在 PCollection 中为每个元素添加标签
【发布时间】:2018-08-26 23:50:11
【问题描述】:

好的,所以几周来我一直在与 Apache Beam 搏斗。我正在学习,但一直停留在看似微不足道的事情上。我在两个单独的 CSV 文件中有大约 6000 万行数据。这些行由整数和浮点数组成。我会问我的问题,但我认为提供背景信息来说明这是一个更大的过程的一部分会有所帮助,我稍后会解释。

读入后,我的 PCollection 中的每一行如下所示: '11139422、11139421、11139487、11139449、11139477、27500、60.75、60.75、60.75'

我首先将其转换为如下所示: '11139422'、'11139421'、'11139487'、'11139449'、'11139477'、'27500'、'60.75'、'60.75'、'60.75'

然后我想创建将每个值转换为元组对,以便我可以添加值。例如,我希望 PCollection 中的每一行看起来像这样: (p1, 11139422), (p2, 11139421), (p3, 11139487), (p4, 11139449), (p5, 11139477), (sal, 27500), (fp, 60.75), (bp, 60.75), (pp , 60.75)

如果我了解如何让并行处理有效地执行,我认为我应该将每一行转换为具有某种类型的散列键的字典: some_hashed_key: (从上面收集的 tupledtag 值)。我还没有在下一步做任何工作,因为我目前被困在这里。我的下一步是基本上在两个 PCollection 之间执行一个 cartesion 产品。两者的格式几乎与上面完全相同。我的计划是将左侧 PCollection 中的每个字典键广播到右侧 PCollection 上的每个字典键,在 PCollection 之间添加一些值,然后将其全部展平到一个 PCollection 中并发送到 Pub/Sub 队列。同样,我只是提供上下文,而不是要求任何人为我编写代码,谢谢!

【问题讨论】:

    标签: python google-cloud-dataflow


    【解决方案1】:

    太幸运了!我在这里找到了答案:

    How to convert csv into a dictionary in apache beam dataflow

    所以,不需要任何转换,它是一个内置函数,可以在读入时自动附加键:值对。希望有人偶然发现这篇文章,它会让他们的一天和我的一样好!

    【讨论】: