【Posted】: 2021-09-25 07:25:45
【Problem description】:
I have been testing joining data read from Pub/Sub with self-created data. Below is the main pipeline method.
def run(input_topic, input_subscription, output_path, window_size=1.0, num_shards=5, pipeline_args=None):
    pipeline_options = PipelineOptions(pipeline_args, streaming=True, save_main_session=True)
    with Pipeline(options=pipeline_options) as pipeline:
        # reading from pub/sub and creating a fixed window of 1 min.
        p1 = (pipeline
              | "Read from Pub/Sub" >> io.ReadFromPubSub(subscription=input_subscription)
              | "Window into" >> GroupMessagesByFixedWindows(window_size, num_shards))
        # creating sample data
        p2 = pipeline | "creating a sample data" >> Create([('Hello', 'sh 1'), ('Hello', 'sh 1.1'),
                                                            ('Hello_world', 'sh 2'),
                                                            ('Hello_everyone', 'sh 3'),
                                                            ('Hello_cloud', 'sh 4')])
        ({"schdedule": p2, "timestamp": p1}) | "merging" >> CoGroupByKey() | "merge print" >> Map(print)
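For reference, the per-key merge that CoGroupByKey performs within a single window can be modeled in plain Python (this is not Beam code; the sample data and the `cogroup_by_key` helper are illustrative, and the 'schdedule' tag is kept as spelled in the pipeline above):

```python
from collections import defaultdict

def cogroup_by_key(tagged_inputs):
    """Model of CoGroupByKey: merge several tagged lists of (key, value)
    pairs into {key: {tag: [values]}}, as Beam does per key and window."""
    merged = defaultdict(lambda: {tag: [] for tag in tagged_inputs})
    for tag, pairs in tagged_inputs.items():
        for key, value in pairs:
            merged[key][tag].append(value)
    return dict(merged)

schedule = [('Hello', 'sh 1'), ('Hello', 'sh 1.1'), ('Hello_world', 'sh 2')]
timestamps = [('Hello', '2021-07-16 13:19:00'), ('Hello_world', '2021-07-16 13:19:00')]
result = cogroup_by_key({'schdedule': schedule, 'timestamp': timestamps})
# result['Hello'] -> {'schdedule': ['sh 1', 'sh 1.1'], 'timestamp': ['2021-07-16 13:19:00']}
```

In Beam itself this merge only happens when the keyed elements from both inputs land in the same window, which is the part this plain-Python model does not capture.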
Below are the windowing and add-timestamp transforms.
class GroupMessagesByFixedWindows(PTransform):
    """A composite transform that groups Pub/Sub messages based on publish time
    and outputs a list of tuples, each containing a message and its publish time.
    """

    def __init__(self, window_size, num_shards=5):
        # Set window size to 30 seconds.
        self.window_size = int(window_size * 30)
        self.num_shards = num_shards

    def expand(self, pcoll):
        return (
            pcoll
            # Bind window info to each element using element timestamp (or publish time).
            | "Window into fixed intervals"
            >> WindowInto(FixedWindows(self.window_size))
            | "Add timestamp to windowed elements" >> ParDo(AddTimestamp())
        )

class AddTimestamp(DoFn):
    def process(self, element, publish_time=DoFn.TimestampParam, window=DoFn.WindowParam):
        """Processes each windowed element by extracting the message body and its
        publish time into a tuple.
        """
        yield (
            element.decode("utf-8"),
            datetime.utcfromtimestamp(float(publish_time)).strftime("%Y-%m-%d %H:%M:%S"),
        )
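The timestamp conversion used in AddTimestamp can be checked outside of Beam; this small sketch uses the same `datetime` calls on an arbitrary epoch value (the helper name is illustrative):

```python
from datetime import datetime

def format_publish_time(publish_time):
    # Same conversion as in AddTimestamp.process:
    # Unix seconds -> "YYYY-mm-dd HH:MM:SS" string in UTC.
    return datetime.utcfromtimestamp(float(publish_time)).strftime("%Y-%m-%d %H:%M:%S")

print(format_publish_time(0))  # 1970-01-01 00:00:00
```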
The result I get looks like this:
('Hello', {'schdedule': [], 'timestamp': ['2021-07-16 13:19:00']})
('Hello_world', {'schdedule': [], 'timestamp': ['2021-07-16 13:19:00']})
('Hello_everyone', {'schdedule': [], 'timestamp': ['2021-07-16 13:19:00']})
The schedule list prints empty because it was not joined.
Expected:
('Hello', {'schdedule': ['sh 1','sh 1.1'], 'timestamp': ['2021-07-16 13:19:00']})
('Hello_world', {'schdedule': ['sh 2'], 'timestamp': ['2021-07-16 13:19:00']})
('Hello_everyone', {'schdedule': ['sh 3'], 'timestamp': ['2021-07-16 13:19:00']})
I tried running GroupByKey on p2 alone and it works fine, with the following result:
('Hello', ['sh 1','sh 1.1'])
('Hello_world', ['sh 2'])
('Hello_everyone', ['sh 3'])
I also tried a static dictionary as a side input, which works fine, but as soon as I do CoGroupByKey it produces no results from the p2 pipeline. Please advise what I am doing wrong here.
【Comments】:
-
To understand why your two PCollections are not merging correctly, could you provide some sample data from the p1 collection? I created a sample code to explain how CoGroupByKey works, here. As you can see, the merge is done based on each PCollection's key (element[1]), i.e. 'Hello', 'Hello_world', etc. What key are you using for the merge? Does the code I created help you?
-
That example works fine; it may have to do with the window. Not sure though.
-
Could you provide sample data for p1?
-
By printing p1:
('Hello_world', '2021-07-19 12:08:00') ('Hello_everyone', '2021-07-19 12:08:00') ('Hello', '2021-07-19 12:08:00') -
Is the second list (p2) fixed? In other words, when the key is
Hello, will you always add 'schdedule': ['sh 1','sh 1.1']?
Tags: python google-cloud-dataflow apache-beam google-cloud-pubsub