【发布时间】:2021-08-08 22:43:27
【问题描述】:
我有 2 种从 json/csv 导入产品和库存的方法。我已经实现了 NestJS Bull Module 用于排队作业。两个导入过程都在异步运行并且工作正常。但是现在我想先完全处理产品导入队列,然后只处理库存导入队列。
jobs.module.ts
BullModule.registerQueueAsync({
name: 'default',
inject: [ConfigService],
useFactory: async (
configService: ConfigService,
): Promise<BullModuleOptions> => ({
redis: {
...
},
defaultJobOptions: {
attempts: 1,
},
}),
})
jobs.service.ts
constructor(@InjectQueue('default') private defaultQueue: Queue) { }
@Cron(CronExpression.EVERY_30_MINUTES)
async queueProductUpdateJob() {
await this.defaultQueue.add('productUpdate');
}
@Cron(CronExpression.EVERY_30_MINUTES)
async queueInventoryUpdateJob() {
await this.defaultQueue.add('inventoryUpdate');
}
jobs.processor.ts
@Process('productUpdate')
async productUpdate(job: Job<unknown>){
console.log('data: ', JSON.stringify(job.data));
await this.updateProductService.importProducts(job.data);
return {};
}
@Process('inventoryUpdate')
async inventoryUpdate(job: Job<unknown>){
console.log('data: ', JSON.stringify(job.data));
await this.updateInventoryService.importInventory(job.data);
return {};
}
我们如何才能先完全处理queueProductUpdateJob 作业,然后再处理queueInventoryUpdateJob?
【问题讨论】:
-
你为什么不在
queueInventoryUpdateJob的末尾打电话给productUpdate? -
您的意思是在 queueProductUpdateJob 末尾的 inventoryUpdate。实际上我需要先处理productUpdate,然后再处理inventoryUpdate。我试过了,但是在产品导入时同时运行的第二个进程作业可能有 100 多行数据。
-
所以可能需要 30 多分钟?这就是为什么另一个进程运行的原因?
-
是的,如果数据行更多,我可能会接受。但我正在寻求帮助的是,每当
product作业运行时,它应该运行inventory作业以更新这些产品库存。