【发布时间】:2016-12-05 20:34:34
【问题描述】:
我确实创建了如下聚合服务
@EnableBinding(Processor.class)
class Configuration {
@Autowired
Processor processor;
@ServiceActivator(inputChannel = Processor.INPUT)
@Bean
public MessageHandler aggregator() {
AggregatingMessageHandler aggregatingMessageHandler =
new AggregatingMessageHandler(new DefaultAggregatingMessageGroupProcessor(),
new SimpleMessageStore(10));
//AggregatorFactoryBean aggregatorFactoryBean = new AggregatorFactoryBean();
//aggregatorFactoryBean.setMessageStore();
aggregatingMessageHandler.setOutputChannel(processor.output());
//aggregatorFactoryBean.setDiscardChannel(processor.output());
aggregatingMessageHandler.setSendPartialResultOnExpiry(true);
aggregatingMessageHandler.setSendTimeout(1000L);
aggregatingMessageHandler.setCorrelationStrategy(new ExpressionEvaluatingCorrelationStrategy("requestType"));
aggregatingMessageHandler.setReleaseStrategy(new MessageCountReleaseStrategy(3)); //ExpressionEvaluatingReleaseStrategy("size() == 5")
aggregatingMessageHandler.setExpireGroupsUponCompletion(true);
aggregatingMessageHandler.setGroupTimeoutExpression(new ValueExpression<>(3000L)); //size() ge 2 ? 5000 : -1
aggregatingMessageHandler.setExpireGroupsUponTimeout(true);
return aggregatingMessageHandler;
}
}
现在我想在创建新组后立即释放该组,所以我一次只有一个组。
更具体地说,我确实收到了两种类型的请求 'PUT' 和 'DEL' 。我想按照上述规则继续聚合,但是一旦我收到一个请求类型而不是我正在聚合的类型,我想释放当前组并开始聚合新类型。
我想这样做的原因是因为这些请求被发送到不支持同时具有 PUT 和 DEL 请求的另一方,并且我不能延迟任何 DEL 请求,因为 PUT 和 DEL 之间的顺序很重要.
我知道我需要创建一个自定义发布 Pojo,但我可以检查当前组吗?
举例
如果我收到如下 6 条消息
PUT PUT PUT DEL DEL PUT
它们应该汇总如下
3PUT
2 删除
1 个放置
【问题讨论】:
-
抱歉,您的问题不清楚。将
CorrelationStrategy用作FOO文字,您始终会得到同一个组,并且所有消息仅聚合到该组。而已。完全不清楚围绕“一旦创建新组就释放组”应该采取什么行动。 -
@ArtemBilan 很抱歉这个错误,我确实更新了这个问题,希望现在很清楚
标签: spring-integration spring-cloud-stream