PubSubInboundChannelAdapter 基于对主题的订阅。所以,它将是一个消息流,这个PubSubInboundChannelAdapter 对它们中的每一个做出反应,转换为 Spring 消息并将其发送到下游到配置的通道。
订阅的时候真的没办法拿到一批消息。
您还需要记住,在 GCP Pub/Sub 中没有像 offset 这样的东西。您确实应该确认您从 Pub/Sub 消费的每条消息。
虽然有办法一次提取一批消息,使用PubSubMessageSource。 messageSource.setMaxFetchSize(5); 可以解决问题,但是 PubSubMessageSource 仍然会单独生成每条消息,因此您可以独立 (n) 确认它们。
当然,您可以利用 PubSubMessageSource 使用的功能 - PubSubSubscriberOperations.pullAndConvert()。有关更多信息,请参阅它的 JavaDocs:
/**
* Pull a number of messages from a Google Cloud Pub/Sub subscription and convert them to Spring messages with
* the desired payload type.
* @param subscription the subscription name
* @param maxMessages the maximum number of pulled messages
* @param returnImmediately returns immediately even if subscription doesn't contain enough
* messages to satisfy {@code maxMessages}
* @param payloadType the type to which the payload of the Pub/Sub messages should be converted
* @param <T> the type of the payload
* @return the list of received acknowledgeable messages
* @since 1.1
*/
<T> List<ConvertedAcknowledgeablePubsubMessage<T>> pullAndConvert(String subscription, Integer maxMessages,
Boolean returnImmediately, Class<T> payloadType);
所以,这个看起来像您正在寻找的,因为您确实将拥有一个消息列表,并且每个消息都是带有 (n)ack 回调的包装器。
此 API 可用于自定义 @InboundChannelAdapter MessageSource 或 Supplier @Bean 实现。
但仍然:我看不到整个批处理的好处,因为每条消息都可以单独确认,而不会影响所有其他消息。