【发布时间】:2018-05-29 04:17:03
【问题描述】:
我有一个 spring 集成流,它产生的消息应该保留在等待适当的消费者出现并使用它们。
@Bean
public IntegrationFlow messagesPerCustomerFlow() {
return IntegrationFlows.
from(WebFlux.inboundChannelAdapter("/messages/{customer}")
.requestMapping(r -> r
.methods(HttpMethod.POST)
)
.requestPayloadType(JsonNode.class)
.headerExpression("customer", "#pathVariables.customer")
)
.channel(messagesPerCustomerQueue())
.get();
}
@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerSpec poller() {
return Pollers.fixedRate(100);
}
@Bean
public QueueChannel messagesPerCustomerQueue() {
return MessageChannels.queue()
.get();
}
队列中的消息应作为服务器发送的事件通过 http 传递,如下所示。
PublisherSubscription 只是 Publisher 和 IntegrationFlowRegistration 的持有者,后者用于在不再需要时销毁动态创建的流(注意 GET 的传入消息没有内容,处理不当ATM 由 Webflux 集成,因此需要一个小的解决方法来访问推到 customer 标头的路径变量):
@Bean
public IntegrationFlow eventMessagesPerCustomer() {
return IntegrationFlows
.from(WebFlux.inboundGateway("/events/{customer}")
.requestMapping(m -> m.produces(TEXT_EVENT_STREAM_VALUE))
.headerExpression("customer", "#pathVariables.customer")
.payloadExpression("''") // neeeded to make handle((p,h) work
)
.log()
.handle((p, h) -> {
String customer = h.get("customer").toString();
PublisherSubscription<JsonNode> publisherSubscription =
subscribeToMessagesPerCustomer(customer);
return Flux.from(publisherSubscription.getPublisher())
.map(Message::getPayload)
.doFinally(signalType ->
publisherSubscription.unsubscribe());
})
.get();
}
上述对服务器发送事件的请求动态注册了一个流,该流使用selective consumer 按需订阅队列通道,由throwExceptionOnRejection(true) 的过滤器实现。遵循Message Handler chain 的规范,应确保将消息提供给所有消费者,直到有人接受为止。
public PublisherSubscription<JsonNode> subscribeToMessagesPerCustomer(String customer) {
IntegrationFlowBuilder flow = IntegrationFlows.from(messagesPerCustomerQueue())
.filter("headers.customer=='" + customer + "'",
filterEndpointSpec -> filterEndpointSpec.throwExceptionOnRejection(true));
Publisher<Message<JsonNode>> messagePublisher = flow.toReactivePublisher();
IntegrationFlowRegistration registration = integrationFlowContext.registration(flow.get())
.register();
return new PublisherSubscription<>(messagePublisher, registration);
}
这种结构原则上是可行的,但存在以下问题:
- 在没有订阅者的情况下发送到队列的消息导致
MessageDeliveryException: Dispatcher has no subscribers for channel 'application.messagesPerCustomerQueue' - 当没有匹配的订阅者存在时发送到队列的消息导致
AggregateMessageDeliveryException: All attempts to deliver Message to MessageHandlers failed。
我想要的是消息保留在队列中并重复提供给所有订阅者,直到它被消耗或过期(适当的选择性消费者)。我该怎么做?
【问题讨论】:
标签: spring-integration spring-integration-dsl