【发布时间】:2018-08-24 04:05:05
【问题描述】:
参考我之前在 URL 上的问题 - Spring integration multithreading requirement - 我想我可能已经找到了问题的根本原因。
我的要求简介 -
在 1 秒的固定延迟后轮询数据库,然后将非常有限的数据发布到 Tibco EMS 队列。现在,我必须从这个 EMS 队列中以多线程方式执行以下任务:-
i) 使用消息,
ii) 现在从数据库中获取完整数据,
iii) 将这些数据转换为json格式。
我的设计 -
`<int:channel id="dbchannel"/>
<int-jdbc:inbound-channel-adapter id="dbchanneladapter"
channel="dbchannel" data-source="datasource"
query="${selectquery}" update="${updatequery}"
max-rows-per-poll="1000">
<int:poller id="dbchanneladapterpoller"
fixed-delay="1000">
<int:transactional transaction-manager="transactionmanager" />
</int:poller>
</int-jdbc:inbound-channel-adapter>
<int:service-activator input-channel="dbchannel"
output-channel="publishchannel" ref="jdbcmessagehandler" method="handleJdbcMessage" />
<bean id="jdbcmessagehandler" class="com.citigroup.handler.JdbcMessageHandler" />
<int:publish-subscribe-channel id="publishchannel"/>
<int-jms:outbound-channel-adapter id="publishchanneladapter"
channel="publishchannel" jms-template="publishrealtimefeedinternaljmstemplate" />
<int:channel id="subscribechannel"/>
<int-jms:message-driven-channel-adapter
id="subscribechanneladapter" destination="subscriberealtimeinternalqueue"
connection-factory="authenticationconnectionfactory" channel="subscribechannel"
concurrent-consumers="5" max-concurrent-consumers="5" />
<int:service-activator input-channel="subscribechannel"
ref="subscribemessagehandler" method="logJMSMessage" />
<bean id="subscribemessagehandler" class="com.citigroup.handler.SubscribeJMSMessageHandler" />
</beans>
<bean id="authenticationconnectionfactory"
class="org.springframework.jms.connection.UserCredentialsConnectionFactoryAdapter">
<property name="targetConnectionFactory" ref="connectionFactory" />
<property name="username" value="test" />
<property name="password" value="test123" />
</bean>
<bean id="connectionFactory" class="org.springframework.jndi.JndiObjectFactoryBean">
<property name="jndiTemplate">
<ref bean="jndiTemplate" />
</property>
<property name="jndiName" value="app.jndi.testCF" />
</bean>
<bean id="subscriberealtimeinternalqueue" class="org.springframework.jndi.JndiObjectFactoryBean">
<property name="jndiTemplate">
<ref bean="jndiTemplate" />
</property>
<property name="jndiName"
value="app.queue.testQueue" />
</bean>
<bean id="jndiTemplate" class="org.springframework.jndi.JndiTemplate">
<property name="environment">
<props>
<prop key="java.naming.factory.initial">com.tibco.tibjms.naming.TibjmsInitialContextFactory
</prop>
<prop key="java.naming.provider.url">tibjmsnaming://test01d.nam.nsroot.net:7222</prop>
</props>
</property>
</bean>`
问题 -
使用消息驱动通道,并发消费者值设置为 5。但是,看起来只创建了一个消费者线程 (container-2) 并从 EMS 队列中获取消息。请在 log4j 日志下方找到 -
2018 年 8 月 16 日 11:31:12,077 INFO SubscribeJMSMessageHandler [subscribechanneladapter.container-2][]:
此时从 Queue 读取的记录总数为 387
记录#1:: [ID=7694066395]
记录#2:: [ID=7694066423]
.. .. ..
记录#387:: [ID=6147457333]
>
这里可能的根本原因 -
可能是我在导致此多线程问题的固定延迟后轮询数据库以获取数据的配置中的第一步。参考上面的日志,我的假设是因为获取的记录数是 387 并且所有这些都捆绑到一个 List 对象(List> 消息)中,它被认为只是 1 条消息/有效负载而不是 387 条不同的消息,那就是为什么只有一个线程/容器/消费者正在接收这个捆绑的消息。这种假设的原因是下面的日志 -
GenericMessage [payload=[{"ID":7694066395},{"ID":7694066423},{"ID":6147457333}],
headers={json__ContentTypeId__=class org .springframework.util.LinkedCaseInsensitiveMap, jms_redelivered=false, json__TypeId__=class java.util.ArrayList, jms_destination=Queue[app.queue.testQueue], id=e034ba73-7781-b62c-0307-170099263068, priority=4, jms_timestamp=1534820792064 , contentType=application/json, jms_messageId=ID:test.21415B667C051:40C149C0, timestamp=1534820792481}]
问题 -
我对根本原因的理解是否正确?如果是,那么如何将这 387 条消息视为单独的消息(而不是消息的一个 List 对象)并一一发布而不影响事务管理?
在我之前关于 stackoverflow 的帖子中,我曾与 https://stackoverflow.com/users/2756547/artem-bilan 讨论过这个问题,我不得不通过用 ActiveMQ 替换 Tibco EMS 来检查这个设计。但是,我们的架构团队仍在分析 ActiveMQ 基础架构,因此在获得批准之前不能使用。
【问题讨论】:
标签: spring-integration spring-jms