【问题标题】:GCP PubSub Spring Boot repeat extract messageGCP PubSub Spring Boot 重复提取消息
【发布时间】:2019-10-25 08:47:35
【问题描述】:

我需要有关 gcp pub/sus 问题的帮助。我有一个进程向 pubsub 发送 100 条带有过滤器的消息,另一个应用程序(在 spring boot 中)接收这些消息。当 Spring Boot 应用程序从 pubsub 接收消息(不是 pull)时,处理 100 条消息,但进入该进程,接收更多消息,在不同时间接收不同数量的消息,任何时间接收 120,另外 140,其他超过 200。我没有找到任何解决方案,这是我的代码:

    @Bean
    @ServiceActivator(inputChannel = "pubsubInputChannel")
    public MessageHandler messageReceiver() {
        return message -> {
            System.out.println("Message arrived! Payload: " + new String((byte[]) message.getPayload()));
            //other process of app (call other api)
            AckReplyConsumer consumer = (AckReplyConsumer) message.getHeaders().get(GcpPubSubHeaders.ACKNOWLEDGEMENT);
            consumer.ack();
        };
    }

请帮帮我!!!

【问题讨论】:

标签: spring-boot google-cloud-platform google-cloud-pubsub


【解决方案1】:

在 Google Cloud Pub/Sub 中,由于不同的原因可能会出现重复消息。需要记住的一件事是,Cloud Pub/Sub 提供至少一次交付,这意味着始终可能出现一定数量的重复,因此您的应用程序必须能够适应它们。不过,这么多重复似乎有点高。一般来说,重复通常可能由于以下原因而发生:

  1. 发布者多次发送消息。如果发布者与 Cloud Pub/Sub 断开连接并再次发送相同的消息,则可能会发生这种情况。如果发生这种类型的重复,则消息将具有不同的消息 ID。
  2. 订阅者确认消息的时间过长。在您的代码中,您有//other process of app (call other api)。这个过程需要多长时间?如果超过确认消息的截止日期,则消息将被重新传递。请记住,如果此其他进程需要为所有消息获取锁,则可能会出现争用问题,即尝试同时获取这些锁的请求过多,从而导致处理延迟。默认情况下,消息的确认期限为十秒。使用 Java 客户端库时,截止日期由 maxAckExtensionPeriod 自动延长,默认为一小时。这个属性也可以在 Spring 的 DefaultSubscriberFactory 中设置。
  3. 消息根本不被确认。如果异常阻止对ack 的调用,或者出现死锁导致该行代码永远无法到达,则将重新传递消息。
  4. 用例是large backlog of small messages 之一。在这种情况下,缓冲区很容易以导致重新传递消息的方式填满客户端。

【讨论】:

  • 嗨,Kamal,确认消息是通过 .ack() 方法,还是在接收该消息的方法结束时确认??,grettings
猜你喜欢
  • 2017-10-23
  • 2018-05-20
  • 2021-04-14
  • 2021-12-30
  • 2023-03-29
  • 1970-01-01
  • 2020-10-13
  • 2019-07-03
  • 1970-01-01
相关资源
最近更新 更多