【问题标题】:spring-integration: dispatch queued messages to selective consumerspring-integration:将排队的消息发送给选择性消费者
【发布时间】:2018-05-29 04:17:03
【问题描述】:

我有一个 spring 集成流,它产生的消息应该保留在等待适当的消费者出现并使用它们。

@Bean
public IntegrationFlow messagesPerCustomerFlow() {
    return IntegrationFlows.
            from(WebFlux.inboundChannelAdapter("/messages/{customer}")
                    .requestMapping(r -> r
                            .methods(HttpMethod.POST)
                    )
                    .requestPayloadType(JsonNode.class)
                    .headerExpression("customer", "#pathVariables.customer")
            )
            .channel(messagesPerCustomerQueue()) 
            .get();
}

@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerSpec poller() {
    return Pollers.fixedRate(100);
}

@Bean
public QueueChannel messagesPerCustomerQueue() {
    return MessageChannels.queue()
            .get();
}

队列中的消息应作为服务器发送的事件通过 http 传递,如下所示。

PublisherSubscription 只是 Publisher 和 IntegrationFlowRegistration 的持有者,后者用于在不再需要时销毁动态创建的流(注意 GET 的传入消息没有内容,处理不当ATM 由 Webflux 集成,因此需要一个小的解决方法来访问推到 customer 标头的路径变量):

@Bean
public IntegrationFlow eventMessagesPerCustomer() {
    return IntegrationFlows
       .from(WebFlux.inboundGateway("/events/{customer}")
            .requestMapping(m -> m.produces(TEXT_EVENT_STREAM_VALUE))
            .headerExpression("customer", "#pathVariables.customer")
            .payloadExpression("''") // neeeded to make handle((p,h) work
       )
       .log()
       .handle((p, h) -> {
           String customer = h.get("customer").toString();
           PublisherSubscription<JsonNode> publisherSubscription =
               subscribeToMessagesPerCustomer(customer);
           return Flux.from(publisherSubscription.getPublisher())
                   .map(Message::getPayload)
                   .doFinally(signalType ->
                      publisherSubscription.unsubscribe());
       })
       .get();
}

上述对服务器发送事件的请求动态注册了一个流,该流使用selective consumer 按需订阅队列通道,由throwExceptionOnRejection(true) 的过滤器实现。遵循Message Handler chain 的规范,应确保将消息提供给所有消费者,直到有人接受为止。

public PublisherSubscription<JsonNode> subscribeToMessagesPerCustomer(String customer) {
    IntegrationFlowBuilder flow = IntegrationFlows.from(messagesPerCustomerQueue())
            .filter("headers.customer=='" + customer + "'",
                    filterEndpointSpec -> filterEndpointSpec.throwExceptionOnRejection(true));
    Publisher<Message<JsonNode>> messagePublisher = flow.toReactivePublisher();

    IntegrationFlowRegistration registration = integrationFlowContext.registration(flow.get())
            .register();

    return new PublisherSubscription<>(messagePublisher, registration);
}

这种结构原则上是可行的,但存在以下问题:

  • 在没有订阅者的情况下发送到队列的消息导致MessageDeliveryException: Dispatcher has no subscribers for channel 'application.messagesPerCustomerQueue'
  • 当没有匹配的订阅者存在时发送到队列的消息导致AggregateMessageDeliveryException: All attempts to deliver Message to MessageHandlers failed

我想要的是消息保留在队列中并重复提供给所有订阅者,直到它被消耗或过期(适当的选择性消费者)。我该怎么做?

【问题讨论】:

    标签: spring-integration spring-integration-dsl


    【解决方案1】:

    请注意,GET 的传入消息没有内容,Webflux 集成未正确处理 ATM

    我不明白这个问题。

    WebFluxInboundEndpoint 使用此算法:

    if (isReadable(request)) {
       ...
    else {
        return (Mono<T>) Mono.just(exchange.getRequest().getQueryParams());
    }
    

    GET 方法真正转到 else 分支的地方。而要发送的消息的payloadMultiValueMap。此外,我们最近还为您解决了 POST 的问题,该问题已经在 5.0.5 版本中发布:https://jira.spring.io/browse/INT-4462

    Dispatcher 没有订阅者

    原则上不会在QueueChannel 上发生。那里根本没有任何调度员。它只是队列和发送者提供消息被存储。您还缺少与我们分享的其他内容。但是让我们用自己的名字来称呼事物:messagesPerCustomerQueue 在您的应用程序中不是QueueChannel

    更新

    关于:

    我想要的是消息保留在队列中并重复提供给所有订阅者,直到它被消耗或过期(适当的选择性消费者)

    我们看到的只是一个基于嵌入式 ActiveMQ 的 PollableJmsChannel,以支持消息的 TTL。作为此队列的消费者,您应该有一个 PublishSubscribeChannelsetMinSubscribers(1) 以使 MessagingTemplate 在还没有订阅者时抛出 MessageDeliveryException。这样,JMS 事务将被回滚,并且消息将返回到队列以进行下一个轮询周期。

    内存中QueueChannel 的问题是没有事务重新传递,并且一旦从该队列轮询消息就会丢失。

    另一个类似于 JMS(事务)的选项是 JdbcChannelMessageStore 用于 QueueChannel。虽然这种方式我们没有 TTL 功能...

    【讨论】:

    • 如你所说,GET 消息的有效负载变成了一个空的多值映射。但这有一个有趣的效果:在handle()handle()LambdaMessageProcessor 的第103/104 行中,有效负载而不是标头被复制到args[1],因此p 和h 都包含空映射。我将打开一个 Jira 问题以进一步讨论。关于队列:它实际上是一个带有轮询器的队列通道,我已经添加了上面的 bean 配置。我可以将样本提交到github.com/spring-projects/spring-integration-samples/pull/225
    • 哦!这很有趣……是的,请就此事提出 JIRA。
    • 请看一下我在答案中的更新。
    • 感谢 JIRA!稍后会看看。请注意我对您原始问题的回答中的更新
    猜你喜欢
    • 2023-04-07
    • 1970-01-01
    • 1970-01-01
    • 2023-03-29
    • 1970-01-01
    • 2016-11-24
    • 2013-08-01
    • 2014-07-03
    • 2014-01-22
    相关资源
    最近更新 更多