【问题标题】:Consuming batch of message from pubsub with Spring使用 Spring 从 pubsub 消费一批消息
【发布时间】:2019-08-30 10:16:52
【问题描述】:

如何消费来自 pubsub 的多条消息?这似乎是一个简单的问题,应该有简单的解决方案,但目前我可以找到简单的方法来使用 spring-cloud-gcp-pubsub 从 pubsub 消费批量记录。

我正在使用spring-cloud-gcp-pubsub 来使用来自 pubsub 的消息并在 Spring Boot 应用程序中处理它们。我当前的设置非常简单,我有消耗记录的 PubSubInboundChannelAdapterServiceActivator。经过研究,我发现了 spring 集成Aggregators,但它们似乎不是这样做的好方法,因为向下游传播确认并不容易。有什么我想念的吗?如何批量消费消息?

【问题讨论】:

    标签: spring-integration spring-cloud google-cloud-pubsub


    【解决方案1】:

    PubSubInboundChannelAdapter 基于对主题的订阅。所以,它将是一个消息流,这个PubSubInboundChannelAdapter 对它们中的每一个做出反应,转换为 Spring 消息并将其发送到下游到配置的通道。 订阅的时候真的没办法拿到一批消息。

    您还需要记住,在 GCP Pub/Sub 中没有像 offset 这样的东西。您确实应该确认您从 Pub/Sub 消费的每条消息。

    虽然有办法一次提取一批消息,使用PubSubMessageSourcemessageSource.setMaxFetchSize(5); 可以解决问题,但是 PubSubMessageSource 仍然会单独生成每条消息,因此您可以独立 (n) 确认它们。

    当然,您可以利用 PubSubMessageSource 使用的功能 - PubSubSubscriberOperations.pullAndConvert()。有关更多信息,请参阅它的 JavaDocs:

    /**
     * Pull a number of messages from a Google Cloud Pub/Sub subscription and convert them to Spring messages with
     * the desired payload type.
     * @param subscription the subscription name
     * @param maxMessages the maximum number of pulled messages
     * @param returnImmediately returns immediately even if subscription doesn't contain enough
     * messages to satisfy {@code maxMessages}
     * @param payloadType the type to which the payload of the Pub/Sub messages should be converted
     * @param <T> the type of the payload
     * @return the list of received acknowledgeable messages
     * @since 1.1
     */
    <T> List<ConvertedAcknowledgeablePubsubMessage<T>> pullAndConvert(String subscription, Integer maxMessages,
            Boolean returnImmediately, Class<T> payloadType);
    

    所以,这个看起来像您正在寻找的,因为您确实将拥有一个消息列表,并且每个消息都是带有 (n)ack 回调的包装器。

    此 API 可用于自定义 @InboundChannelAdapter MessageSourceSupplier @Bean 实现。

    但仍然:我看不到整个批处理的好处,因为每条消息都可以单独确认,而不会影响所有其他消息。

    【讨论】:

      【解决方案2】:

      试试下面的:

      @Bean
          @InboundChannelAdapter(channel = "pubsubInputChannel", poller = @Poller(fixedDelay = "5000", maxMessagesPerPoll = "3"))
          public MessageSource<Object> pubsubAdapter(PubSubTemplate pubSubTemplate) {
              PubSubMessageSource messageSource = new PubSubMessageSource(pubSubTemplate, "testSubscription");
              messageSource.setAckMode(AckMode.MANUAL);
              return messageSource;
          }
      

      maxMessagesPerPoll 属性确定将轮询多少消息。

      【讨论】:

        猜你喜欢
        • 2021-06-23
        • 2016-12-15
        • 2022-06-15
        • 1970-01-01
        • 1970-01-01
        • 2016-06-14
        • 1970-01-01
        • 2020-08-11
        • 1970-01-01
        相关资源
        最近更新 更多