【Question title】: Spring Integration | Connection pooling with JMS (ActiveMQ)
【Posted】: 2017-01-07 14:00:49
【Question】:

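I am trying to integrate my application with a JMS queue (using ActiveMQ), with Spring Integration as the integration component. We want connection pooling. I have set "maxConcurrentConsumers" to 100 on the "DefaultMessageListenerContainer".

The problem is that once all messages have been read from the queue, the "Number Of Consumers" stays at 100 (as shown in the ActiveMQ console). With database connection pooling (via JNDI), connections are returned to the pool once they are no longer needed and the number of open connections drops. That does not happen here.

Any pointers on how to handle this would be a great help.

My code is below: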

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:aop="http://www.springframework.org/schema/aop"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:int-jms="http://www.springframework.org/schema/integration/jms"
xmlns:jms="http://www.springframework.org/schema/jms"
xmlns:int="http://www.springframework.org/schema/integration"
xsi:schemaLocation="http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd
                    http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
                    http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd
                    http://www.springframework.org/schema/integration/jms http://www.springframework.org/schema/integration/jms/spring-integration-jms.xsd
                    http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
                    http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">


<!-- Component scan to find all Spring components -->
<context:component-scan base-package="com.poc.springinteg._7" />

<!-- JNDI template pointing at the ActiveMQ broker -->
<bean id="remoteJndiTemplate" class="org.springframework.jndi.JndiTemplate" lazy-init="false"> 
    <property name="environment"> 
        <props> 
            <prop key="java.naming.provider.url">tcp://localhost:61616</prop>
            <prop key="java.naming.factory.url.pkgs">org.apache.activemq.jndi</prop>
            <prop key="java.naming.factory.initial">org.apache.activemq.jndi.ActiveMQInitialContextFactory</prop>
            <prop key="connectionFactoryNames">DefaultActiveMQConnectionFactory,QueueConnectionFactory</prop>
            <prop key="queue.SendReceiveQueue">org.apache.geronimo.configs/activemq-ra/JCAAdminObject/SendReceiveQueue</prop>
            <prop key="queue.SendQueue">org.apache.geronimo.configs/activemq-ra/JCAAdminObject/MDBTransferBeanOutQueue</prop> 
        </props> 
    </property> 
</bean> 

<bean id="remoteConnectionFactory" class="org.springframework.jndi.JndiObjectFactoryBean" lazy-init="false">
    <property name="jndiTemplate" ref="remoteJndiTemplate"/>
    <property name="jndiName" value="QueueConnectionFactory"/>
    <property name="lookupOnStartup" value="true" />
    <property name="proxyInterface" value="javax.jms.ConnectionFactory" />
</bean>

<!-- writing queue -->
<bean id="destinationqueue" class="org.apache.activemq.command.ActiveMQQueue">
    <constructor-arg index="0">
        <value>OutputQueue_7</value>
    </constructor-arg>
</bean>

<int:channel id="outbound"/>

<int-jms:outbound-channel-adapter id="jmsOut" 
                                 channel="outbound"
                                 connection-factory="remoteConnectionFactory"
                                 destination="destinationqueue" />

<!-- reading queue -->
<bean id="sourceQueue" class="org.apache.activemq.command.ActiveMQQueue">
    <constructor-arg index="0">
        <value>OutputQueue_7</value>
    </constructor-arg>
</bean>

<bean id="messageListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="remoteConnectionFactory"/>
    <property name="destination" ref="sourceQueue"/>
    <property name="maxConcurrentConsumers" value="10"/>
    <property name="concurrentConsumers" value="1"/>
    <property name="autoStartup" value="true"/>
</bean>

<int:channel id="inbound"/>

<int-jms:message-driven-channel-adapter id="jmsIn" 
                                        channel="inbound"
                                        extract-payload="false" 
                                        container="messageListenerContainer" />


<int:service-activator input-channel="inbound"
                       output-channel="outbound"
                       ref="messageReader"
                       method="onMessage" />

</beans>


-- Message Reader Class

import javax.jms.JMSException;

import org.apache.activemq.command.ActiveMQTextMessage;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

@Component("messageReader")
public class MessageReader {

    @ServiceActivator
    public void onMessage(Message<?> inboundMessage) {
        System.out.println(" -------Message Read Start--------");
        System.out.println(inboundMessage.getHeaders());
        System.out.println(" -------Message Headers Reading completed--------");

        System.out.println("payload-->" + inboundMessage.getPayload().getClass());
        String payload = inboundMessage.getPayload().toString();
        System.out.println("payload value-->" + payload);

        // With extract-payload="false" the payload is the raw JMS message
        ActiveMQTextMessage obj = (ActiveMQTextMessage) inboundMessage.getPayload();
        System.out.println("Object-->" + obj);

        try {
            System.out.println("Datastructure-->" + obj.getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}


---- Message Writer Class

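import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.stereotype.Component;

@Component("sendMessage")
public class SendMessage {

    // Resolved to the "outbound" channel by field name
    @Autowired
    private MessageChannel outbound;

    public void send(String name) {
        // Entity is the application's own payload class (not shown)
        Entity entity = new Entity(1, "anuj");

        Message<Entity> message = MessageBuilder.withPayload(entity)
                .setHeader("Message_Header1", "Message_Header1_Value")
                .setHeader("Message_Header2", "Message_Header2_Value")
                .build();

        outbound.send(message);
    }
}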


-- Application main class
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class App {

    public static void main(String[] args) {
        ClassPathXmlApplicationContext applicationContext =
                new ClassPathXmlApplicationContext("classpath:7_applicationContext.xml");

        SendMessage sendMessage = applicationContext.getBean("sendMessage", SendMessage.class);

        for (int i = 0; i < 10; i++) {
            sendMessage.send("This is Message Content");
        }

        applicationContext.registerShutdownHook();
    }
}

【Comments】:

  • Did you find a solution? I'm in the same boat.

Tags: spring spring-integration spring-jms


【Answer 1】:

If you wrap the connection factory in a CachingConnectionFactory, all consumers will share a single connection.
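
A minimal sketch of that wrapping, reusing the `remoteConnectionFactory` bean from the question. The bean id `cachingConnectionFactory` and the `sessionCacheSize` value are illustrative assumptions. Turning `cacheConsumers` off is also an assumption here, based on Spring's general caution against caching consumers under a container that scales its consumer count dynamically:

```xml
<!-- Hypothetical bean id; wraps the JNDI-looked-up factory from the question -->
<bean id="cachingConnectionFactory"
      class="org.springframework.jms.connection.CachingConnectionFactory">
    <property name="targetConnectionFactory" ref="remoteConnectionFactory"/>
    <!-- Illustrative value, not a recommendation -->
    <property name="sessionCacheSize" value="10"/>
    <!-- Assumption: do not keep consumers open in the cache once the container releases them -->
    <property name="cacheConsumers" value="false"/>
</bean>
```

The listener container (and, if desired, the outbound adapter) would then reference `cachingConnectionFactory` instead of `remoteConnectionFactory`.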

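The container adjusts the number of consumers between concurrentConsumers and maxConcurrentConsumers according to demand; after a burst of activity, it takes a while for the consumer count to drop back down.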
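
The settings that influence this scale-down are properties on `DefaultMessageListenerContainer`. A rough sketch, assuming the container bean from the question; the values are illustrative only, and how quickly idle consumers are released depends on these settings and on the Spring version in use:

```xml
<bean id="messageListenerContainer"
      class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="remoteConnectionFactory"/>
    <property name="destination" ref="sourceQueue"/>
    <!-- Lower and upper bounds the container scales between -->
    <property name="concurrentConsumers" value="1"/>
    <property name="maxConcurrentConsumers" value="10"/>
    <!-- Illustrative: limit receive attempts per task so each consumer task ends and is re-evaluated -->
    <property name="maxMessagesPerTask" value="10"/>
    <!-- Illustrative: consecutive idle task executions before a surplus consumer shuts down -->
    <property name="idleTaskExecutionLimit" value="5"/>
    <!-- Milliseconds each receive attempt waits for a message -->
    <property name="receiveTimeout" value="1000"/>
</bean>
```

Consumers above `concurrentConsumers` are the only ones eligible to stop; the minimum is kept running until the container shuts down.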
