【问题标题】:RabbitMQ ImplementationRabbitMQ 实现
【发布时间】:2013-01-13 17:05:10
【问题描述】:

这是我目前的设置。

  • REST API 将 (POST) 数据推送到队列中
  • 队列有一个始终在运行的消费者并生产到 en Exchange
  • Exchange 路由到其他几个队列(比如 20+)
  • (20 多个)队列中的每一个都执行特定的任务(消费者也始终运行)
  • Cron 作业运行以检查所有 (20+) 任务是否已完成并生成到另一个队列

我不确定我是否喜欢 Consumers 一直运行,因为每个 Consumers 使用大约 300MB 的 Ram(我认为它是 MB,目前不在我面前)并且我正在寻找另一种实现方式。

    M <-- Message coming from REST API
    |
    |
    +-First Queue
    |
    |
    | <-- The Exchange
   /|\
  / | \
 /  |  \ <-- bind to multiple queues ( 20+ )
Q1  Q2  Q3 <-- Each Queue is a task that must be completed


    | <-- CRON runs to check if all queues above have completed
    |
    |
    Q4 <-- Queues 1,2 and 3 must finish first before Queue 4 can start
    |
    C <-- Consumer 

我在下面的相关问题中建议使用 RPC,但这个问题是 RPC(据我了解)将有多个实例。这是一个资源密集型过程,我认为通过添加 RPC 调用它只会让服务器陷入困境,然后变得无响应(如果我错了,请纠正我)。

另一种方法是使用聚合器模式

这看起来正是我需要的,但我发现文档有限。有人做过这种模式吗?

我的问题是我对它目前的实施方式不满意,我正在寻找改进流程的方法。我希望摆脱 CRON,实施新模式,而不是让消费者一直运行。

该流程目前也只支持每个消费者的单个实例。它可以有多个消费者,但我们如何实现它,我们当时只想要一个。

这是在 PHP、Symfony2 框架中使用 RabbitMQBundle 实现的

相关问题:

【问题讨论】:

  • 如果您投票结束,请给我一些改进问题的建议
  • 我没有投票结束,因为当涉及到像这样显然很有趣且经过深思熟虑的问题时,我对整个“不具建设性”的事情感到矛盾,但是这个问题真的很开放。 “寻找改进流程的方法”类似于邀请对该主题的讨论,而不是“一个可验证的最佳答案”。就像常见问题解答中所说的那样,“如果你可以想象一本书可以回答你的问题,那你就问得太多了。”我当然可以想象出一本关于这个主题的书。
  • 我同意,这里似乎缺乏直接的问题。

标签: php design-patterns rabbitmq message-queue amqp


【解决方案1】:

这里是 OldSound,RabbitMQ Bundle 的创建者。

捆绑包本身不支持开箱即用的聚合器模式,但您可以使用底层 php-amqplib 实现它。

要执行聚合,您需要沿处理链发布具有相关 ID 和该 ID 的线程的消息。然后聚合器将等待 X 条消息,具体取决于您必须处理该特定任务的不同工作人员的数量。等待消息的一种方法是拥有一个数组,如果它们被相关 id 索引,则保留它们。

因此,每当您收到传入消息时,您都会这样做:

$correlation_id = $msg->get('correlation_id');
$this->receivedMessages[$correlation_id]['msgs'][] = $msg;

然后你做的某处:

if ($someNumber == count($this->receivedMessages[$correlation_id]['msgs']) {
// proceed to next step
}

我现在实际上正在为 Symfony 开发一个工作流捆绑包,我计划很快将其开源。该捆绑包可用于以非常简单的方式实现您提出的用例(即您只需为每个任务提供服务)。

现在我想知道为什么每个消费者要占用 300 MB 的 RAM?您是否需要与它们一起运行完整的堆栈框架?如果可能的话,为消费者应用程序创建一个新的 Symfony 内核,并且只加载你需要的内容以减少开销。

【讨论】:

  • 是的,我需要完整的堆栈框架,因为现在我还将添加一个 GUI 前端。至于“工作流程”捆绑包,有什么 ETA 吗?听起来很有趣
  • 只是一个更新:看起来 Doctrine 正在缓存所有查询,在 EM 上运行 clear() 看起来会有所帮助,但它仍然不是 100%,仍在寻求优化 stackoverflow.com/questions/15572384/doctrine-orm-memory-issues
猜你喜欢
  • 1970-01-01
  • 2016-10-04
  • 1970-01-01
  • 2019-08-20
  • 1970-01-01
  • 2011-12-14
  • 2015-01-06
  • 2015-07-06
  • 1970-01-01
相关资源
最近更新 更多