【发布时间】:2020-09-12 19:41:42
【问题描述】:
我正在尝试使用 NestJs 使用 Kafka 作为事件存储的 Event-Sourcing 和 CQRS。
该应用程序是一个小而简单的应用程序,包含 2 个部分,即客户和订单。您首先创建一个具有一些初始余额的客户,然后使用您创建订单的客户 ID,如果订单金额小于余额,则批准否则拒绝。
这里是有问题的代码:https://github.com/Ashniu123/nestjs-customer-order-eventsourcing-cqrs
我使用 KafkaJs 作为 EventBus(在 libs/ 下创建了我自己的 KafkaModule)
当我使用 Kafka 和 MongoDB 运行它时,应用程序启动得很好。
当我也创建一个客户时,事件CreateCustomerEvent 会按预期发布,并由 CommandHandler 推送到 Kafka。 (使用landoop UI检查)
当从 Kafka 读取 Event 并将其推送到 EventBus 以由 EventHandler 拾取和执行时,问题就出现了。喜欢CreateCustomerEventHandler。
我为 EventBus 使用 Kafka 的配置在每个服务的 AppModule 中。例如,Customer。
并且为KafkaService中的事件配置了EventBus observable subject$。
这里是应用程序日志(为我的 cmets 添加 //)。
customer-svc(命令端)
[Nest] 657306 - 09/13/2020, 12:54:47 AM [CreateCustomerCommandHandler] Running command handler with: {"customerId":"900ee3e9-33aa-431c-bbd0-eea91cafb673","createCustomerDto":{"email":"someemail@gmail.com","password":"abc123","firstName":"john","lastName":"doe","balance":1000}}
[Nest] 657306 - 09/13/2020, 12:54:47 AM [KafkaService] Published event: {"customerId":"900ee3e9-33aa-431c-bbd0-eea91cafb673","createCustomerDto":{"email":"someemail@gmail.com","password":"abc123","firstName":"john","lastName":"doe","balance":1000},"eventType":"CreateCustomerEvent"}
customer-view-svc(查询/查看端)
[Nest] 657550 - 09/13/2020, 12:54:47 AM [KafkaService] Bridged event payload value: {"customerId":"900ee3e9-33aa-431c-bbd0-eea91cafb673","createCustomerDto":{"email":"someemail@gmail.com","password":"abc123","firstName":"john","lastName":"doe","balance":1000},"eventType":"CreateCustomerEvent"}
[Nest] 657550 - 09/13/2020, 12:54:47 AM [CreateCustomerEventHandler] Running event handler with: {"customerId":"900ee3e9-33aa-431c-bbd0-eea91cafb673","createCustomerDto":{"email":"someemail@gmail.com","password":"abc123","firstName":"john","lastName":"doe","balance":1000}}
[Nest] 657550 - 09/13/2020, 12:54:47 AM [CreateCustomerEventHandler] Running event handler with: {"customerId":"900ee3e9-33aa-431c-bbd0-eea91cafb673","createCustomerDto":{"email":"someemail@gmail.com","password":"abc123","firstName":"john","lastName":"doe","balance":1000}}
[Nest] 657550 - 09/13/2020, 12:54:47 AM [CreateCustomerEventHandler] Created customer: {"_id":"5f5d207f57cd5f089895867a","id":"900ee3e9-33aa-431c-bbd0-eea91cafb673","email":"someemail@gmail.com","password":"$2b$10$NzEnAHRsfh/7QnczB3p/MepPl0fD44G/6sFtzKsjpwudjYlNjGacG","firstName":"john","lastName":"doe","balance":1000,"salt":"$2b$10$NzEnAHRsfh/7QnczB3p/Me"}
[Nest] 657550 - 09/13/2020, 12:54:48 AM [CreateCustomerEventHandler] Created customer: {"_id":"5f5d207f57cd5f089895867b","id":"900ee3e9-33aa-431c-bbd0-eea91cafb673","email":"someemail@gmail.com","password":"$2b$10$w0.mShhI3cMys7XAPLHRFusy63Fqlzj9s95JuSGdDpy.g5n5nt/8O","firstName":"john","lastName":"doe","balance":1000,"salt":"$2b$10$w0.mShhI3cMys7XAPLHRFu"}
// for some reason the another customer of same email is created even though in `customer.schema.ts` I have specified that it should be unique (not a priority at the moment)
我可以从日志中推断出,Kafka 事件按预期仅由消费者接收一次,但使用 subject$.next 移动到 EventHandler 两次。
另外,澄清一下,根据创建时 customer._id 的不同值所建议的,事件被两次推送到 EventHandler。
使用调试器我可以看到subject.observers 在类FilterSubscriber 的数组中有2 个值。我不知道这是否有用,但只是想安排我自己解决这个问题的努力,经过 6 个小时的无所事事,我来这里寻求帮助:)。
如果你们更好地使用它,我已经在 repo 中添加了 launch.json 以与 VSCode 一起使用。只需使用正在运行的应用程序的 processId 附加即可。
附:我以类似的方式配置了customer-view-svc 和order-view-svc 的EventBus,并且两者都存在问题(即重复事件)。我希望你们能够帮助我解决这个问题。
谢谢。
【问题讨论】:
-
我有同样的问题,在本地事件总线上发布的事件的事件处理程序被调用了两次。这个问题是在nestjs-cqrs的github repo上发布的吗?
-
@YousufKhan 我使用了外部总线——卡夫卡。既然你也面临这个问题,我在他们的 github repo 上提出了一个问题。
标签: node.js apache-kafka nestjs cqrs kafkajs