我认为您在初步理解方面有几个问题。坦率地说,看到以下内容我有点惊讶:both need 5 threads to handle the volume。你是如何确定你需要那个确切的数字的?你有任何保证5个线程就足够了吗?
RabbitMQ 已经过调整和时间测试,所以一切都与正确的设计有关
和高效的消息处理。
让我们尝试检查问题并找到合适的解决方案。顺便说一句,消息队列本身并不能保证您有真正好的解决方案。您必须了解自己在做什么,还要进行一些额外的测试。
您肯定知道有许多可能的布局:
我将使用布局B 作为最简单的方式来说明1 生产者N 消费者问题。由于您非常担心吞吐量。顺便说一句,正如您可能期望的那样,RabbitMQ 的表现非常好 (source)。关注prefetchCount,我稍后会解决:
因此,消息处理逻辑很可能是确保您有足够吞吐量的正确位置。当然,每次需要处理消息时,您都可以跨越一个新线程,但最终这种方法会杀死您的系统。基本上,线程越多,延迟越大(如果需要,可以查看Amdahl's law)。
(见Amdahl’s law illustrated)
提示 #1:小心使用线程,使用 ThreadPools (details)
线程池可以描述为 Runnable 对象的集合
(工作队列)和正在运行的线程的连接。这些线程是
不断运行并正在检查新工作的工作查询。如果
他们执行这个 Runnable 有新的工作要做。线程
类本身提供了一个方法,例如execute(Runnable r) 添加一个新的
可运行对象到工作队列。
public class Main {
private static final int NTHREDS = 10;
public static void main(String[] args) {
ExecutorService executor = Executors.newFixedThreadPool(NTHREDS);
for (int i = 0; i < 500; i++) {
Runnable worker = new MyRunnable(10000000L + i);
executor.execute(worker);
}
// This will make the executor accept no new threads
// and finish all existing threads in the queue
executor.shutdown();
// Wait until all threads are finish
executor.awaitTermination();
System.out.println("Finished all threads");
}
}
提示 #2:注意消息处理开销
我会说这是明显的优化技术。您可能会发送小型且易于处理的消息。整个方法是关于要连续设置和处理的较小消息。大消息最终会成为一个糟糕的笑话,所以最好避免这种情况。
所以最好发送微小的信息,但是处理呢?每次提交作业时都会产生开销。在传入消息率较高的情况下,批处理非常有用。
例如,假设我们有简单的消息处理逻辑,并且我们不希望每次处理消息时都有特定于线程的开销。为了优化那个很简单的CompositeRunnable can be introduced:
class CompositeRunnable implements Runnable {
protected Queue<Runnable> queue = new LinkedList<>();
public void add(Runnable a) {
queue.add(a);
}
@Override
public void run() {
for(Runnable r: queue) {
r.run();
}
}
}
或者通过收集要处理的消息以稍微不同的方式做同样的事情:
class CompositeMessageWorker<T> implements Runnable {
protected Queue<T> queue = new LinkedList<>();
public void add(T message) {
queue.add(message);
}
@Override
public void run() {
for(T message: queue) {
// process a message
}
}
}
通过这种方式,您可以更有效地处理消息。
提示 #3:优化消息处理
尽管您知道可以并行处理消息 (Tip #1) 并减少处理开销 (Tip #2),但您必须快速完成所有操作。冗余处理步骤、繁重的循环等可能会对性能产生很大影响。请看有趣的案例研究:
Improving Message Queue Throughput tenfold by choosing the right XML Parser
提示 #4:连接和渠道管理
- 在现有连接上启动新通道涉及一个网络
往返 - 开始一个新的连接需要几个。
- 每个连接都使用服务器上的文件描述符。频道没有。
- 在一个通道上发布大消息将阻止连接
当它熄灭时。除此之外,多路复用相当透明。
- 如果服务器被阻止,正在发布的连接可能会被阻止
重载 - 将发布和消费分开是个好主意
连接
- 准备好处理消息突发
(source)
请注意,所有提示都可以完美配合。如果您需要更多详细信息,请随时告诉我。
完整的消费者示例 (source)
请注意以下几点:
-
channel.basicQos(prefetch) - 正如您之前看到的
prefetchCount 可能非常有用:
这个命令允许消费者选择一个预取窗口,
指定它准备接收的未确认消息的数量
收到。通过将预取计数设置为非零值,代理
不会向消费者传递任何违反该协议的消息
限制。要将窗口向前移动,消费者必须确认
收到一条消息(或一组消息)。
-
ExecutorService threadExecutor - 您可以指定正确配置的执行器服务。
例子:
static class Worker extends DefaultConsumer {
String name;
Channel channel;
String queue;
int processed;
ExecutorService executorService;
public Worker(int prefetch, ExecutorService threadExecutor,
, Channel c, String q) throws Exception {
super(c);
channel = c;
queue = q;
channel.basicQos(prefetch);
channel.basicConsume(queue, false, this);
executorService = threadExecutor;
}
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException {
Runnable task = new VariableLengthTask(this,
envelope.getDeliveryTag(),
channel);
executorService.submit(task);
}
}
您还可以检查以下内容: