【问题标题】:Programatically declare RabbitMQ consumers in NestJS / Node.js?在 NestJS / Node.js 中以编程方式声明 RabbitMQ 消费者?
【发布时间】:2023-11-05 22:01:02
【问题描述】:

我正在使用 NestJS 应用程序来使用 RabbitMQ 队列。 无论顺序如何,每条消息都可以被处理,所以我想知道为同一个队列声明新消费者的最佳做法是什么。

预期行为:队列由该服务处理,该服务使用多个消费者

队列:[1,2,3,4,5,6, ...N];

在nestJS 中,您可以使用@RabbitSubscribe 装饰器分配一个函数来处理数据。我想做的事情可以通过简单地用装饰器复制(和重命名)函数来实现,所以这个函数也会被调用来处理队列中的数据

  @RabbitSubscribe({
    ...
    queue: 'my-queue',
  })
  async firstSubscriber(data){
  // 1, 3, 5...
  }


 @RabbitSubscribe({
    ...
    queue: 'my-queue',
  })
  async secondSubscriber(data){
  // 2, 4, 6...
  }

我知道我可以复制项目并水平扩展,但我更愿意在同一流程中执行此操作。

我如何声明订阅者以编程方式获得相同的行为,以便我可以通过更多并发处理来处理数据?

【问题讨论】:

    标签: node.js rabbitmq nestjs amqp


    【解决方案1】:

    如果您使用 @golevelup/nestjs-rabbitmq 包,您将受益,因为它支持不同的消息队列消费,如果您的应用程序是混合的,则支持更多。 先安装

    npm i @golevelup/nestjs-rabbitmq
    

    那么你的 Nestjs 应用结构应该是这样的

    src --
         |
         app.module.ts
         main.ts
         app.module.ts
         someHttpModule1 --
                          |
                          someHttpModule1.controller.ts
                          someHttpModule1.module.ts
                          someHttpModule1.service.ts
                          ...
         someHttpModule2 --
                          |
                          someHttpModule2.controller.ts
                          someHttpModule2.module.ts
                          someHttpModule2.service.ts
                          ...
         ...
         // Events module is designed for consuming messages from rabbitmq
         events --
                 |
                 events.module.ts
                 someEventConsumerModule1 --
                                           |
                                           someEventConsumerModule1.module.ts
                                           someEventConsumerModule1.service.ts
                 someEventConsumerModule2 --
                                           |
                                           someEventConsumerModule2.module.ts
                                           someEventConsumerModule2.service.ts
                 ...
    

    在 src/app.module.ts 文件中

    // module imports
    import { SomeHttpModule1 } from './someHttpModule1/someHttpModule1.module'
    import { SomeHttpModule2 } from './someHttpModule2/someHttpModule.module'
    import { EventsModule } from './events/events.module'
    // and other necessery modules
    
    @Module(
      imports: [
        SomeHttpModule1, 
        SomeHttpModule2,
        EventsModule, 
        // and other dependent modules
      ],
      controller: [],
      providers: []
    })
    
    export class AppModule{}
    

    在你的 events.module.ts 文件中

    // imports
    import { RabbitMQModule } from '@golevelup/nestjs-rabbitmq'
    import { SomeEventConsumerModule1 } from './someEventConsumerModule1/someEventConsumerModule1.module'
    import { SomeEventConsumerModule2 } from './someEventConsumerModule2/someEventConsumerModule2.module'
    // and so on
    
    @Module({
      imports: [
        RabbitMQModule.forRoot(RabbitMQModule, {
          exchanges: [
            name: 'amq.direct',
            type: 'direct' // check out docs for more information on exchange types
          ],
          uri: 'amqp://guest:guest@localhost:5672', // default login and password is guest, and listens locally to 5672 port in amqp protocol
          connectionInitOptions: { wait: false }
        }),
        SomeEventConsumerModule1,
        SomeEventConsumerModule2,
        // ... and other dependent consumer modules 
      ]
    })
    export class EventsModule {}
    

    以下是一个消费者模块的示例(someEventConsumerModule1.module.ts)

    // imports
    import { SomeEventConsumerModule1Service } from './someEventConsumerModule1.service'
    // ...
    
    @Module({
      imports: [
        SomeEventConsumerModule1,
        // other modules if injected
      ],
      providers: [
        SomeEventConsumerModule1Service
      ]
    })
    export class SomeEventConsumerModule1 {}
    

    并在你的服务文件中放置你的业务逻辑如何处理消息

    // imports
    import { RabbitSubscribe } from '@golevelup/nestjs-rabbitmq'
    import { Injectable } from '@nestjs/common'
    import { ConsumeMessage, Channel } from 'amqplib' // for type safety you will need to install package first
    // ... so on
    
    @Injectable()
    export class SomeEventConsumerModule1Service {
      constructor(
        // other module services if needs to be injected
      ) {}
      @RabbitSubscribe({
        exchange: 'amq.direct',
        routingKey: 'direct-route-key', // up to you
        queue: 'queueNameToBeConsumed',
        errorHandler: (channel: Channel, msg: ConsumeMessage, error: Error) => {
          console.log(error)
          channel.reject(msg, false) // use error handler, or otherwise app will crush in not intended way
        }
      })
      public async onQueueConsumption(msg: {}, amqpMsg: ConsumeMessage) {
        const eventData = JSON.parse(amqpMsg.content.toString())
        // do something with eventData
        console.log(`EventData: ${eventData}, successfully consumed!`)
      }
      // ... and in the same way
    }
    

    【讨论】: