【问题标题】:Google Cloud PubSub messages not processed by callback回调未处理 Google Cloud PubSub 消息
【发布时间】:2019-02-10 17:37:20
【问题描述】:

我正在尝试使用 Google PubSub 在两个服务之间传递和接收消息。但是,发送的一些消息似乎是随机丢弃的,并没有被订阅者的回调方法处理。

发送消息时,大约有一半的消息是由回调方法处理的。对于另一半,似乎根本没有调用回调方法(未记录任何信息)。但是,消息仍然从主题中消失,并且不会重新发送。

用于启动订阅者的代码:

logger = logging.getLogger(LOGGER_NAME)
logger.info('Starting the pubsub subscriber')
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(GOOGLE_CLOUD_PROJECT, SUBSCRIPTION_NAME)
subscriber.subscribe(subscription_path, callback=callback)
while True:
    try:
        sleep(60)
    except Exception as e:
        // Log exception

回调方法:

def callback(message):
    logger = logging.getLogger(LOGGER_NAME)
    logger.info(f'Recieved callback with message: {message}', extra = {'callback_message': message}  )
    // Process message

错误似乎出现在订阅者方面。消息从发布者发送,如果订阅者未连接到主题,消息不会消失。

我尝试使用流控制来控制订阅者检索到的消息数量,但它似乎没有任何效果。

可以在不调用回调方法的情况下处理消息吗?消息可能会从主题中消失还有其他一些原因吗?

编辑:原来另一个服务正在从同一个订阅中读取,处理丢失的消息。

【问题讨论】:

  • 你能分享用于创建发布者和发布消息的代码吗?

标签: python google-cloud-platform google-cloud-pubsub


【解决方案1】:

我知道您找到了问题的答案,但我认为列出一些有用的步骤来调试此类问题是值得的:

  1. 检查以确保消息确实已发布。当发布成功时,响应应该包含消息的 ID,例如,作为由 APIFuture 在Java Publish 方法中产生的字符串。
  2. 检查是否有积压的消息。您可以通过Stackdriver查看subscription/oldest_unacked_message_agesubscription/num_undelivered_messages
  3. 检查您的订阅者是否设置了flow control,这会阻止您及时接收所有消息。如果您设置了流量控制并且阻止了所有消息的传递,您可能会看到 Stackdriver 中未传递的消息数量正在增加。
  4. 确保没有任何其他客户端订阅同一订阅的消息。例如,也许您正在使用gcloud tool to pull 并查看消息。在这种情况下,您可能不会在 Stackdriver 中看到未送达消息的数量增加。

如果在检查完所有这些后您不确定您的消息发生了什么,最好联系支持人员并提供您的项目和订阅的名称以及您认为未送达的任何消息的 ID。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2019-07-02
    • 1970-01-01
    • 2019-07-31
    • 2015-12-14
    • 2015-11-10
    • 2021-05-18
    • 2021-12-30
    • 2019-02-06
    相关资源
    最近更新 更多