【问题标题】:Spring Integration - Producer Queue capacity limitationsSpring Integration - 生产者队列容量限制
【发布时间】:2021-06-08 08:36:30
【问题描述】:

Spring 集成 - 生产者队列容量限制

我们正在使用带有 MessageChannelPartitionHandler 的远程分区将分区消息发送到 Queue(ActiveMQ) 以供工人挑选和处理。 该作业需要处理大量数据,许多分区消息正在发布到队列,并且来自replyChannnel 的响应聚合器因消息超时而失败,因为所有消息都无法在给定时间内处理。 我们还尝试通过使用队列容量 来限制发布到队列的消息,这导致服务器崩溃并由于保存所有这些分区消息的内存问题而生成堆转储 在内存中。

我们希望自己控制 StepExecution 拆分的创建,这样就不会出现内存问题。 示例案例是大约 4k 分区消息被发布到队列,整个作业大约需要 3 小时。

我们可以控制消息发布到 QueueChannel 吗?

<bean id="senExtractMemberMasterPartitionHandler"   class="org.springframework.batch.integration.partition.MessageChannelPartitionHandler">
        <property name="messagingOperations" ref="senExtractMemberMasterPartitionMsgTemplate" />
        <property name="replyChannel" ref="senExtractProcessingMasterAggregatedChannel" />
        <property name="stepName" value="senExtractGeneratePrintRequestWorkerStep" />
        <property name="gridSize" value="500" />
</bean>
<bean id="senExtractMemberMasterPartitionMsgTemplate" class="org.springframework.integration.core.MessagingTemplate">
    <property name="defaultChannel" ref="senExtractProcessingMasterRequestChannel" />
    <property name="receiveTimeout" value="18000000" />
</bean>

<integration:channel id="senExtractProcessingMasterAggregatedChannel" >
    <integration:queue />
    <integration:interceptors>
        <integration:wire-tap channel="masterLoggingChannel" />
    </integration:interceptors>
</integration:channel>


<int-jms:outbound-gateway
    id="senExtractMasterOutGateway" 
    connection-factory="masterJMSConnectionFactory"
    correlation-key="JMSCorrelationID"
    request-channel="senExtractProcessingMasterRequestChannel"
    request-destination-name="senExtractRequestQueue" 
    reply-channel="senExtractProcessingMasterReplyChannel"
    reply-destination-name="senExtractReplyQueue" 
    async="true"
    auto-startup="true"
    reply-timeout="18000000" 
    receive-timeout="6000">
    <integration:poller ref="masterPoller"/>
    <int-jms:reply-listener />  
</int-jms:outbound-gateway>

【问题讨论】:

    标签: spring-batch spring-integration spring-messaging


    【解决方案1】:

    该作业需要处理大量数据,许多分区消息正在发布到队列,并且来自replyChannnel 的响应聚合器失败,消息超时,因为所有消息都无法在给定时间内处理。

    您需要增加超时时间或添加更多工作人员。 MessageChannelPartitionHandler 的 Javadoc 很清楚这一点:

    The receive timeout needs to be set realistically in the MessagingTemplate
    and the aggregator, so that there is a good chance of all work being done.
    

    我们想控制 StepExecution 拆分本身的创建

    Spring Batch 为此提供了StepExecutionSplitter 接口。如果默认的 (SimpleStepExecutionSplitter) 不适合您的需求,您可以为您的分区步骤提供自定义实现。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2012-01-28
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多