【发布时间】:2021-09-28 11:53:27
【问题描述】:
我正在尝试从 google pub-sub 中的某个主题获取所有可用消息。 但是在 go 中,一旦 Pub-Sub 中没有剩余消息,我无法找到可以取消接收回调的配置。
我认为一种方法是使用此答案Google PubSub - Counting messages in topic 中描述的 Google Cloud Monitoring Api 从 Pub-Sub 获取消息总数,然后记录已读取消息的数量并在计数为时调用取消等于这个数字,但我不确定这是否是前进的正确方法。
var mu sync.Mutex
received := 0
sub := client.Subscription(subID)
cctx, cancel := context.WithCancel(ctx)
err = sub.Receive(cctx, func(ctx context.Context, msg *pubsub.Message) {
mu.Lock()
defer mu.Unlock()
fmt.Fprintf(w, "Got message: %q\n", string(msg.Data))
msg.Ack()
received++
if received == TotalNumberOfMessages {
cancel()
}
})
if err != nil {
return fmt.Errorf("Receive: %v", err)
}
我也尝试过使用带超时的上下文,即在取消之后获取直到未满足此上下文截止日期。
ctx, cancel := context.WithTimeout(ctx, 100*time.Second)
defer cancel()
err = subscription.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
}
但话又说回来,我无法确定所有消息都已处理完毕。
请提出一个解决方案,以确保订阅功能在 Pub-Sub 中没有剩余消息时停止。
【问题讨论】: