【问题标题】:Rabbitmq inNode.js with amqplib not broadcasting fanout messages带有 amqplib 的 Rabbitmq inNode.js 不广播扇出消息
【发布时间】:2021-07-10 04:40:05
【问题描述】:

这个脚本应该是rabbitmq学习意义的信使。我正在尝试运行带有扇出交换的消息代理。

问题是:没有广播消息。这些以负载平衡方式发送。如何像真正的扇出交换一样广播这些消息?

const amqplib = require('amqplib');
const readline = require("readline");

class Chat {

    async init() {
        await this.configureChannel();
        await this.configureConsumer();
        await this.configureCommandLine();
    }

    async configureChannel() {
        const conn = await amqplib.connect('amqp://guest:guest@localhost:5672');
        const ch = await conn.createChannel();
        await ch.assertExchange("chat", "fanout", {});
        const { queue } = await ch.assertQueue('messages');
        await ch.bindQueue(queue, 'chat', '');
        this.ch = ch;
    }

    async configureConsumer() {
        await this.ch.consume("messages", logMessage);
        function logMessage(msg) {
            if (msg.content)
                console.log("\n[*] Recieved message: '%s'", msg.content.toString())
        }
    }

    async configureCommandLine() {
        const commandLine = readline.createInterface({
            input: process.stdin,
            output: process.stdout
        });
        this.commandLine = commandLine;
    }

    async run() {
        const prompt = () => {
            this.commandLine.question("Message: ", async (mensagem) => {
                debugger;
                if (mensagem === "sair") {
                    return this.commandLine.close();
                }
                await this.ch.publish("chat", 'messages', Buffer.from(mensagem), {});
                prompt();
            });
        }
        await this.init();
        console.log("\nChat\n");
        prompt();
    }

}

const chat = new Chat();
chat.run();

【问题讨论】:

    标签: node.js rabbitmq node-modules


    【解决方案1】:

    假设您按原样运行此脚本的多个实例,这里的问题是您为所有消费者使用一个队列。 fanoutexchange 将它收到的每条消息发送到绑定到交换的所有队列。但是每个队列(如果它有多个消费者)将以循环方式工作(假设预取计数有一些限制)。要实现fanout 行为,您需要使用不同的队列运行脚本的每个实例。

    类似的东西(未测试):

        ...
        async configureChannel() {
            const conn = await amqplib.connect('amqp://guest:guest@localhost:5672');
            const ch = await conn.createChannel();
            await ch.assertExchange("chat", "fanout", {});
            this.ch = ch;
        }
    
        async configureConsumer() {
            const { ch } = this;
            const { queue } = await ch.assertQueue('', { exclusive: true });
            await ch.bindQueue(queue, 'chat', '');
            ch.consume(queue, logMessage);
            function logMessage(msg) {
                if (msg.content)
                    console.log("\n[*] Recieved message: '%s'", msg.content.toString())
            }
        }
        ...
    
    

    【讨论】:

    • 它只需要我在 ch.assertQueue(...) 的问题中进行更正。此方法是异步的,需要等待
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-09-07
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多