【发布时间】:2012-03-04 15:41:00
【问题描述】:
更新:我认为我的解释可能过于冗长,所以我想一种简单的思考方式是:我有多个工作节点,我想将它们用于不同的任务(以及那些不同的任务来自不同的队列)。目前我只知道如何让他们收听与单一类型的工作相关联的单个队列,但我希望他们收听不同的队列,以便当不同的工作出现时,同一个节点集群可以处理它们。希望那更清楚。
大家好, 我怀疑这是可能的,但我似乎无法弄清楚该怎么做。我浏览了 rabbitmq 网站上的教程,它们真的很有帮助,并且做了我想做的事,只是它没有显示如何在同一个程序中收听多个队列。
我的程序结构基本上是几个阶段。例如,阶段 1 收集大量数据,然后阶段 2 对其进行处理并将其加载到数据库中,阶段 3 将其与其他数据进行分析,等等。每个阶段都无法启动直到前一个阶段完成,我想使用队列系统来使用多台机器更快地完成每个阶段(所以所有消费者都在阶段 1 工作,然后一旦所有消费者都完成,他们就会一起在阶段 2 工作,等等)。
我认为我不能只执行每个阶段一次,因为队列可能为空,计算机将移至下一个队列,我无法知道它是否为空,因为所有工作都已完成,或者它是否已完成,因为我们还没有开始排队。所以我想(如果我错了,请纠正我)一个更好的方法是监听与所有阶段相关的所有队列,并且随着工作进入 phase1Queue 它可以工作,如果工作进入 phase2Queue 它会在之后立即工作(我在我描述的过程之外还有另一个过程,它监视每个阶段何时完成并为下一阶段设置)。希望这是有道理的。
队列示例中的代码对于收听一个队列的消费者很有帮助,但我怎样才能让它收听多个(并根据不同的队列调用不同的程序)。如果已经有一个功能,那就太棒了,但我正在寻找我可以用来在java中实现它的逻辑(最坏的情况我曾想过运行5个单独的程序来监听每个队列,但我试图找出如果有更好的方法,拥有一个包含我所有工作的应用程序会更容易管理分发)。
谢谢!
附言如果它有帮助,这里是适用于rabbitmq 的消费者代码(但你可以看到它只定义了一个队列):
import java.io.IOException;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;
public class Worker {
private static final String TASK_QUEUE_NAME = "task_queue";
public static void main(String[] argv)
throws java.io.IOException,
java.lang.InterruptedException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
channel.basicQos(1);
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(TASK_QUEUE_NAME, false, consumer);
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [x] Received '" + message + "'");
doWork(message);
System.out.println(" [x] Done" );
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
//...
}
更新:根据评论,协调员流程非常简单。它只是一个从 Phase1Queue 进行监控的程序,一旦该队列为空,它就会启动一个进程来填充 Phase2Queue 等。
【问题讨论】:
-
我对 RabbitMQ 不是很熟悉,但这不是创建多个消费者,每个队列一个的问题吗?但目前尚不清楚如何将每台机器从阶段 1 移动到阶段 2(可能使用您的协调器)。为什么不使用单个队列并标记任务,以便接收者知道如何处理它(即任务用于哪个阶段)?
-
@DNA 谢谢!这实际上是一个好主意,我没有考虑过,但它可以工作。我的协调程序将需要跟踪它所处的阶段,但除此之外,在我的主程序中添加/删除阶段也很容易。我只是不确定是否可以使用 rabbitmq 发送多个参数,但我现在就开始寻找它。