【问题标题】:RabbitMQ to BlockingQueue bindingRabbitMQ 到 BlockingQueue 绑定
【发布时间】:2015-09-10 19:18:19
【问题描述】:

我正在开发一个多线程应用程序,其中几个“处理器”(ThreadPools 中的 Runnables)相互发送消息。它们使用BlockingQueue 接口进行通信:当处理器A 完成任务T1 时,它会将其推送到队列Q1(例如,BlockingQueue<MyTask>,如果T1 由类MyTask 表示);之后,处理器BQ1 拉取任务,执行计算并将结果结果推送到Q2;等等。

我使用LinkedBlockingQueue,因为我的应用程序是单片的,所有处理器都“生活”在同一个 JVM 中。但是,我希望我的应用程序变得模块化 (Microservice Architecture),所以我决定使用 RabbitMQ 作为消息代理。

问题是从队列的 Java 实现迁移到 RabbitMQ,而客户端源代码的更改最少。因此,我尝试在 RabbitMQ 抽象和BlockingQueue 接口之间找到某种绑定。因此,当有人向 amqp 的队列发送消息时,它应该出现在 java 队列中。反之亦然:当有人将对象推送到 java 队列时,它应该被传播到 amqp 的交换器。

轮询的示例实现(来自 amqp 的队列,使用 spring-amqp)如下所示。

<T> BlockingQueue<T> createQueue(Class<T> elementType, MessageListenerContainer listenerContainer) {
    LinkedBlockingQueue<T> queue = new LinkedBlockingQueue<>();

    MessageConverter messageConverter = listenerContainer.getMessageConverter();
    listenerContainer.setupMessageListener((MessageListener) message -> {
        Object task = messageConverter.fromMessage(message);
        queue.offer(elementType.cast(task));
    });

    return queue;
}

我现在找不到使用 RabbitMQ 队列实现BlockingQueue 接口的框架。如果这种框架不存在,我的想法在架构上是否在某种程度上是错误的,或者只是还没有人实现呢?

【问题讨论】:

    标签: java queue rabbitmq messaging spring-amqp


    【解决方案1】:

    我不确定您是否真的想按照您描述的方式进行操作 - 入站消息将被传递到队列并位于内存中,而不是 RabbitMQ。

    我认为一个简单的BlockingQueue 实现在下面使用RabbitTemplate 从rabbit 队列中提取消息(使用receive()receiveAndConvert())对于获取/轮询操作可能更好 - 它会留下消息在 RabbitMQ 中直到需要,只需 RabbitTemplate.convertAndSend() 进行提供/放置操作。

    虽然很简单,但它可能是对框架的有用补充;考虑contributing

    【讨论】:

    • 除了 Gary 的回答,我建议看一下提供 PollableAmqpChannel 的 Spring Integration Framework:docs.spring.io/spring-integration/reference/html/…
    • 加里,谢谢你的回答。实际上,我的意思是那种实现,但我认为它已经存在。在我的项目中集成此实现后,我会考虑做出贡献(如果 spring-integration 在我的情况下不起作用)。
    • @ArtemBilan 谢谢,我会试试 spring-integration。第一印象非常积极。
    • Gary,最近我意识到receive() 没有阻塞语义。如我所见,you suggest绕过spring-amqp直接使用rabbit客户端实现阻塞行为(扩展Consumer)?
    • recent 1.5 release 中添加了阻止receive()。感谢您指出 JIRA 问题;它被 this one 取代。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2012-08-23
    • 1970-01-01
    • 1970-01-01
    • 2020-04-30
    • 1970-01-01
    • 1970-01-01
    • 2012-12-16
    相关资源
    最近更新 更多