【发布时间】:2020-02-05 22:40:08
【问题描述】:
我将 Pub/Sub 订阅逻辑包装在订阅方法中,该方法在每个订阅的服务初始化期间调用一次:
def subscribe(self,
callback: typing.Callable,
subscription_name: str,
topic_name: str,
project_name: str = None) -> typing.Optional[SubscriberClient]:
"""Subscribes to Pub/Sub topic and return subscriber client
:param callback: subscription callback method
:param subscription_name: name of the subscription
:param topic_name: name of the topic
:param project_name: optional project name. Uses default project if not set
:return: subscriber client or None if testing
"""
project = project_name if project_name else self.pubsub_project_id
self.logger.info('Subscribing to project `{}`, topic `{}`'.format(project, topic_name))
project_path = self.pubsub_subscriber.project_path(project)
topic_path = self.pubsub_subscriber.topic_path(project, topic_name)
subscription_path = self.pubsub_subscriber.subscription_path(project, subscription_name)
# check if there is an existing subscription, if not, create it
if subscription_path not in [s.name for s in self.pubsub_subscriber.list_subscriptions(project_path)]:
self.logger.info('Creating new subscription `{}`, topic `{}`'.format(subscription_name, topic_name))
self.pubsub_subscriber.create_subscription(subscription_path, topic_path)
# subscribe to the topic
self.pubsub_subscriber.subscribe(
subscription_path, callback=callback,
scheduler=self.thread_scheduler
)
return self.pubsub_subscriber
这个方法是这样调用的:
self.subscribe_client = self.subscribe(
callback=self.pubsub_callback,
subscription_name='subscription_topic',
topic_name='topic'
)
回调方法做了很多事情,发送 2 封电子邮件然后确认消息
def pubsub_callback(self, data: gcloud_pubsub_subscriber.Message):
self.logger.debug('Processing pub sub message')
try:
self.do_something_with_message(data)
self.logger.debug('Acknowledging the message')
data.ack()
self.logger.debug('Acknowledged')
return
except:
self.logger.warning({
"message": "Failed to process Pub/Sub message",
"request_size": data.size,
"data": data.data
}, exc_info=True)
self.logger.debug('Acknowledging the message 2')
data.ack()
当我向订阅运行推送某些内容时,回调运行,打印所有调试消息,包括Acknowledged。但是,消息保留在 Pub/Sub 中,回调被再次调用,并且每次重试后都需要指数级的时间。问题是,即使在调用 ack 之后,什么可能导致消息保留在 pub/sub 中?
我有几个这样的订阅,它们都按预期工作。截止日期不是一个选项,回调几乎立即完成,无论如何我都玩了确认截止日期,没有任何帮助。
当我尝试从连接到该 pub-sub 的本地运行的应用程序处理这些消息时,它完成得很好,并且确认按预期将消息从队列中取出。
- 所以问题只出现在已部署的服务中(在 kubernetes pod 中运行)
- 回调执行 buck ack 似乎什么都没做
- 从本地运行的脚本(...并执行完全相同的操作)或通过 GCP UI 确认消息按预期工作。
有什么想法吗?
【问题讨论】:
标签: python-3.x google-cloud-platform publish-subscribe google-cloud-pubsub