【问题标题】:Configuring Spring Integration QueueChannel for RabbitMQ为 RabbitMQ 配置 Spring Integration QueueChannel
【发布时间】:2013-09-25 14:45:16
【问题描述】:

我目前正在使用 spring AMQP API 连接到 RabbitMQ。基本上在我的消费者代码中,我正在异步读取消息并在弹性搜索中进行批量插入。当我执行 ack = AUTO 时,我的速度适中,为 400-500 msg/sec(从队列中读取)。当我执行 ack = NONE 时,阅读速度的提高是巨大的,即达到 5000-6000 msg/sec。

配置如下:

  • 具有 32GB RAM 的 Linux 机器
  • JVM 参数:

-server -Xms1g -Xmx1g -Xss384k PermSize=256m MaxPermSize=256m

现在的问题是,当我执行 ack = NONE 时,虽然我的速度很好,但 JVM 在一段时间后会出现 OutOfMemory,我可以看到在这种情况下发生了很多 GC。

我计划使用 Spring 集成中的 QueueChannel,我可以将通道的大小限制为它可以包含多少消息。

我如何使用 RabbitMQ 实现这一点,还有没有其他方法可以在不崩溃 JVM 的情况下实现 4000-5000 msg/sec 的良好读取速度?

【问题讨论】:

    标签: jvm rabbitmq spring-integration spring-amqp


    【解决方案1】:

    尝试使用ack=AUTO,但将prefetchtxSize 设置为1000;这样,每 1000 条消息只会发送一个 ack(您可以根据需要调整此值)。

    使用 ackmode=NONE 会导致消息在侦听器容器内的无限阻塞队列中累积,并且只有在侦听器能够跟上时才会起作用(OOM 意味着您的不能)。

    我们有一个open JIRA issue 来解决 ack=NONE 的问题,但上述技术通常就足够了。

    您还可以使用容器并发设置来提高吞吐量。

    【讨论】:

    • 我已经有预取计数为 750,但我没有使用 txSize :( 那会有什么影响?我也可以使用 Spring Integration 的 QueueChannel 与 RabbitMQ,因为它有大小限制等。
    • txSize 控制我们发送确认的频率。这不是一个真正的好属性名称 - 它与交易无关。来自参考手册:“当与 acknowledgeMode AUTO 一起使用时,容器将尝试在发送 ack 之前处理最多此数量的消息(等待每个消息直到接收超时设置)。这也是提交事务通道的时间. 如果 prefetchCount 小于 txSize,则会增加以匹配 txSize。”。在这种情况下,使用 SI QueueChannel 无济于事 - 内存使用情况在侦听器容器内。
    猜你喜欢
    • 2015-12-12
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2023-03-29
    相关资源
    最近更新 更多