【问题标题】:Message queue architecture (client to web server to worker and back)消息队列架构(客户端到 Web 服务器到工作人员并返回)
【发布时间】:2016-07-03 01:40:44
【问题描述】:

我有一个在 Heroku 上运行的用 Node JS 编写的 Web 服务器。服务器有一个 Web 服务器进程和一个工作进程。 Web 服务器通过 RabbitMQ 队列成功向 worker 发送消息; Worker 成功将处理后的数据返回给 Web 服务器。我使用随机生成的 Uuid 来跟踪消息并确保正确的消息与正确的原始消息配对。

在一个单独的项目中,我让客户端(网站)与 Web 服务器成功通信。现在,我需要将两者放在一起。

我怎样才能做到这一点:

  1. 客户端发送一个 HTTP POST。
  2. Web 服务器接收请求并将请求传递到消息队列中。
  3. Worker 处理请求并返回到 Web 服务器。
  4. Web 服务器向客户端返回正确的数据,以便客户端可以将响应与原始请求相关联。

第 4 步是我卡住的地方。我想我在某处读到客户端应该不断地轮询(HTTP POST?)Web 服务器,直到它的数据准备好。我想我需要在第 1 步之后回复客户端,这样请求就不会超时。任何想法/建议表示赞赏!

框图:

【问题讨论】:

  • 这将是一个需要状态更新的长期运行过程吗?或者这将是来自后端的非常快速的响应,原始 HTTP 请求应该从该后端服务获得带有数据的响应?
  • 我认为状态更新。有时后端会超过 30 秒。
  • 是的,对于超过一秒左右的时间,状态更新是可行的方法。我会尽快发布答案。
  • 不能使用WebSocket吗?只要客户留在您的网站上,它应该是可能的。

标签: architecture rabbitmq httprequest backgroundworker message-queue


【解决方案1】:

您需要做的简短版本是双向消息传递。您的 Web 应用程序需要是消息生产者和消息消费者。您的后端服务也是如此。

当 HTTP 请求进来时,Web 服务器通过 RabbitMQ 发送消息。后端会在未来的某个时间点接收它。同时,Web 服务器通过 HTTP 请求发回响应,说明正在发生某些事情,稍后将通知用户。

如果您使用的是 express,它看起来像这样:


var router = express.Router();
router.post("/", postJob);

function postJob(req, res, next){
  req.session.inProgress = true;

  var msg = {
    job: "do some work"
  };

  jobSender.sendJobRequest(msg, function(err){
    if (err) { return next(err); }
    res.render("some-response");
  });
}

这段代码做了很多假设,比如jobSender 是某种封装对象,具有通过 RabbitMQ 发送消息的方法。根据您已经说过的,我相信您可以填写发送消息的详细信息。

重要的是,HTTP 请求处理程序通过 RabbitMQ 发送消息,然后将 HTTP 响应发送回 Web 浏览器。

此时,浏览器可以做它需要做的任何事情。

在后端,当其他服务完成它的工作时,它需要做以下两件事之一:

1) 在某处更新共享数据库,以便您的网络服务器知道已完成的工作(并且可以读取状态)

2) 通过 rabbitmq 将消息发送回 Web 服务器

由于各种原因,选项#1 可能并不总是一个好的选择。从你的问题来看,无论如何你都想要选项#2。

您将需要第二个队列 - Web 服务器正在侦听的队列。当 Web 服务器从该队列接收到消息时,它将使用接收到的状态更新它自己的数据库。

此状态可能是“完成”或“进行中”或“错误”或您认为合适的其他状态。

例如,如果您有一个“工作状态”消息,您可能有一个名为“JobStatusReceiver”的抽象来接收状态消息。

像这样的简单模块可以从您的作业状态队列接收消息,并使用状态更新本地数据库


var JobStatusReceiver = require("./jobStatusReceiver");
var someDataObject = require("someDataObject");

var jobStatus = {

  listen: function(){
    var receiver = new JobStatusReceiver();
    receiver.receive(function(statusMessage){

      someDataObject.status = statusMessage.status;
      someDataObject.data = statusMessage.data;


      someDataObject.save();
    });
  }

};

module.exports = jobStatus;

请注意,这可能发生在 Web 服务器中,但它不是 HTTP 请求的一部分。消息通过带有 JobStatusReceiver 的 RabbitMQ 传入,而不是 HTTP 请求的一部分。

someDataObject 对象很可能是来自 yoru 数据库的对象,因此可以将其保存回数据库。

最后,您需要通过数据通知用户已完成操作的部分可以通过多种方式进行。

一般来说,每隔几秒对 Web 服务器上的 HTTP API 进行一次 AJAX 调用以查找有效响应是相当容易的。

在浏览器端,这可能很简单:


var timer = setInterval(function(){

  $.ajax({
    url: "/api/check-status",
    success: function(data){
      if (data.complete){
        clearInterval(timer);
        doSomeOtherWork(data);
      }
    })
  });

});

再次在 Express 应用程序中,处理“/api/check-status”,您将使用相同的“someDataObject”模型来检查状态:


var someDataObject = require("someDataObject");

var router = new express.Router();
router.get("/", checkStatus);

function checkStatus(req, res, next){

  someDataObject.load(function(err, someData){
    if (err) { return next(err); }

    res.json({
      complete: (someData.status === "complete"),
      data: someData.data
    });

  });

}

希望这能让您走上正确的道路。当然,我遗漏了很多细节,但希望您能够填补缺失的部分。

...

P.S.:我在RabbitMQ 4 Devs 培训课程中涵盖了所有这些,除了浏览器在计时器上检查状态更新。这是一个使用 RabbitMQ 和 Node.js 启动和运行的完整包

【讨论】:

  • 这是一个不错的方法,我想它可以与 pub/sub 模式混合而不是池化。
【解决方案2】:

看看这个article虽然它使用activeMQ而不是rabbitMQ,但它在这里说你为每个到达请求设置了一个相关id,你可以使用它来将响应映射到一个对应的id,这样你就可以轻松地回复向您的网络客户端发送正确请求的正确响应。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2012-12-05
    • 1970-01-01
    • 1970-01-01
    • 2013-05-18
    • 2021-01-26
    • 1970-01-01
    • 1970-01-01
    • 2021-07-19
    相关资源
    最近更新 更多