【问题标题】:Nodejs publisher and subscriber in rabbitmqrabbitmq 中的 Nodejs 发布者和订阅者
【发布时间】:2022-12-11 16:53:39
【问题描述】:

大家好,我遇到了一个问题,我将从解释我在做什么开始,

我创建了 2 个 nodejs 服务器,一个是发布者,另一个是订阅者,两个服务器都将执行加法和乘法等数学任务

所以我创建了一个用于加法和乘法的 rest api 如果我启动发布者和订阅者服务器并点击加法/乘法端点,我也会得到想要的结果我在订阅者端得到了想要的结果

但是我不知道如何创建 2 个发布者副本和 3 个子工作者/订阅者副本,其中 pub1 和 pub2 状态将不同并且完成后工作人员现在必须将结果通知发布者

它还应该将所有过去的任务及其结果或未决状态告诉发布者,发布服务器应该将任务列表保存在本地临时文件中

我们也可以使用 docker 创建多个副本,但不知道该怎么做?

我也与大家分享我的代码

任何有关示例的帮助将不胜感激

提前致谢!

Publisher.js 文件

const express = require("express");
const amqp = require("amqplib");
const app = express();
const bodyParser = require("body-parser");
const PORT = process.env.PORT || 3000;
let channel, connection;
app.use(express.json());



app.get("/math-task/sum", (req, res) => {
  let inputOfA = parseInt(req.body.a);
  let inputOfB = parseInt(req.body.b);
  let sum = Number(inputOfA + inputOfB);
  sendData(sum); // pass the data to the function we defined
  console.log("A message is sent to queue");
  res.send("Message Sent For Addition:" + Number(sum)); //response to the API request
});

app.get("/math-task/mul", (req, res) => {
  let inputOfA = parseInt(req.body.a);
  let inputOfB = parseInt(req.body.b);
  let product = Number(inputOfA * inputOfB);
  sendData(product); // pass the data to the function we defined
  console.log("A message is sent to queue");
  res.send("Message Sent For Multiplication:" + Number(product)); //response to the API request
});

app.use(bodyParser.urlencoded({extended:false}));
app.use(bodyParser.json()); 

app.listen(PORT, () => console.log("Server running at port " + PORT));

async function connectQueue() {
  try {
    connection = await amqp.connect("amqp://localhost:5672");
    channel = await connection.createChannel();

    await channel.assertQueue("test-queue");
  } catch (error) {
    console.log(error);
  }
}

async function sendData(data) {
  // send data to queue
  await channel.sendToQueue("test-queue", Buffer.from(JSON.stringify(data)));

  // close the channel and connection
  await channel.close();
  await connection.close();
}

connectQueue();

Subscriber.js 文件

const express = require("express");
const app = express();
const PORT = process.env.PORT || 3001;
app.use(express.json());
app.listen(PORT, () => console.log("Server running at port " + PORT));

const amqp = require("amqplib");
var channel, connection;
connectQueue()  // call the connect function
 
async function connectQueue() {
    try {
        connection = await amqp.connect("amqp://localhost:5672");
        channel    = await connection.createChannel()
        
        await channel.assertQueue("test-queue")
        
        channel.consume("test-queue", data => {
            console.log(`${Buffer.from(data.content)}`);
            channel.ack(data);
        })
    } catch (error) {
        console.log(error);
    }
}

【问题讨论】:

    标签: javascript node.js docker rabbitmq


    【解决方案1】:

    您应该将 RPC 用于此类任务,这可以通过 RabbitMQ 中的回调队列轻松完成。您还可以在此处找到示例实现:RabbitMQ RPC tutorial

    基本上你想要做的是修改 sendData 以监听 replyTo 队列,如果这里的消息是你最初请求的响应,那么它可以用响应解析。在示例中,这是通过correlationId 跟踪的。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2016-12-10
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2016-12-14
      • 1970-01-01
      相关资源
      最近更新 更多