【问题标题】:Is it possible to have Java listen to multiple RabbitMQ queues?是否可以让 Java 监听多个 RabbitMQ 队列?
【发布时间】:2012-03-04 15:41:00
【问题描述】:

更新:我认为我的解释可能过于冗长,所以我想一种简单的思考方式是:我有多个工作节点,我想将它们用于不同的任务(以及那些不同的任务来自不同的队列)。目前我只知道如何让他们收听与单一类型的工作相关联的单个队列,但我希望他们收听不同的队列,以便当不同的工作出现时,同一个节点集群可以处理它们。希望那更清楚。


大家好, 我怀疑这是可能的,但我似乎无法弄清楚该怎么做。我浏览了 rabbitmq 网站上的教程,它们真的很有帮助,并且做了我想做的事,只是它没有显示如何在同一个程序中收听多个队列。

我的程序结构基本上是几个阶段。例如,阶段 1 收集大量数据,然后阶段 2 对其进行处理并将其加载到数据库中,阶段 3 将其与其他数据进行分析,等等。每个阶段都无法启动直到前一个阶段完成,我想使用队列系统来使用多台机器更快地完成每个阶段(所以所有消费者都在阶段 1 工作,然后一旦所有消费者都完成,他们就会一起在阶段 2 工作,等等)。

我认为我不能只执行每个阶段一次,因为队列可能为空,计算机将移至下一个队列,我无法知道它是否为空,因为所有工作都已完成,或者它是否已完成,因为我们还没有开始排队。所以我想(如果我错了,请纠正我)一个更好的方法是监听与所有阶段相关的所有队列,并且随着工作进入 phase1Queue 它可以工作,如果工作进入 phase2Queue 它会在之后立即工作(我在我描述的过程之外还有另一个过程,它监视每个阶段何时完成并为下一阶段设置)。希望这是有道理的。

队列示例中的代码对于收听一个队列的消费者很有帮助,但我怎样才能让它收听多个(并根据不同的队列调用不同的程序)。如果已经有一个功能,那就太棒了,但我正在寻找我可以用来在java中实现它的逻辑(最坏的情况我曾想过运行5个单独的程序来监听每个队列,但我试图找出如果有更好的方法,拥有一个包含我所有工作的应用程序会更容易管理分发)。

谢谢!

附言如果它有帮助,这里是适用于rabbitmq 的消费者代码(但你可以看到它只定义了一个队列):

import java.io.IOException;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;

public class Worker {

  private static final String TASK_QUEUE_NAME = "task_queue";

  public static void main(String[] argv)
                      throws java.io.IOException,
                      java.lang.InterruptedException {

    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

    channel.basicQos(1);

    QueueingConsumer consumer = new QueueingConsumer(channel);
    channel.basicConsume(TASK_QUEUE_NAME, false, consumer);

    while (true) {
      QueueingConsumer.Delivery delivery = consumer.nextDelivery();
      String message = new String(delivery.getBody());

      System.out.println(" [x] Received '" + message + "'");   
      doWork(message); 
      System.out.println(" [x] Done" );

      channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
    }
  }
  //...
}

更新:根据评论,协调员流程非常简单。它只是一个从 Phase1Queue 进行监控的程序,一旦该队列为空,它就会启动一个进程来填充 Phase2Queue 等。

【问题讨论】:

  • 我对 RabbitMQ 不是很熟悉,但这不是创建多个消费者,每个队列一个的问题吗?但目前尚不清楚如何将每台机器从阶段 1 移动到阶段 2(可能使用您的协调器)。为什么不使用单个队列并标记任务,以便接收者知道如何处理它(即任务用于哪个阶段)?
  • @DNA 谢谢!这实际上是一个好主意,我没有考虑过,但它可以工作。我的协调程序将需要跟踪它所处的阶段,但除此之外,在我的主程序中添加/删除阶段也很容易。我只是不确定是否可以使用 rabbitmq 发送多个参数,但我现在就开始寻找它。

标签: java messaging rabbitmq


【解决方案1】:

另一种方法是将不同的阶段分离到不同的模块/服务/流程中,并让它们同时运行。

每个阶段的消费者侦听不同的队列。每个阶段负责为下一个阶段的队列生成消息。

通过这种方式,您可以通过单一职责降低复杂性。每个进程从一个特定的队列消费并为另一个队列生产。然后,如果需要,您可以扩大和/或扩大这些单独的流程。

这是我们使用的一种模式。我们有一个初始工作,最终会创建多达 100 倍的子工作。初始作业非常小且运行迅速,而子作业可能是长时间运行的请求,我们使用少量云实例来提供服务。然后这些作业通过另一个队列返回它们的结果,然后将其结果整理并添加到数据库中。

【讨论】:

    猜你喜欢
    • 2020-11-07
    • 2015-02-22
    • 1970-01-01
    • 2018-10-27
    • 2020-07-18
    • 2021-06-27
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多