【发布时间】:2023-11-20 10:39:01
【问题描述】:
[编辑] 上传完整的配置:
rabbit.xml 从 rabbit 中出队
<rabbit:connection-factory id="amqpConnectionFactoryInbound"
host="${rabbit.host}" port="${rabbit.port}"
username="${rabbit.username}" password="${rabbit.password}" channel-
cache-size="5"
connection-factory="rabbitConnectionFactoryInbound"/>
<beans:bean id="rabbitConnectionFactoryInbound"
class="com.rabbitmq.client.ConnectionFactory">
<beans:property name="requestedHeartbeat"
value="60" />
</beans:bean>
<!-- Inbound Adapter to AMQP RabbitMq and write to file -->
<int-amqp:inbound-channel-adapter id="rabbitMQInboundChannelAdapter"
channel="rabbitInboundMessageChannel"
concurrent-consumers="8" task-
executor="rabbit-executor" connection-
factory="amqpConnectionFactoryInbound"
message-converter="byteArrayToStringConverter" queue-
names="${rabbit.queue}" acknowledge-mode="MANUAL" error-
channel="errorChannelId"
prefetch-count="25" />
<header-enricher input-channel="rabbitInboundMessageChannel" output-
channel="rabbitOutboundboundMessageChannel">
<int:header name="Operation" value="${operation.rabbit}" />
<int:header name="GUID" expression="#{
'T(java.util.UUID).randomUUID().toString()' }" />
<int:header name="operationStartTime" expression="#{
'T(java.lang.System).currentTimeMillis()' }" />
</header-enricher>
<int:channel id="rabbitOutboundboundMessageChannel">
<int:interceptors>
<int:wire-tap channel="loggerChannel" />
</int:interceptors>
</int:channel>
<task:executor id="rabbit-executor" rejection-policy="CALLER_RUNS"
pool-size="10-30"
queue-capacity="25" />
</beans:beans>
然后将消息发送到路由器通道:router.xml
<int:header-enricher input-channel="rabbitOutboundboundMessageChannel"
output-channel="routerChannel">
<int:header name="Operation" value="${operation.router}"
overwrite="true" />
<int:header name="file_name" expression="headers['GUID'] + '.xml'" />
<int:header name="operationStartTime" expression="#{
'T(java.lang.System).currentTimeMillis()' }"
overwrite="true" />
<int:error-channel ref="errorChannelId" />
</int:header-enricher>
<int:recipient-list-router id="rabbitMsgrouter" input-
channel="routerChannel">
<int:recipient channel="fileBackupChannel" selector-expression="new
String(payload).length()>0" />
<int:recipient channel="transformerChannel" />
</int:recipient-list-router>
<int:channel id="transformerChannel">
<int:interceptors>
<int:wire-tap channel="loggerChannel" />
</int:interceptors>
</int:channel>
<int:channel id="fileBackupChannel"/>
<int:channel id="loggerChannel"/>
</beans>
消息现在被发送到persister.xml 和transformer.xml。以下是persister.xml,如果持久性成功,我想确认一下。在transformer.xml之后还有其他下游流程
<int:header-enricher input-channel="fileBackupChannel" output-
channel="fileSaveChannel">
<int:header name="Operation" value="${operation.filePersister}"
overwrite="true" />
<int:header name="replyChannel" value="nullChannel" />
<int:header name="operationStartTime" expression="#{
'T(java.lang.System).currentTimeMillis()' }" />
<int:error-channel ref="errorChannelId" />
</int:header-enricher>
<int-file:outbound-gateway id="fileBackUpChannelAdapter"
directory="${file.location}"
request-channel="fileSaveChannel" reply-channel="rabbitAckChannel"/>
<int:service-activator input-channel="rabbitAckChannel" output-
channel="nullChannel" ref="ackRabbit" method="handleRabbitAcks" />
<bean id="ackRabbit"
class="com.expedia.dataloader.rabbit.RabbitAcknowledgement"/>
<int:channel id="rabbitAckChannel">
<int:interceptors>
<int:wire-tap channel="loggerChannel" />
</int:interceptors>
</int:channel>
<int:channel id="loggerChannel"/>
<int:channel id="fileSaveChannel"/>
</beans>
我在从 rabbitmq 手动确认有效负载时遇到问题。
这是我的工作流程:
1.使用入站通道适配器从兔子那里获取消息:
<int-amqp:inbound-channel-adapter id="rabbitMQInboundChannelAdapter"
channel="rabbitInboundMessageChannel"
concurrent-consumers="${rabbit.concurrentConsumers}" task-
executor="rabbit-executor" connection-
factory="amqpConnectionFactoryInbound"
message-converter="byteArrayToStringConverter" queue-
names="${rabbit.queue}" acknowledge-mode="MANUAL" error-
channel="errorChannelId"
prefetch-count="${rabbit.prefetchCount}" />
2。使用出站网关将消息持久化到磁盘:
<int-file:outbound-gateway id="fileBackUpChannelAdapter"
directory="${file.location}"
request-channel="fileSaveChannel" reply-channel="loggerChannel" />
3.当持久化器(第 2 步)成功时,来自兔子的 ack。
对于第 (3) 步,我编写了以下代码:
public class RabbitAcknowledgement {
public void handleRabbitAcks(Message<?> message) throws IOException {
com.rabbitmq.client.Channel channel = (Channel)
message.getHeaders().get("amqp_channel");
long deliveryTag = (long) message.getHeaders().get("amqp_deliveryTag");
channel.basicAck(deliveryTag, false);
}
我从春天通过以下方式调用:
<int:service-activator input-
channel="rabbitOutboundboundMessageChannel" output-
channel="routerChannel" ref="ackRabbit" method="handleRabbitAcks" />
这不起作用,我队列中的兔子有效载荷没有得到确认。
我的问题是:
- 在这种情况下我需要手动确认吗?
- 我做错了什么?
【问题讨论】:
-
请问有哪些症状证明无效?
-
有效载荷保留在rabbit上,不被消费者消费。
-
另外,在我当前的设置中,我相信我每个线程都确认 1 条消息。我能否以某种方式提高速率(即分批确认?)对功能并不重要,但想知道我是否以某种方式使系统更高效
-
一般不需要手动确认;使用 AUTO,如果持久化成功,容器将 ack 消息,如果抛出异常,则 nack 消息。像这样的简单场景需要使用手动确认是不寻常的。但是,它应该可以工作。你在容器线程上运行 ack 吗?或者您是否正在使用队列或执行程序通道将其移交给另一个线程?您能否编辑问题以显示您的完整配置(包括频道)。还要打开 DEBUG 日志记录以查看它是否提供更多信息。
-
谢谢加里。如果在同一线程上运行其他下游进程,AUTO 将如何工作?例如。持久化后,我想转换有效载荷并写入 solr。目前,只有在转换和 solr 都成功导致兔子服务器上的大量负载时才会发生 ACK。所有这些进程都有自己的配置(rabbit.xml、persist.xml、transform.xml、solr.xml 等)。
标签: spring spring-integration spring-amqp