【问题标题】:amqplib sendToQueue() is not sending messages to RabbitMQamqplib sendToQueue() 没有向 RabbitMQ 发送消息
【发布时间】:2025-12-28 23:05:17
【问题描述】:

我正在围绕amqplib 的 Promise API 构建一个 Typescript 包,以便更简单地在两个 RabbitMQ 队列之间发送消息。

这是所述包的类中的一个方法,负责向 RabbitMQ 发送消息。它实际上并没有发送消息,因为我无法从 CloudAMQP 中看到它们。

class PostOffice {
    connection: any;

    constructor(connection: any) {
        this.connection = connection;
    }

    //...

    // TMQMessage is an object that can be trivially serialized to JSON
    // The Writer class holds the channel because I'm having trouble
    // making Typescript class member updates stick after a method returns.
    async send(writer: Writer, message: TMQMessage) {
        //...RabbitMQ code begins here
        let channel = writer.stageConsume[0]; // This channel is created elsewhere
        
        // Queue is created here and I can see it in CloudAMQP.
        await channel.assertQueue(message.to + "/stage/1", {
            durable: true,
        }); // `message.to` is a string indicating destination, e.g. 'test/hello'
        
        // According to docs, the content should be converted to Buffer
        await channel.sendToQueue(message.to + "/stage/1", Buffer.from(JSON.stringify(message)));
        
        channel.close()
    }

}

这是我创建的一个创建连接和通道的类,在这里使用:

import { Writer } from "./Writer";

export class ConnectionFactory {
    url: string;
    amqp: any;
    constructor(url: string) {
        this.url = url;
        this.amqp = require('amqplib');
    }

    async connect(timeout: number = 2000) {
        let connection = await this.amqp.connect(this.url, {
            // timeout for a message acknowledge.
            timeout, // Make the timeout 2s per now
        })
        return connection;
    }
    async createChannels(connection: any, writer: Writer) {
        //... relevant code starts here
        let stageChannels = [];
        stageChannels.push(await connection.createChannel())
        writer.stageConsume = stageChannels;
        return writer;
    }
}

}

我的连接工厂似乎工作正常,因为我可以看到来自 CloudAMQP 仪表板的连接。我也可以从仪表板看到已创建(在我的代码中声明)的队列。但是,我无法让 amqplib 向 CloudAMQP 发送消息。

这是我用来调用我的包的(异步)代码:

let Cf = new ConnectionFactory(url)
let connection = await Cf.connect()
let writer = new Writer();

let message: TMQMessage = {
    //... message content in JSON
}

writer = await Cf.createChannels(connection, writer)
let po = new PostOffice(connection)
await po.send(writer, message)

似乎出了什么问题?

【问题讨论】:

    标签: node.js rabbitmq cloudamqp node-amqplib


    【解决方案1】:

    可能也想等待 close 方法。但总的来说,我会推荐 amqp-client.js(也有 TypeScript 定义),https://github.com/cloudamqp/amqp-client.js/

    【讨论】:

    • 看起来很简单。我将继续迁移我的包以使用它,然后看看情况是否更好。
    • Typescript 只允许我 import AMQPClient from '@cloudamqp/amqp-client',如果我尝试使用花括号导入任何其他类,我会得到:This module can only be referenced with ECMAScript imports/exports by turning on the 'esModuleInterop' flag and referencing its default export.ts(2497)。我不确定如何导入AMQPQueue 以及其他一些类。
    • 您不应该手动导入这些类吗?或者你想做什么?
    • 你从 AMQPClient::Channel#queue 获得了一个 AMQPQueue 实例,你永远不必手动实例化一个 AMQPQueue。
    • 我花了几天时间将我的 API 调整为新模块,但它得到了回报,因为我可以看到从 RabbitMQ 管理控制台发送和接收的消息。谢谢!