【问题标题】:How to manually ack messages from rabbitmq using spring integration如何使用spring集成手动确认来自rabbitmq的消息
【发布时间】:2023-11-20 10:39:01
【问题描述】:

[编辑] 上传完整的配置:

rabbit.xml 从 rabbit 中出队

<rabbit:connection-factory id="amqpConnectionFactoryInbound" 
host="${rabbit.host}" port="${rabbit.port}"
username="${rabbit.username}" password="${rabbit.password}" channel-
cache-size="5"
connection-factory="rabbitConnectionFactoryInbound"/>

<beans:bean id="rabbitConnectionFactoryInbound" 
class="com.rabbitmq.client.ConnectionFactory">
<beans:property name="requestedHeartbeat" 
value="60" />
</beans:bean>


<!-- Inbound Adapter to AMQP RabbitMq and write to file -->
<int-amqp:inbound-channel-adapter id="rabbitMQInboundChannelAdapter" 
channel="rabbitInboundMessageChannel"
concurrent-consumers="8" task-
executor="rabbit-executor" connection-
factory="amqpConnectionFactoryInbound"
message-converter="byteArrayToStringConverter" queue-
names="${rabbit.queue}" acknowledge-mode="MANUAL" error-
channel="errorChannelId"
prefetch-count="25" />

<header-enricher input-channel="rabbitInboundMessageChannel" output-
channel="rabbitOutboundboundMessageChannel">
<int:header name="Operation" value="${operation.rabbit}" />
<int:header name="GUID" expression="#{ 
'T(java.util.UUID).randomUUID().toString()' }" />
<int:header name="operationStartTime" expression="#{ 
'T(java.lang.System).currentTimeMillis()' }" />
</header-enricher>

<int:channel id="rabbitOutboundboundMessageChannel">
<int:interceptors>
<int:wire-tap channel="loggerChannel" />
</int:interceptors>
</int:channel>

<task:executor id="rabbit-executor" rejection-policy="CALLER_RUNS" 
pool-size="10-30"
queue-capacity="25" />
</beans:beans>

然后将消息发送到路由器通道:router.xml

<int:header-enricher input-channel="rabbitOutboundboundMessageChannel" 
output-channel="routerChannel">
<int:header name="Operation" value="${operation.router}" 
overwrite="true" />
<int:header name="file_name" expression="headers['GUID'] + '.xml'" />
<int:header name="operationStartTime" expression="#{ 
'T(java.lang.System).currentTimeMillis()' }"
overwrite="true" />
<int:error-channel ref="errorChannelId" />
</int:header-enricher>

<int:recipient-list-router id="rabbitMsgrouter" input-
channel="routerChannel">
<int:recipient channel="fileBackupChannel" selector-expression="new 
String(payload).length()>0" />
<int:recipient channel="transformerChannel" />
</int:recipient-list-router>

<int:channel id="transformerChannel">
<int:interceptors>
<int:wire-tap channel="loggerChannel" />
</int:interceptors>
</int:channel>
<int:channel id="fileBackupChannel"/>
<int:channel id="loggerChannel"/>
</beans>

消息现在被发送到persister.xml 和transformer.xml。以下是persister.xml,如果持久性成功,我想确认一下。在transformer.xml之后还有其他下游流程

<int:header-enricher input-channel="fileBackupChannel" output-
channel="fileSaveChannel">
<int:header name="Operation" value="${operation.filePersister}" 
overwrite="true" />
<int:header name="replyChannel" value="nullChannel" />
<int:header name="operationStartTime" expression="#{ 
'T(java.lang.System).currentTimeMillis()' }" />
<int:error-channel ref="errorChannelId" />
</int:header-enricher>

<int-file:outbound-gateway id="fileBackUpChannelAdapter" 
directory="${file.location}"
request-channel="fileSaveChannel" reply-channel="rabbitAckChannel"/>

<int:service-activator input-channel="rabbitAckChannel" output-
channel="nullChannel" ref="ackRabbit" method="handleRabbitAcks" />

<bean id="ackRabbit" 
class="com.expedia.dataloader.rabbit.RabbitAcknowledgement"/>

<int:channel id="rabbitAckChannel">
<int:interceptors>
<int:wire-tap channel="loggerChannel" />
</int:interceptors>
</int:channel>
<int:channel id="loggerChannel"/>
<int:channel id="fileSaveChannel"/>
</beans>

我在从 rabbitmq 手动确认有效负载时遇到问题。

