【发布时间】:2020-12-06 21:25:52
【问题描述】:
我在 Google Cloud Platform 中设置了一些数据处理工作流程。这些位置处理物理地址并返回一些关于它们的指标。工作流使用 Cloud Functions 和 PubSub 流的组合。
在工作流中使用一个 Google Cloud 函数时,某些消息不会从触发流中提取或被多次提取。我知道这在一定程度上是可以预料的。但是,这种情况经常发生。这足以导致某些地点被夸大了 10 倍,而其他一些地点却没有结果。
我认为callback 函数没有正确确认消息,但我不确定应该有什么不同才能更可靠地接收和确认消息。任何建议表示赞赏。
用于检索指标的我的 GCP 云函数由 PubSub 流触发,并执行 retrieve_location 函数将数据发送到不同的 PubSub 流。 retrieve_location 函数如下所示:
def retrieve_location(event, context):
auth_flow()
project_id = <my project id>
subscription_name = <my subscription name>
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(
project_id, subscription_name)
def callback(message):
message.ack()
message_obj = message.data
message_dcde = message_obj.decode('utf-8')
message_json = json.loads(message_dcde)
get_metrics(message_json)
subscriber.subscribe(subscription_path, callback=callback)
get_metrics 函数从每条消息中获取有效负载,检索一些数据并将其发送到另一个流。此功能似乎按预期工作。
def get_metrics(loc):
<... retrieve and process data, my_data is the object that gets sent to the next stream ...>
project_id = <my project id>
topic_name = <my topic name>
topic_id = <my topic id>
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_name)
try:
publisher.publish(topic_path, data=my_data.encode('utf-8'))
except Exception as exc:
print("topic publish failed: ", exc)
【问题讨论】:
-
那么您的 Cloud Function 是否由 Pub/Sub 消息触发,导致它在不同的 Pub/Sub 订阅上启动订阅者?以这种方式启动 Pub/Sub 订阅者有点不寻常。
-
@KamalAboul-Hosn 每个 PubSub 流的订阅和主题 ID 都是唯一的,因此不应该交叉。但是开始订阅的最佳方式是什么?
-
我同意这是一个不寻常的模式。您能概括地解释一下为什么需要这样做吗?
-
@DustinIngram 从 PubSub 流中触发函数以便拾取和处理所有消息的最佳方法是什么?我正在处理每个地址并检索数据。
-
只需将函数订阅到主题就足够了。我仍然不清楚为什么您需要在函数中添加第二个订阅者?
标签: python google-cloud-platform google-cloud-functions google-cloud-pubsub