【问题标题】:Spring integration multithreadingSpring集成多线程
【发布时间】:2018-08-24 04:05:05
【问题描述】:

参考我之前在 URL 上的问题 - Spring integration multithreading requirement - 我想我可能已经找到了问题的根本原因。
我的要求简介 -
在 1 秒的固定延迟后轮询数据库,然后将非常有限的数据发布到 Tibco EMS 队列。现在,我必须从这个 EMS 队列中以多线程方式执行以下任务:-
i) 使用消息,
ii) 现在从数据库中获取完整数据,
iii) 将这些数据转换为json格式。

我的设计 -

`<int:channel id="dbchannel"/>   
    <int-jdbc:inbound-channel-adapter id="dbchanneladapter"  
        channel="dbchannel"  data-source="datasource"  
        query="${selectquery}"  update="${updatequery}"  
        max-rows-per-poll="1000">  
        <int:poller id="dbchanneladapterpoller"  
            fixed-delay="1000">  
            <int:transactional transaction-manager="transactionmanager" />  
        </int:poller>  
    </int-jdbc:inbound-channel-adapter>  
    <int:service-activator input-channel="dbchannel"
        output-channel="publishchannel" ref="jdbcmessagehandler" method="handleJdbcMessage" />  
    <bean id="jdbcmessagehandler" class="com.citigroup.handler.JdbcMessageHandler" />  

    <int:publish-subscribe-channel id="publishchannel"/>  
    <int-jms:outbound-channel-adapter id="publishchanneladapter"
        channel="publishchannel" jms-template="publishrealtimefeedinternaljmstemplate" />  

    <int:channel id="subscribechannel"/>  
    <int-jms:message-driven-channel-adapter
        id="subscribechanneladapter" destination="subscriberealtimeinternalqueue" 
        connection-factory="authenticationconnectionfactory" channel="subscribechannel" 
        concurrent-consumers="5" max-concurrent-consumers="5" />  
    <int:service-activator input-channel="subscribechannel"
        ref="subscribemessagehandler" method="logJMSMessage" />  
    <bean id="subscribemessagehandler" class="com.citigroup.handler.SubscribeJMSMessageHandler" />  
</beans>  

<bean id="authenticationconnectionfactory"
        class="org.springframework.jms.connection.UserCredentialsConnectionFactoryAdapter">
        <property name="targetConnectionFactory" ref="connectionFactory" />
        <property name="username" value="test" />
        <property name="password" value="test123" />
    </bean>  

<bean id="connectionFactory" class="org.springframework.jndi.JndiObjectFactoryBean">
    <property name="jndiTemplate">
        <ref bean="jndiTemplate" />
    </property>
    <property name="jndiName" value="app.jndi.testCF" />
</bean>  

<bean id="subscriberealtimeinternalqueue" class="org.springframework.jndi.JndiObjectFactoryBean">
    <property name="jndiTemplate">
        <ref bean="jndiTemplate" />
    </property>
    <property name="jndiName"
        value="app.queue.testQueue" />
</bean>   
<bean id="jndiTemplate" class="org.springframework.jndi.JndiTemplate">
    <property name="environment">
        <props>
            <prop key="java.naming.factory.initial">com.tibco.tibjms.naming.TibjmsInitialContextFactory
            </prop>
            <prop key="java.naming.provider.url">tibjmsnaming://test01d.nam.nsroot.net:7222</prop>
        </props>
    </property>
</bean>`


问题 -
使用消息驱动通道,并发消费者值设置为 5。但是,看起来只创建了一个消费者线程 (container-2) 并从 EMS 队列中获取消息。请在 log4j 日志下方找到 -

2018 年 8 月 16 日 11:31:12,077 INFO SubscribeJMSMessageHandler [subscribechanneladapter.container-2][]:
此时从 Queue 读取的记录总数为 387
记录#1:: [ID=7694066395]
记录#2:: [ID=7694066423]
.. .. ..
记录#387:: [ID=6147457333]
>

这里可能的根本原因 -
可能是我在导致此多线程问题的固定延迟后轮询数据库以获取数据的配置中的第一步。参考上面的日志,我的假设是因为获取的记录数是 387 并且所有这些都捆绑到一个 List 对象(List> 消息)中,它被认为只是 1 条消息/有效负载而不是 387 条不同的消息,那就是为什么只有一个线程/容器/消费者正在接收这个捆绑的消息。这种假设的原因是下面的日志 -

GenericMessage [payload=[{"ID":7694066395},{"ID":7694066423},{"ID":6147457333}],
headers={json__ContentTypeId__=class org .springframework.util.LinkedCaseInsensitiveMap, jms_redelivered=false, json__TypeId__=class java.util.ArrayList, jms_destination=Queue[app.queue.testQueue], id=e034ba73-7781-b62c-0307-170099263068, priority=4, jms_timestamp=1534820792064 , contentType=application/json, jms_messageId=ID:test.21415B667C051:40C149C0, timestamp=1534820792481}]


问题 -
我对根本原因的理解是否正确?如果是,那么如何将这 387 条消息视为单独的消息(而不是消息的一个 List 对象)并一一发布而不影响事务管理?
在我之前关于 stackoverflow 的帖子中,我曾与 https://stackoverflow.com/users/2756547/artem-bilan 讨论过这个问题,我不得不通过用 ActiveMQ 替换 Tibco EMS 来检查这个设计。但是,我们的架构团队仍在分析 ActiveMQ 基础架构,因此在获得批准之前不能使用。

【问题讨论】:

    标签: spring-integration spring-jms


    【解决方案1】:

    哦!现在我明白你的问题是什么了。 int-jdbc:inbound-channel-Adapter 确实返回了它可以从数据库中选择的记录列表。整个列表作为单个消息发送到 JMS。这就是你在消费者端只看到一个线程的原因:只有一条消息可以从队列中获取。

    如果您想为每条拉取的记录提供单独的消息,您需要考虑在 JDBC 轮询操作和发送到 JMS 之间使用 &lt;splitter&gt;

    【讨论】:

    • 如果我在 JDBC 轮询操作和发送到 EMS 队列之间使用&lt;splitter&gt;,希望不会破坏事务。我在设计中设置事务的方式就像 - 事务在第一次数据库轮询获取 10 条记录时开始,并在这些记录发送到 Tibco EMS 队列时结束。只有在它被发送到 EMS 队列之后,第二次轮询才应该开始,并随之开始第二次事务。请确认
    • 没错,事务确实是每个轮询的,并且是针对整个子流的,只要下游的所有事情都在同一个轮询线程中执行
    • 我对每个拉取的记录都使用了&lt;splitter/&gt;。我现在可以看到多个线程。谢谢。这里有一个问题 - 我正在使用&lt;logging-channel-adapter logger-name="feedlogger" level="INFO" log-full-message="true"/&gt; 使用 log4j 框架记录消息 - 有没有办法在日志文件中记录我的设计中使用的每个消息通道的“id”?
    • 让我们把它安排为一个单独的 SO 问题!
    • 我已将其安排为单独的 SO 问题 - stackoverflow.com/questions/52038843/…
    猜你喜欢
    • 2017-07-30
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2023-03-15
    • 1970-01-01
    • 2013-06-01
    • 1970-01-01
    相关资源
    最近更新 更多