【问题标题】:Block main thread with multiple executor channel使用多个执行器通道阻塞主线程
【发布时间】:2021-04-13 16:46:44
【问题描述】:

我有一个多线程项目,我从队列中读取消息并将其传递给 spring 集成来处理它。每条消息通过 diff 通道最终存储在 DB 中并发送到下游 Q。

<int:channel id="mainChannel">
        <int:interceptors>
            <int:wire-tap channel="channel1"  selector-expression="!headers['somevalue']"/>
            <int:wire-tap channel="channel2"   selector-expression="!headers['somevalue']"/>
            <int:wire-tap channel="channel3"   selector-expression="!headers['somevalue']"/>
      
</int:interceptors>
    </int:channel> 
   <int:channel id="channel1">
        <int:dispatcher task-executor="exec1" />
   </int:channel>
  <int:channel id="channel2">
        <int:dispatcher task-executor="exec2" />
  </int:channel>
   <int:channel id="channel3">
        <int:dispatcher task-executor="exec3" />
  </int:channel>

现在我想阻塞发送者线程,直到所有异步任务都完成处理,例如,在线程中,只有一条消息进入所有通道(1,2,3),而对于另一个线程,它只会进入 2 个通道(1 ,2) 等等。

我见过聚合器/线程屏障方法,但不确定它是如何工作的,因为每个线程的流都是不同的。

我怎样才能做到这一点?

【问题讨论】:

    标签: java multithreading spring-integration


    【解决方案1】:

    线程无关紧要;主线程调用屏障,无论哪个线程将第三条消息添加到聚合器,都会释放组,然后可以使用该组来触发屏障。

    https://docs.spring.io/spring-integration/docs/current/reference/html/message-routing.html#barrier

    【讨论】:

    • 这是个问题,它不能保证所有 3 个通道都会一直执行,通道调用是基于每个线程的标头值。
    • 我不明白如何,因为所有 3 个选择器表达式都是相同的。如果他们真的可以进入可变数量的子流,您可以在聚合器中使用自定义发布策略来确定组何时完成。
    • 参见RecipientListRouter:它具有相同的selector 功能以及满足下游聚合器的applySequence 选项:docs.spring.io/spring-integration/reference/html/…
    • Artern,这个解决方案对我来说听起来不错,只是为了确认我是否需要汇总响应?如果是,我该怎么办?我找不到任何将 RecipientListRouter 与聚合器一起使用的示例。谢谢
    • 只是添加,我所有的频道都将是收件人列表中的执行者频道
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-07-21
    • 2014-05-25
    • 2018-02-16
    • 1970-01-01
    • 2015-12-29
    • 1970-01-01
    相关资源
    最近更新 更多