【发布时间】:2019-07-13 09:32:10
【问题描述】:
如果有人可以帮助我理解以下问题中的代码示例,我将不胜感激。我现在正在尝试使用apache beam 2.13.0 和python3.7.3 来实现类似的东西。
Why does custom Python object cannot be used with ParDo Fn?
我知道network sockets 是不可序列化的,因为它不是在序列化之后既不能返回string 也不能返回tuple 的对象。
我不明白的是为什么你需要在__init__ 中调用super class?
class PublishFn(beam.DoFn):
def __init__(self, topic_path):
self.topic_path = topic_path
super(self.__class__, self).__init__()
def process(self, element, **kwargs):
if not hasattr(self, 'publish'):
from google.cloud import pubsub_v1
self.publisher = pubsub_v1.PublisherClient()
future = self.publisher.publish(self.topic_path, data=element.encode("utf-8"))
return future.result()
谢谢。
【问题讨论】: