【发布时间】:2019-02-10 17:37:20
【问题描述】:
我正在尝试使用 Google PubSub 在两个服务之间传递和接收消息。但是,发送的一些消息似乎是随机丢弃的,并没有被订阅者的回调方法处理。
发送消息时,大约有一半的消息是由回调方法处理的。对于另一半,似乎根本没有调用回调方法(未记录任何信息)。但是,消息仍然从主题中消失,并且不会重新发送。
用于启动订阅者的代码:
logger = logging.getLogger(LOGGER_NAME)
logger.info('Starting the pubsub subscriber')
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(GOOGLE_CLOUD_PROJECT, SUBSCRIPTION_NAME)
subscriber.subscribe(subscription_path, callback=callback)
while True:
try:
sleep(60)
except Exception as e:
// Log exception
回调方法:
def callback(message):
logger = logging.getLogger(LOGGER_NAME)
logger.info(f'Recieved callback with message: {message}', extra = {'callback_message': message} )
// Process message
错误似乎出现在订阅者方面。消息从发布者发送,如果订阅者未连接到主题,消息不会消失。
我尝试使用流控制来控制订阅者检索到的消息数量,但它似乎没有任何效果。
可以在不调用回调方法的情况下处理消息吗?消息可能会从主题中消失还有其他一些原因吗?
编辑:原来另一个服务正在从同一个订阅中读取,处理丢失的消息。
【问题讨论】:
-
你能分享用于创建发布者和发布消息的代码吗?
标签: python google-cloud-platform google-cloud-pubsub