这是我的工作流程:

1.使用入站通道适配器从兔子那里获取消息:

<int-amqp:inbound-channel-adapter id="rabbitMQInboundChannelAdapter" 
channel="rabbitInboundMessageChannel"
concurrent-consumers="${rabbit.concurrentConsumers}" task-
executor="rabbit-executor" connection-
factory="amqpConnectionFactoryInbound"
message-converter="byteArrayToStringConverter" queue-
names="${rabbit.queue}" acknowledge-mode="MANUAL" error-
channel="errorChannelId"
prefetch-count="${rabbit.prefetchCount}" />

2。使用出站网关将消息持久化到磁盘:

<int-file:outbound-gateway id="fileBackUpChannelAdapter" 
directory="${file.location}"
request-channel="fileSaveChannel" reply-channel="loggerChannel" />

3.当持久化器(第 2 步)成功时,来自兔子的 ack。

对于第 (3) 步,我编写了以下代码:

public class RabbitAcknowledgement {
public void handleRabbitAcks(Message<?> message) throws IOException {
com.rabbitmq.client.Channel channel = (Channel) 
message.getHeaders().get("amqp_channel");
long deliveryTag = (long) message.getHeaders().get("amqp_deliveryTag");
channel.basicAck(deliveryTag, false);
}

我从春天通过以下方式调用:

<int:service-activator input-
channel="rabbitOutboundboundMessageChannel" output-
channel="routerChannel" ref="ackRabbit" method="handleRabbitAcks" />

这不起作用,我队列中的兔子有效载荷没有得到确认。

我的问题是:

  1. 在这种情况下我需要手动确认吗?
  2. 我做错了什么?

【问题讨论】:

  • 请问有哪些症状证明无效?
  • 有效载荷保留在rabbit上,不被消费者消费。
  • 另外,在我当前的设置中,我相信我每个线程都确认 1 条消息。我能否以某种方式提高速率(即分批确认?)对功能并不重要,但想知道我是否以某种方式使系统更高效
  • 一般不需要手动确认;使用 AUTO,如果持久化成功,容器将 ack 消息,如果抛出异常,则 nack 消息。像这样的简单场景需要使用手动确认是不寻常的。但是,它应该可以工作。你在容器线程上运行 ack 吗?或者您是否正在使用队列或执行程序通道将其移交给另一个线程?您能否编辑问题以显示您的完整配置(包括频道)。还要打开 DEBUG 日志记录以查看它是否提供更多信息。
  • 谢谢加里。如果在同一线程上运行其他下游进程,AUTO 将如何工作?例如。持久化后,我想转换有效载荷并写入 solr。目前,只有在转换和 solr 都成功导致兔子服务器上的大量负载时才会发生 ACK。所有这些进程都有自己的配置(rabbit.xml、persist.xml、transform.xml、solr.xml 等)。

标签: spring spring-integration spring-amqp


【解决方案1】:

它应该可以正常工作;我刚刚进行了快速测试,它对我有用...

@SpringBootApplication
public class So44666444Application implements CommandLineRunner {

    public static void main(String[] args) {
        SpringApplication.run(So44666444Application.class, args).close();
    }

    @Autowired
    private RabbitTemplate template;

    private final CountDownLatch latch = new CountDownLatch(1);

    @Override
    public void run(String... args) throws Exception {
        this.template.convertAndSend("foo", "bar");
        latch.await();
    }

    @Bean
    public AmqpInboundChannelAdapter adapter(ConnectionFactory cf) {
        AmqpInboundChannelAdapter adapter = new AmqpInboundChannelAdapter(listenerContainer(cf));
        adapter.setOutputChannelName("ack");
        return adapter;
    }

    @Bean
    public AbstractMessageListenerContainer listenerContainer(ConnectionFactory cf) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(cf);
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        container.setQueueNames("foo");
        return container;
    }

    @ServiceActivator(inputChannel = "ack")
    public void ack(@Header(AmqpHeaders.CHANNEL) Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) Long tag)
            throws IOException {
        System.out.println("Acking: " + tag);
        channel.basicAck(tag, false);
        latch.countDown();
    }

}

如果我在basicAck 上设置断点,我会在控制台上看到消息未确认;跳到下一行,消息被删除。

【讨论】:

  • 感谢@Gary Russell 我在我的代码中发现了一个错误并让它工作:将工作配置上传到原始帖子。有什么办法可以提高效率吗? (确认更快,使用更少的资源等)