【发布时间】:2019-05-22 17:47:12
【问题描述】:
从 pub/sub 等无限源读取数据后,我正在应用窗口化。我需要将属于一个窗口的所有记录写入一个单独的文件。我在 Java 中找到了this,但在 python 中找不到任何东西。
【问题讨论】:
标签: google-cloud-platform google-cloud-dataflow apache-beam
从 pub/sub 等无限源读取数据后,我正在应用窗口化。我需要将属于一个窗口的所有记录写入一个单独的文件。我在 Java 中找到了this,但在 python 中找不到任何东西。
【问题讨论】:
标签: google-cloud-platform google-cloud-dataflow apache-beam
问题中没有关于您的用例的详细信息,因此您可能需要调整以下示例的某些部分。一种方法是使用元素所属的窗口作为键对元素进行分组。然后,我们利用filesystems.FileSystems.create 来控制我们要如何写入文件。
在这里,我将使用 10 秒的窗口和一些虚拟数据,其中每个事件间隔 4 秒。生成:
data = [{'event': '{}'.format(event), 'timestamp': time.time() + 4*event} for event in range(10)]
我们使用timestamp 字段来分配元素时间戳(这只是为了以受控方式模拟 Pub/Sub 事件)。我们将事件窗口化,以窗口化信息为key,按key分组,将结果写入output文件夹:
events = (p
| 'Create Events' >> beam.Create(data) \
| 'Add Timestamps' >> beam.Map(lambda x: beam.window.TimestampedValue(x, x['timestamp'])) \
| 'Add Windows' >> beam.WindowInto(window.FixedWindows(10)) \
| 'Add Window Info' >> beam.ParDo(AddWindowingInfoFn()) \
| 'Group By Window' >> beam.GroupByKey() \
| 'Windowed Writes' >> beam.ParDo(WindowedWritesFn('output/')))
AddWindowingInfoFn 非常简单:
class AddWindowingInfoFn(beam.DoFn):
"""output tuple of window(key) + element(value)"""
def process(self, element, window=beam.DoFn.WindowParam):
yield (window, element)
并且WindowedWritesFn 写入我们在管道中指定的路径(在我的情况下为output/ 文件夹)。然后,我使用窗口信息作为文件名。为方便起见,我将纪元时间戳转换为人类可读的日期。最后,我们遍历所有元素并将它们写入相应的文件。当然,这个行为可以在这个函数中随意调整:
class WindowedWritesFn(beam.DoFn):
"""write one file per window/key"""
def __init__(self, outdir):
self.outdir = outdir
def process(self, element):
(window, elements) = element
window_start = str(window.start.to_utc_datetime()).replace(" ", "_")
window_end = str(window.end.to_utc_datetime()).replace(" ", "_")
writer = filesystems.FileSystems.create(self.outdir + window_start + ',' + window_end + '.txt')
for row in elements:
writer.write(str(row)+ "\n")
writer.close()
这会将属于每个窗口的元素写入不同的文件。就我而言,有 5 种不同的
$ ls output/
2019-05-21_19:01:20,2019-05-21_19:01:30.txt
2019-05-21_19:01:30,2019-05-21_19:01:40.txt
2019-05-21_19:01:40,2019-05-21_19:01:50.txt
2019-05-21_19:01:50,2019-05-21_19:02:00.txt
2019-05-21_19:02:00,2019-05-21_19:02:10.txt
第一个只包含元素 0(这会因执行而异):
$ cat output/2019-05-21_19\:01\:20\,2019-05-21_19\:01\:30.txt
{'timestamp': 1558465286.933727, 'event': '0'}
第二个包含元素 1 到 3,依此类推:
$ cat output/2019-05-21_19\:01\:30\,2019-05-21_19\:01\:40.txt
{'timestamp': 1558465290.933728, 'event': '1'}
{'timestamp': 1558465294.933728, 'event': '2'}
{'timestamp': 1558465298.933729, 'event': '3'}
这种方法的警告是来自同一个窗口的所有元素都被分组到同一个工作器中。如果根据您的情况写入单个分片或输出文件,无论如何都会发生这种情况,但是对于更高的负载,您可能需要考虑更大的机器类型。
完整代码here
【讨论】: