【问题标题】:RabbitMQ by Example: Multiple Threads, Channels and QueuesRabbitMQ 示例:多线程、通道和队列
【发布时间】:2013-09-03 01:47:55
【问题描述】:

我刚刚阅读了RabbitMQ's Java API docs,发现它内容丰富且直截了当。如何设置一个简单的Channel 用于发布/消费的示例非常易于理解和理解。但这是一个非常简单/基本的示例,它给我留下了一个重要的问题:我如何设置 1+ Channels 来发布/使用多个队列?

假设我有一个带有 3 个队列的 RabbitMQ 服务器:loggingsecurity_eventscustomer_orders。因此,我们要么需要一个 Channel 才能发布/使用所有 3 个队列,要么更可能需要 3 个单独的 Channels,每个 Channels 专用于一个队列。

除此之外,RabbitMQ 的最佳实践要求我们为每个消费者线程设置 1 个Channel。对于此示例,假设 security_events 只需 1 个使用者线程即可,但 loggingcustomer_order 都需要 5 个线程来处理卷。所以,如果我理解正确,这是否意味着我们需要:

  • 1 个Channel 和1 个消费者线程,用于在security_events 之间发布/消费;和
  • 5 个Channels 和5 个消费者线程用于发布/消费到logging 和从logging;和
  • 5 个 Channels 和 5 个消费者线程,用于在 customer_orders 之间发布/消费?

如果我的理解有误,请先纠正我。无论哪种方式,一些厌战的 RabbitMQ 资深人士能否帮助我用一个体面的代码示例“连接点”,以便在此处设置满足我的要求的发布者/消费者?提前致谢!

【问题讨论】:

    标签: java multithreading rabbitmq messaging channel


    【解决方案1】:

    我认为您在初步理解方面有几个问题。坦率地说,看到以下内容我有点惊讶:both need 5 threads to handle the volume。你是如何确定你需要那个确切的数字的?你有任何保证5个线程就足够了吗?

    RabbitMQ 已经过调整和时间测试,所以一切都与正确的设计有关 和高效的消息处理。

    让我们尝试检查问题并找到合适的解决方案。顺便说一句,消息队列本身并不能保证您有真正好的解决方案。您必须了解自己在做什么,还要进行一些额外的测试。

    您肯定知道有许多可能的布局:

    我将使用布局B 作为最简单的方式来说明1 生产者N 消费者问题。由于您非常担心吞吐量。顺便说一句,正如您可能期望的那样,RabbitMQ 的表现非常好 (source)。关注prefetchCount,我稍后会解决:

    因此,消息处理逻辑很可能是确保您有足够吞吐量的正确位置。当然,每次需要处理消息时,您都可以跨越一个新线程,但最终这种方法会杀死您的系统。基本上,线程越多,延迟越大(如果需要,可以查看Amdahl's law)。

    (见Amdahl’s law illustrated

    提示 #1:小心使用线程,使用 ThreadPools (details)

    线程池可以描述为 Runnable 对象的集合 (工作队列)和正在运行的线程的连接。这些线程是 不断运行并正在检查新工作的工作查询。如果 他们执行这个 Runnable 有新的工作要做。线程 类本身提供了一个方法,例如execute(Runnable r) 添加一个新的 可运行对象到工作队列。

    public class Main {
      private static final int NTHREDS = 10;
    
      public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(NTHREDS);
        for (int i = 0; i < 500; i++) {
          Runnable worker = new MyRunnable(10000000L + i);
          executor.execute(worker);
        }
        // This will make the executor accept no new threads
        // and finish all existing threads in the queue
        executor.shutdown();
        // Wait until all threads are finish
        executor.awaitTermination();
        System.out.println("Finished all threads");
      }
    } 
    

    提示 #2:注意消息处理开销

    我会说这是明显的优化技术。您可能会发送小型且易于处理的消息。整个方法是关于要连续设置和处理的较小消息。大消息最终会成为一个糟糕的笑话,所以最好避免这种情况。

    所以最好发送微小的信息,但是处理呢?每次提交作业时都会产生开销。在传入消息率较高的情况下,批处理非常有用。

    例如,假设我们有简单的消息处理逻辑,并且我们不希望每次处理消息时都有特定于线程的开销。为了优化那个很简单的CompositeRunnable can be introduced

    class CompositeRunnable implements Runnable {
    
        protected Queue<Runnable> queue = new LinkedList<>();
    
        public void add(Runnable a) {
            queue.add(a);
        }
    
        @Override
        public void run() {
            for(Runnable r: queue) {
                r.run();
            }
        }
    }
    

    或者通过收集要处理的消息以稍微不同的方式做同样的事情:

    class CompositeMessageWorker<T> implements Runnable {
    
        protected Queue<T> queue = new LinkedList<>();
    
        public void add(T message) {
            queue.add(message);
        }
    
        @Override
        public void run() {
            for(T message: queue) {
                // process a message
            }
        }
    }
    

    通过这种方式,您可以更有效地处理消息。

    提示 #3:优化消息处理

    尽管您知道可以并行处理消息 (Tip #1) 并减少处理开销 (Tip #2),但您必须快速完成所有操作。冗余处理步骤、繁重的循环等可能会对性能产生很大影响。请看有趣的案例研究:

    Improving Message Queue Throughput tenfold by choosing the right XML Parser

    提示 #4:连接和渠道管理

    • 在现有连接上启动新通道涉及一个网络 往返 - 开始一个新的连接需要几个。
    • 每个连接都使用服务器上的文件描述符。频道没有。
    • 在一个通道上发布大消息将阻止连接 当它熄灭时。除此之外,多路复用相当透明。
    • 如果服务器被阻止,正在发布的连接可能会被阻止 重载 - 将发布和消费分开是个好主意 连接
    • 准备好处理消息突发

    (source)

    请注意,所有提示都可以完美配合。如果您需要更多详细信息,请随时告诉我。

    完整的消费者示例 (source)

    请注意以下几点:

    • channel.basicQos(prefetch) - 正如您之前看到的 prefetchCount 可能非常有用:

      这个命令允许消费者选择一个预取窗口, 指定它准备接收的未确认消息的数量 收到。通过将预取计数设置为非零值,代理 不会向消费者传递任何违反该协议的消息 限制。要将窗口向前移动,消费者必须确认 收到一条消息(或一组消息)。

    • ExecutorService threadExecutor - 您可以指定正确配置的执行器服务。

    例子:

    static class Worker extends DefaultConsumer {
    
        String name;
        Channel channel;
        String queue;
        int processed;
        ExecutorService executorService;
    
        public Worker(int prefetch, ExecutorService threadExecutor,
                      , Channel c, String q) throws Exception {
            super(c);
            channel = c;
            queue = q;
            channel.basicQos(prefetch);
            channel.basicConsume(queue, false, this);
            executorService = threadExecutor;
        }
    
        @Override
        public void handleDelivery(String consumerTag,
                                   Envelope envelope,
                                   AMQP.BasicProperties properties,
                                   byte[] body) throws IOException {
            Runnable task = new VariableLengthTask(this,
                                                   envelope.getDeliveryTag(),
                                                   channel);
            executorService.submit(task);
        }
    }
    

    您还可以检查以下内容:

    【讨论】:

    • 如果我设置 auto ACK =false,在 executor 线程池中 ack 是否安全?
    • 使用这种方法,消费者将破坏 MQ 队列并将任务放置到executorService 队列。当传入消息流大于execturorService 消息处理速度时,可能会出现问题。
    【解决方案2】:

    如何设置 1+ 个频道以在多个队列中发布/消费?

    您可以使用线程和通道来实现。您所需要的只是一种方法 对事物进行分类,即登录中的所有队列项,所有 来自 security_events 等的队列元素。分类可以是 使用 routingKey 实现。

    ie:每次将项目添加到队列时,您都指定路由 钥匙。它将作为属性元素附加。通过这个你可以得到 来自特定事件的值说 logging

    以下代码示例说明了如何在客户端完成。

    例如:

    路由键用于识别通道的类型并检索类型。

    例如,如果您需要获取有关登录类型的所有频道 那么您必须将路由键指定为登录名或其他关键字 来识别它。

                Connection connection = factory.newConnection();
                Channel channel = connection.createChannel();
    
                channel.exchangeDeclare(EXCHANGE_NAME, "direct");
    
                string routingKey="login";
    
                channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());
    

    您可以查看here 了解有关分类的更多详细信息..


    线程部分

    发布部分结束后,您可以运行线程部分..

    在这部分中,您可以根据类别获取已发布的数据。 IE;路由键,在您的情况下是日志记录、security_events 和 customer_orders 等。

    查看示例以了解如何在线程中检索数据。

    例如:

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
    //**The threads part is as follows** 
     channel.exchangeDeclare(EXCHANGE_NAME, "direct");      
     String queueName = channel.queueDeclare().getQueue();
        // This part will biend the queue with the severity (login for eg:)
        for(String severity : argv){
                  channel.queueBind(queueName, EXCHANGE_NAME, routingKey);
        }
        boolean autoAck = false;
        channel.basicConsume(queueName, autoAck, "myConsumerTag",
        new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag,
                                    Envelope envelope,
                                    AMQP.BasicProperties properties,
                                    byte[] body)
             throws IOException
         {
                 String routingKey = envelope.getRoutingKey();
                 String contentType = properties.contentType;
                 long deliveryTag = envelope.getDeliveryTag();
    
                 // (process the message components here ...)
                 channel.basicAck(deliveryTag, false);
         }
     });
    

    现在一个线程处理队列中的数据 类型登录(路由密钥)被创建。通过这种方式,您可以创建多个线程。 每个都有不同的用途。

    查看here 了解有关线程部分的更多详细信息..

    【讨论】:

    • 谢谢。我更喜欢声明 n channels 并将队列绑定到它们中的每一个以指定并发级别,这反过来又消除了我这边管理线程的麻烦。