【问题标题】:Scatter Gather with parallel flow (Timeout in aggregator)Scatter Gather 并行流(聚合器超时)
【发布时间】:2021-11-26 05:34:45
【问题描述】:

我一直在尝试在集合中添加timeout,以免等待每个流程都完成。 但是当我添加超时不起作用时,因为聚合器等待每个流完成。

@Bean
public IntegrationFlow queueFlow(LogicService service) {    
        return f -> f.scatterGather(scatterer -> scatterer
                                .applySequence(true)
                                .recipientFlow(aFlow(service))
                                .recipientFlow(bFlow(service))
                        , aggregatorSpec -> aggregatorSpec.groupTimeout(2000L)) 

例如,我的流程中,其中一个延迟 2 秒,另一个延迟 4 秒

public IntegrationFlow bFlow(LogicService service) {
        return IntegrationFlows.from(MessageChannels.executor(Executors.newCachedThreadPool()))
                .handle(service::callFakeServiceTimeout2)
                .transform((MessageDomain.class), message -> {
                    message.setMessage(message.getMessage().toUpperCase());
                    return message;
                }).get();
    } 

我使用 Executors.newCachedThreadPool() 来并行运行。 我想释放在超时完成之前包含的每条消息

我一直在测试的另一种方法是使用默认收集器并在 scatterGather 中设置gatherTimeout,但我不知道我是否遗漏了什么 Approach gatherTimeout

更新

cmets 中给出的所有方法都经过测试并且工作正常,唯一的问题是每个操作都是在创建消息组时评估的。并且在第一条消息到达之前创建消息组。理想的方法是在 scatterer 分发请求消息时选择有效。

我的临时解决方案是使用临时发布策略,应用 GroupConditionProvider 读取我在通过网关发送消息时创建的自定义标头。唯一需要担心的是,只有在收到新消息或我设置了组超时时才会执行发布策略。

【问题讨论】:

    标签: spring spring-boot spring-integration spring-integration-dsl


    【解决方案1】:

    聚合器上的groupTimeout 不足以释放组。如果您没有在该超时时间内获得整个组,那么它将被丢弃。见sendPartialResultOnExpiry 选项:https://docs.spring.io/spring-integration/reference/html/message-routing.html#agg-and-group-to

    如果send-partial-result-on-expirytrue,(部分)MessageGroup 中的现有消息将作为普通聚合器回复消息发布到output-channel。否则,它被丢弃。

    如果您完全不希望收集者回复,gatherTimeout 是很好的选择。所以,这样你就不会永远阻塞分散收集线程:https://docs.spring.io/spring-integration/reference/html/message-routing.html#scatter-gather-error-handling

    【讨论】:

    • 但是当我选择这个选项时,我没有收到来自集合的任何消息。我想释放集合中包含的所有消息,直到超时结束
    • 您确定您的服务会在这 2 秒内应答吗?艾米,你需要再等一会儿吗?
    • 我的目标是释放那一刻的消息并丢弃其他消息。所以我有两个流,一个需要 2 秒,一个需要 4 秒。我的超时可能是 3 秒,所以我尝试返回 2 秒流的一条消息并丢弃需要 4 秒的流的消息
    • 但是我在你的配置中没有看到groupTimeout()gatherTimeout() 在这里没有帮助:聚合器仍在等待收集所有消息。再说一遍:测试 2 秒延迟并等待 2 秒并不完美。由于 CPU 资源繁忙,实际延迟可能会更长。
    猜你喜欢
    • 2020-05-07
    • 2022-01-16
    • 1970-01-01
    • 1970-01-01
    • 2016-07-14
    • 2019-01-13
    • 1970-01-01
    • 1970-01-01
    • 2020-05-03
    相关资源
    最近更新 更多