【问题标题】:Delayed Acknowledging GCloud Pub/Sub message延迟确认 GCloud Pub/Sub 消息
【发布时间】:2021-04-18 11:36:36
【问题描述】:

我正在尝试实现一个类似于 AWS 或 Azure 队列的 PubSub 客户端,但是,我遇到了 gcloud cpp sdk 的问题。

更新:删除了不必要的细节。

首先,提供的example 不能开箱即用——我必须在session.cancel() 之前休眠,否则无法确认消息。是否有可靠的方法等到 ack() 操作完成并检查其状态?至少我想确保服务器收到我的请求。

此外,c++ API 似乎只提供了一个不适合我的用例的异步方法。

我需要实现以下接口,该接口通过依赖注入插入到更大的系统中。该系统在其他云上的生产中工作,因此我无法更改架构。只需要实现接口即可。

template<typename TItem>
class Queue{
public:
    /*!
    * Dequeues message from the queue 
    * Returns true on success
    */
    virtual bool Dequeue( TItem & item) = 0;

    /*!
    * Discards(deletes) the item with from the cloud queue.
    */
    virtual void Discard(const TReceipt & receipt) = 0;
};

队列的实际实现将提供一个序列化器,该序列化器将 TItem 序列化为 JSON 并返回。

AWS 和 Azure SDK 为每个出队的消息提供一个收据,以便我以后可以丢弃它。 pubsub SDK 的收据是 AckHandler 对象,它绑定到会话。

一个明显错误的解决方案是保持会话打开并在 lambda 中等待另一个 condition_variable,直到下一次调用 Dequeue 方法。但是,这看起来像是一个快速而肮脏的解决方案。 使用 Pub/Sub 实现此功能的正确方法是什么?

【问题讨论】:

    标签: c++11 google-cloud-platform google-cloud-pubsub


    【解决方案1】:

    是否有可靠的方法等到 ack() 操作完成并检查其状态?

    并非如此,因为ack() 操作是尽力而为。即使您等待ack() 到达服务器,也不能保证消息不会被重新发送。是的,即使ack() 成功,服务也可能会重新发送消息。

    请注意,库会自动延长您消息的租期以避免重新发送,直到您明确 ack()nack(),因此 ack() 中的延迟不会影响正确性除非您关闭在ack()之后不久申请。

    AWS 和 Azure SDK 为每个出队的消息提供一个收据,以便我以后可以丢弃它,但我不知道如何使用 pub/sub 来做到这一点。

    您可以在AckHandler 上调用nack() 来丢弃消息,这意味着该消息将被重新发送到另一个实例。这就是你所说的“丢弃”吗?

    使用 Pub/Sub 实现此功能的正确方法是什么?

    嗯,我不确定我是否遵循。我可以猜测Dequeue()Discard() 的语义,如果我猜错了请见谅。无论如何,尚不清楚这是否真的适用于任何类型?喜欢TReceipt == int

    需要注意的是这里有很多猜测,你可以这样做:

    class PubsubBuffer {
     private:
      std::mutex mu_;
      std::dequeue<std::pair<pubsub::Message, AckHandlerWrapper>> queue_;
    
     public:
      virtual bool Dequeue(pubsub::Message& item, AckHandlerWrapper& receipt) {
        std::unique_lock<std::mutex> lk(mu_);
        if (queue_.empty()) return false;
        auto& f = queue_.front();
        item = std::move(f.first);
        receipt = std::move(.second);
        queue_.pop_front();
        return true;
      }
      virtual void Discard(AckHandlerWrapper const& receipt) {
        std::move(receipt.ack_handler).ack();
      }
      void Push(pubsub::Message m, pubsub::AckHandler h) {
        std::unique_lock<std::mutex> lk(mu_);
        queue_.push_back(
          std::make_pair(std::move(m), AckHandlerWrapper(std::move(h)));
      }
    };
    
    
    std::shared_ptr<PubsubBuffer> F(pubsub::Subscriber s) {
      auto buffer = std::make_shared<PubsubBuffer>();
      auto handler = [buffer](pubsub::Message m, pubsbu::AckHandler h) {
        buffer.Push(std::move(m), std::move(h));
      };
      return buffer;
    }
    

    【讨论】:

    • 嗨@coryan。感谢您的回答。我已经更新了问题以避免不必要的细节。在您的示例中,发布者和订阅者应该在同一进程中。在我的例子中,发布者是我们发布任务的客户。让我们假设 hash 函数和 operator== 对于 TItem 是重载的,我们可以将一个 unordered_map 从 TItem 保留到它的 AckHandler 以便我们可以按需找到它。我只需要出列一条消息,对其进行处理,然后在成功时将其删除。
    • "在你的例子中,发布者和订阅者应该在同一个进程中。"我不这么认为。我只是假设你会从某个订阅者的处理程序中提供这个队列。您能否在(更新的)问题中分享更多关于Queue&lt;TItem&gt; 接口的语义?是否期望每个 Dequeue() 导致 RPC 以从 Cloud Pub/Sub 获取消息?或者你想在你的本地进程中有一个准备好的消息缓冲区?
    • 是的,每个 Dequeue 都会产生一个 RPC。消息的处理时间从几秒到大约 5 分钟不等,因此我们将隐身超时设置为 600 秒。
    • 主线程中的代码看起来像while( !queue-&gt;Empty() ) { if(queue-&gt;Dequeue(item)) { /*Process the message*/ queue-&gt;Discard(item); } } 另一个问题是如何在不使用不推荐使用的参数的情况下确定队列是否为空,但这是一个不同的问题。
    • 您似乎想一次提取一条消息,而将任何其他消息留在 Pub/Sub 服务中?我们不提供该 API,因为通常它会产生非常差的吞吐量,但有一种方法可以做类似的事情。在 SubscriberOption: googleapis.dev/cpp/google-cloud-pubsub/latest/… 中将 max_outstanding_messages() 设置为 1。该服务将在发送下一条消息之前等待确认。
    猜你喜欢
    • 2023-04-02
    • 2019-04-10
    • 2017-10-15
    • 2020-02-05
    • 1970-01-01
    • 2019-04-10
    • 2018-09-13
    • 2016-01-28
    • 2021-12-12
    相关资源
    最近更新 更多