【发布时间】:2020-08-08 08:57:19
【问题描述】:
嗨 Axon 框架社区,
我想听听您对如何正确解决以下问题的意见。
我的 Axon 测试设置
- 同一个 Spring Boot 应用程序的两个实例(使用 axon-spring-boot-starter 4.4 而没有 Axon Server)
- 每个实例都会定期发布相同的事件
- 两个实例都连接到同一个 EventSource(使用 JpaEventStorageEngine 的单个 SQL Server 实例)
- 每个实例都配置为使用 TrackingEventProcessors
- 每个实例都注册了相同的事件处理程序
我想要达到的目标
我希望一个实例发布的事件只由同一个实例处理
如果 instance1 发布 eventX,那么只有 instance1 应该处理 eventX
到目前为止我所做的尝试
- 我可以使用 SubscribingEventProcessor 实现上述场景。不幸的是,这不是我的选择,因为我们希望可以选择重播事件以重建/添加新的查询模型。
- 我可以将每个实例的事件处理程序分配给不同的处理组。不幸的是,这没有奏效。也许是因为每个 TrackingEventProcessors 实例都处理相同的 EventStream ? - 虽然不太确定。
- 我可以实现一个 MessageHandlerInterceptor,它只在事件源来自同一个实例的情况下才会继续。这是我到目前为止实施的并且可以正常工作的内容: MessageHandlerInterceptor
class StackEventInterceptor(private val stackProperties: StackProperties) : MessageHandlerInterceptor<EventMessage<*>> {
override fun handle(unitOfWork: UnitOfWork<out EventMessage<*>>?, interceptorChain: InterceptorChain?): Any? {
val stackId = (unitOfWork?.message?.payload as SomeEvent).stackId
if(stackId == stackProperties.id){
interceptorChain?.proceed()
}
return null
}
}
@Configuration
class AxonConfiguration {
@Autowired
fun configure(eventProcessingConfigurer: EventProcessingConfigurer, stackProperties: StackProperties) {
val processingGroup = "processing-group-stack-${stackProperties.id}"
eventProcessingConfigurer.byDefaultAssignTo(processingGroup)
eventProcessingConfigurer.registerHandlerInterceptor(processingGroup) { StackEventInterceptor(stackProperties) }
}
}
有更好的解决方案吗?
我的印象是我当前的解决方案并不是最好的,因为理想情况下我希望只有属于某个实例的事件处理程序由 TrackingEventProcessor 实例触发。
你会怎么解决这个问题?
【问题讨论】: