【发布时间】:2021-01-23 02:16:12
【问题描述】:
下面是Qpid客户端的配置,使用Camel上下文连接Artemis broker。
上下文文件按预期启动,但在这种情况下 qpid 客户端无法连接到 Apache Artemis 2.14.0 代理。
qpid 客户端无法连接代理服务器,存在线程阻塞。我不确定客户端实际发生了什么。
但我们注意到,对于另一项服务,我们使用单个 spring Camel 上下文文件,其中包含所有队列和引用,它们按预期工作。
生产者使用 producerTemplate 使用 camelContextAware 实现推送消息数据。
队列上的生产者和消费者是否有可能在这里发生冲突?
我看到的只是消费者端的一些线程块。
common-context.xml
...
<bean id="jmsConnectionFactory" class="org.apache.qpid.jms.JmsConnectionFactory">
<property name="remoteURI" value="failover:(amqp://host1:5672,amqp://host2:5672)"/>
</bean>
<bean id="pooledConnectionFactory" class="org.messaginghub.pooled.jms.JmsPoolConnectionFactory"
init-method="start" destroy-method="stop" >
<property name="maxConnections" value="3" />
<property name="connectionFactory" ref="jmsConnectionFactory" />
</bean>
<bean id="jmsConfig" class="org.apache.camel.component.jms.JmsConfiguration">
<property name="connectionFactory" ref="pooledConnectionFactory" />
<property name="concurrentConsumers" value="3" />
</bean>
<bean id="env-queue" class="org.apache.camel.component.amqp.AMQPComponent">
<property name="configuration" ref="jmsConfig" />
</bean>
...
服务上下文.xml 注意:在这个单个 xml 文件中有两个骆驼上下文,即使所有上下文都得到了,使用骆驼 2.20.0 版本
...
< import resource="classpath:/common-context.xml"/>
...
<route id="messageProcessor" shutdownRoute="Default" shutdownRunningTask="CompleteAllTasks">
<from uri="env-queue:queue:order-demo-queue" id="OrderInfo"/>
<camel:setProperty propertyName="orderType">
<camel:jsonpath trim="true">$.orderType</camel:jsonpath>
</camel:setProperty>
...
日志:
2020-10-05 23:25:21,078: Thread-1 DEBUG (DefaultCamelContext.java:3993) - Starting consumer (order: 1000) on route: messageProcessor
2020-10-05 23:25:21,079: Thread-1 DEBUG (DefaultManagementAgent.java:470) - Registered MBean with ObjectName: org.apache.camel:context=orderventprocesor,type=consumers,name=JmsConsumer(0x3fed52a3)
2020-10-05 23:25:21,079: Thread-1 DEBUG (DefaultConsumer.java:144) - Starting consumer: Consumer[env-queue://queue:order-demo-queue]
2020-10-05 23:25:21,138: Thread-1 DEBUG (FailoverProvider.java:153) - Initiating initial connection attempt task
2020-10-05 23:25:21,140: FailoverProvider: async work thread DEBUG (FailoverProvider.java:744) - Connection attempt:[1] to: amqp://host1:5672 in-progress
2020-10-05 23:25:21,145: Thread-1 DEBUG (AbstractJmsListeningContainer.java:382) - Established shared JMS Connection
2020-10-05 23:25:21,145: Thread-1 DEBUG (AbstractJmsListeningContainer.java:549) - Resumed paused task: org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker@51524bce
2020-10-05 23:25:21,146: Thread-1 DEBUG (AbstractJmsListeningContainer.java:549) - Resumed paused task: org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker@708b22ca
2020-10-05 23:25:21,147: Thread-1 DEBUG (AbstractJmsListeningContainer.java:549) - Resumed paused task: org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker@314f82d7
2020-10-05 23:25:21,148: Thread-1 DEBUG (AbstractJmsListeningContainer.java:549) - Resumed paused task: org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker@73719eb3
2020-10-05 23:25:21,148: Thread-1 DEBUG (AbstractJmsListeningContainer.java:549) - Resumed paused task: org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker@30beb728
2020-10-05 23:25:21,155: FailoverProvider: async work thread INFO (FailoverProvider.java:751) - Connection attempt:[1] to: amqp://host1:5672 failed
2020-10-05 23:25:21,156: FailoverProvider: async work thread DEBUG (FailoverProvider.java:744) - Connection attempt:[1] to: amqp://host2:5672 in-progress
2020-10-05 23:25:21,157: FailoverProvider: async work thread INFO (FailoverProvider.java:751) - Connection attempt:[1] to: amqp://host2:5672 failed
2020-10-05 23:25:21,167: FailoverProvider: async work thread DEBUG (FailoverProvider.java:744) - Connection attempt:[2] to: amqp://host1:5672 in-progress
线程转储(如下所示)
Camel (orderventprocesor) thread #5 - JmsConsumer[order-demo-queue]" #45 daemon prio=5 os_prio=0 tid=0x00007faf395b3000 nid=0x58e3 waiting for monitor entry [0x00007faf1290c000]
INFO | jvm 1 | 2020/10/06 00:01:38 | java.lang.Thread.State: BLOCKED (on object monitor)
INFO | jvm 1 | 2020/10/06 00:01:38 | at org.springframework.jms.listener.AbstractJmsListeningContainer.getSharedConnection(AbstractJmsListeningContainer.java:492)
INFO | jvm 1 | 2020/10/06 00:01:38 | - waiting to lock <0x00000007ba976500> (a java.lang.Object)
INFO | jvm 1 | 2020/10/06 00:01:38 | at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.initResourcesIfNecessary(DefaultMessageListenerContainer.java:1170)
INFO | jvm 1 | 2020/10/06 00:01:38 | at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1149)
INFO | jvm 1 | 2020/10/06 00:01:38 | at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1142)
INFO | jvm 1 | 2020/10/06 00:01:38 | at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:1039)
INFO | jvm 1 | 2020/10/06 00:01:38 | at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
INFO | jvm 1 | 2020/10/06 00:01:38 | at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
INFO | jvm 1 | 2020/10/06 00:01:38 | at java.lang.Thread.run(Thread.java:748)
更新:
当我将 remoteUri 更改为简单的 amqp://localhost:5672 时,我在 netty-resolver 上看到了一个异常
java.lang.NoClassDefFoundError: "io/netty/resolver/AddressResolverGroup"
包含 netty-resolver 4.1.51 jar 后,现在我看到以下异常。
2020-10-09 15:11:04.944 Camel (orderventprocesor) thread #6 - JmsConsumer[order-demo-queue] ERROR JmsConnection:165 - Failed to connect to remote at: amqp://localhost:5672
2020-10-09 15:11:04.946 Camel (orderventprocesor) thread #6 - JmsConsumer[order-demo-queue] ERROR DefaultJmsMessageListenerContainer:934 - Could not refresh JMS Connection for destination 'order-demo-queue' - retrying using FixedBackOff{interval=5000, currentAttempts=0, maxAttempts=unlimited}. Cause: io.netty.util.concurrent.MultithreadEventExecutorGroup.newChild(Ljava/util/concurrent/Executor;[Ljava/lang/Object;)Lio/netty/util/concurrent/EventExecutor;
javax.jms.JMSException: io.netty.util.concurrent.MultithreadEventExecutorGroup.newChild(Ljava/util/concurrent/Executor;[Ljava/lang/Object;)Lio/netty/util/concurrent/EventExecutor;
at org.apache.qpid.jms.provider.ProviderException.toJMSException(ProviderException.java:34)
at org.apache.qpid.jms.exceptions.JmsExceptionSupport.create(JmsExceptionSupport.java:80)
at org.apache.qpid.jms.exceptions.JmsExceptionSupport.create(JmsExceptionSupport.java:112)
at org.apache.qpid.jms.JmsConnection.connect(JmsConnection.java:176)
at org.apache.qpid.jms.JmsConnectionFactory.createConnection(JmsConnectionFactory.java:212)
at org.apache.qpid.jms.JmsConnectionFactory.createConnection(JmsConnectionFactory.java:199)
at org.messaginghub.pooled.jms.JmsPoolConnectionFactory.createProviderConnection(JmsPoolConnectionFactory.java:651)
at org.messaginghub.pooled.jms.JmsPoolConnectionFactory$1.makeObject(JmsPoolConnectionFactory.java:108)
at org.messaginghub.pooled.jms.JmsPoolConnectionFactory$1.makeObject(JmsPoolConnectionFactory.java:105)
at org.apache.commons.pool2.impl.GenericKeyedObjectPool.create(GenericKeyedObjectPool.java:1041)
at org.apache.commons.pool2.impl.GenericKeyedObjectPool.addObject(GenericKeyedObjectPool.java:1221)
at org.messaginghub.pooled.jms.JmsPoolConnectionFactory.createJmsPoolConnection(JmsPoolConnectionFactory.java:705)
at org.messaginghub.pooled.jms.JmsPoolConnectionFactory.createConnection(JmsPoolConnectionFactory.java:237)
at org.messaginghub.pooled.jms.JmsPoolConnectionFactory.createConnection(JmsPoolConnectionFactory.java:232)
at org.springframework.jms.support.JmsAccessor.createConnection(JmsAccessor.java:180)
at org.springframework.jms.listener.AbstractJmsListeningContainer.createSharedConnection(AbstractJmsListeningContainer.java:413)
at org.springframework.jms.listener.AbstractJmsListeningContainer.refreshSharedConnection(AbstractJmsListeningContainer.java:398)
at org.springframework.jms.listener.DefaultMessageListenerContainer.refreshConnectionUntilSuccessful(DefaultMessageListenerContainer.java:915)
at org.springframework.jms.listener.DefaultMessageListenerContainer.recoverAfterListenerSetupFailure(DefaultMessageListenerContainer.java:890)
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:1061)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.qpid.jms.provider.exceptions.ProviderIOException: io.netty.util.concurrent.MultithreadEventExecutorGroup.newChild(Ljava/util/concurrent/Executor;[Ljava/lang/Object;)Lio/netty/util/concurrent/EventExecutor;
at org.apache.qpid.jms.provider.exceptions.ProviderExceptionSupport.createOrPassthroughFatal(ProviderExceptionSupport.java:46)
at org.apache.qpid.jms.provider.amqp.AmqpProvider.connect(AmqpProvider.java:308)
at org.apache.qpid.jms.JmsConnection.connect(JmsConnection.java:162)
... 19 more
Caused by: java.lang.AbstractMethodError: io.netty.util.concurrent.MultithreadEventExecutorGroup.newChild(Ljava/util/concurrent/Executor;[Ljava/lang/Object;)Lio/netty/util/concurrent/EventExecutor;
at io.netty.util.concurrent.MultithreadEventExecutorGroup.<init>(MultithreadEventExecutorGroup.java:84)
at io.netty.util.concurrent.MultithreadEventExecutorGroup.<init>(MultithreadEventExecutorGroup.java:58)
at io.netty.util.concurrent.MultithreadEventExecutorGroup.<init>(MultithreadEventExecutorGroup.java:47)
at io.netty.channel.MultithreadEventLoopGroup.<init>(MultithreadEventLoopGroup.java:50)
at io.netty.channel.nio.NioEventLoopGroup.<init>(NioEventLoopGroup.java:70)
at io.netty.channel.nio.NioEventLoopGroup.<init>(NioEventLoopGroup.java:65)
at io.netty.channel.nio.NioEventLoopGroup.<init>(NioEventLoopGroup.java:56)
at org.apache.qpid.jms.transports.netty.NettyTcpTransport.connect(NettyTcpTransport.java:151)
at org.apache.qpid.jms.provider.amqp.AmqpProvider.connect(AmqpProvider.java:230)
... 20 more
2020-10-09 15:11:09.949 Camel (orderventprocesor) thread #6 - JmsConsumer[order-demo-queue] ERROR JmsConnection:165 - Failed to connect to remote at: amqp://localhost:5672
2020-10-09 15:11:09.950 Camel (orderventprocesor) thread #6 - JmsConsumer[order-demo-queue] ERROR DefaultJmsMessageListenerContainer:934 - Could not refresh JMS Connection for destination 'order-demo-queue' - retrying using FixedBackOff{interval=5000, currentAttempts=1, maxAttempts=unlimited}. Cause: io.netty.util.concurrent.MultithreadEventExecutorGroup.newChild(Ljava/util/concurrent/Executor;[Ljava/lang/Object;)Lio/netty/util/concurrent/EventExecutor;
javax.jms.JMSException: io.netty.util.concurrent.MultithreadEventExecutorGroup.newChild(Ljava/util/concurrent/Executor;[Ljava/lang/Object;)Lio/netty/util/concurrent/EventExecutor;
at org.apache.qpid.jms.provider.ProviderException.toJMSException(ProviderException.java:34)
at org.apache.qpid.jms.exceptions.JmsExceptionSupport.create(JmsExceptionSupport.java:80)
at org.apache.qpid.jms.exceptions.JmsExceptionSupport.create(JmsExceptionSupport.java:112)
at org.apache.qpid.jms.JmsConnection.connect(JmsConnection.java:176)
at org.apache.qpid.jms.JmsConnectionFactory.createConnection(JmsConnectionFactory.java:212)
at org.apache.qpid.jms.JmsConnectionFactory.createConnection(JmsConnectionFactory.java:199)
at org.messaginghub.pooled.jms.JmsPoolConnectionFactory.createProviderConnection(JmsPoolConnectionFactory.java:651)
at org.messaginghub.pooled.jms.JmsPoolConnectionFactory$1.makeObject(JmsPoolConnectionFactory.java:108)
at org.messaginghub.pooled.jms.JmsPoolConnectionFactory$1.makeObject(JmsPoolConnectionFactory.java:105)
at org.apache.commons.pool2.impl.GenericKeyedObjectPool.create(GenericKeyedObjectPool.java:1041)
at org.apache.commons.pool2.impl.GenericKeyedObjectPool.addObject(GenericKeyedObjectPool.java:1221)
at org.messaginghub.pooled.jms.JmsPoolConnectionFactory.createJmsPoolConnection(JmsPoolConnectionFactory.java:705)
at org.messaginghub.pooled.jms.JmsPoolConnectionFactory.createConnection(JmsPoolConnectionFactory.java:237)
at org.messaginghub.pooled.jms.JmsPoolConnectionFactory.createConnection(JmsPoolConnectionFactory.java:232)
at org.springframework.jms.support.JmsAccessor.createConnection(JmsAccessor.java:180)
at org.springframework.jms.listener.AbstractJmsListeningContainer.createSharedConnection(AbstractJmsListeningContainer.java:413)
at org.springframework.jms.listener.AbstractJmsListeningContainer.refreshSharedConnection(AbstractJmsListeningContainer.java:398)
at org.springframework.jms.listener.DefaultMessageListenerContainer.refreshConnectionUntilSuccessful(DefaultMessageListenerContainer.java:915)
at org.springframework.jms.listener.DefaultMessageListenerContainer.recoverAfterListenerSetupFailure(DefaultMessageListenerContainer.java:890)
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:1061)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.qpid.jms.provider.exceptions.ProviderIOException: io.netty.util.concurrent.MultithreadEventExecutorGroup.newChild(Ljava/util/concurrent/Executor;[Ljava/lang/Object;)Lio/netty/util/concurrent/EventExecutor;
at org.apache.qpid.jms.provider.exceptions.ProviderExceptionSupport.createOrPassthroughFatal(ProviderExceptionSupport.java:46)
at org.apache.qpid.jms.provider.amqp.AmqpProvider.connect(AmqpProvider.java:308)
at org.apache.qpid.jms.JmsConnection.connect(JmsConnection.java:162)
... 19 more
Caused by: java.lang.AbstractMethodError: io.netty.util.concurrent.MultithreadEventExecutorGroup.newChild(Ljava/util/concurrent/Executor;[Ljava/lang/Object;)Lio/netty/util/concurrent/EventExecutor;
at io.netty.util.concurrent.MultithreadEventExecutorGroup.<init>(MultithreadEventExecutorGroup.java:84)
at io.netty.util.concurrent.MultithreadEventExecutorGroup.<init>(MultithreadEventExecutorGroup.java:58)
at io.netty.util.concurrent.MultithreadEventExecutorGroup.<init>(MultithreadEventExecutorGroup.java:47)
at io.netty.channel.MultithreadEventLoopGroup.<init>(MultithreadEventLoopGroup.java:50)
at io.netty.channel.nio.NioEventLoopGroup.<init>(NioEventLoopGroup.java:70)
at io.netty.channel.nio.NioEventLoopGroup.<init>(NioEventLoopGroup.java:65)
at io.netty.channel.nio.NioEventLoopGroup.<init>(NioEventLoopGroup.java:56)
at org.apache.qpid.jms.transports.netty.NettyTcpTransport.connect(NettyTcpTransport.java:151)
at org.apache.qpid.jms.provider.amqp.AmqpProvider.connect(AmqpProvider.java:230)
... 20 more
【问题讨论】:
标签: apache-camel activemq-artemis qpid