【问题标题】:Axon Framework: Handle only events published by the same JVM instance?Axon 框架:仅处理由同一 JVM 实例发布的事件?
【发布时间】:2020-08-08 08:57:19
【问题描述】:

嗨 Axon 框架社区,

我想听听您对如何正确解决以下问题的意见。

我的 Axon 测试设置

  • 同一个 Spring Boot 应用程序的两个实例(使用 axon-spring-boot-starter 4.4 而没有 Axon Server)
  • 每个实例都会定期发布相同的事件
  • 两个实例都连接到同一个 EventSource(使用 JpaEventStorageEngine 的单个 SQL Server 实例)
  • 每个实例都配置为使用 TrackingEventProcessors
  • 每个实例都注册了相同的事件处理程序

我想要达到的目标

我希望一个实例发布的事件只由同一个实例处理

如果 instance1 发布 eventX,那么只有 instance1 应该处理 eventX

到目前为止我所做的尝试

  • 我可以使用 SubscribingEventProcessor 实现上述场景。不幸的是,这不是我的选择,因为我们希望可以选择重播事件以重建/添加新的查询模型。
  • 我可以将每个实例的事件处理程序分配给不同的处理组。不幸的是,这没有奏效。也许是因为每个 TrackingEventProcessors 实例都处理相同的 EventStream ? - 虽然不太确定。
  • 我可以实现一个 MessageHandlerInterceptor,它只在事件源来自同一个实例的情况下才会继续。这是我到目前为止实施的并且可以正常工作的内容: MessageHandlerInterceptor
class StackEventInterceptor(private val stackProperties: StackProperties) : MessageHandlerInterceptor<EventMessage<*>> {

    override fun handle(unitOfWork: UnitOfWork<out EventMessage<*>>?, interceptorChain: InterceptorChain?): Any? {
        val stackId = (unitOfWork?.message?.payload as SomeEvent).stackId
        if(stackId == stackProperties.id){
            interceptorChain?.proceed()
        }
        return null
    }
}
@Configuration
class AxonConfiguration {

    @Autowired
    fun configure(eventProcessingConfigurer: EventProcessingConfigurer, stackProperties: StackProperties) {
        val processingGroup = "processing-group-stack-${stackProperties.id}"
        eventProcessingConfigurer.byDefaultAssignTo(processingGroup)
        eventProcessingConfigurer.registerHandlerInterceptor(processingGroup) { StackEventInterceptor(stackProperties) }
    }
}

有更好的解决方案吗?

我的印象是我当前的解决方案并不是最好的,因为理想情况下我希望只有属于某个实例的事件处理程序由 TrackingEventProcessor 实例触发。

你会怎么解决这个问题?

【问题讨论】:

    标签: cqrs axon


    【解决方案1】:

    @thowimmer 在这里遇到的有趣场景。 我的第一个预感是“改用SubscribingEventProcessor”。 但是,您指出这不是您的设置中的一个选项。 我认为它非常对于处于相同情况的其他人知道为什么这不是一个选项很有价值。所以,也许你可以详细说明一下(说实话,我也很好奇)。

    现在针对您的问题案例,确保事件仅在同一个 JVM 中处理。 将来源添加到事件中绝对是您可以采取的一个步骤,因为这允许一种合乎逻辑的过滤方式。 “此事件是否源自my.origin()?”如果没有,您只需忽略该事件并完成它,就这么简单。不过,还有另一种方法可以实现这一点,我稍后会谈到。

    但是,我认为过滤的地方主要是您要寻找的地方。但首先,我想先说明为什么你需要过滤。正如您所注意到的,TrackingEventProcessor (TEP) 从所谓的StreamableMessageSource 流式传输事件。 EventStore 是这种StreamableMessageSource 的实现。当您将所有事件存储在同一个存储中时,它只会将所有内容流式传输到您的 TEP。由于您的事件是单个事件流的一部分,因此您需要在某个阶段过滤它们。使用MessageHandlerInterceptor 会起作用,您甚至可以编写HandlerEnhacnerDefinition 允许您向事件处理函数添加额外的行为。但是,尽管您这么说,但在当前设置下,需要在某处进行过滤。 MessageHandlerInterceptor 可以说是最简单的地方。

    但是,有一种不同的处理方式。为什么不将您的 Event Store 分离为两个应用程序的不同实例?显然他们没有需要相互读取,那么为什么要共享同一个事件存储呢?在不了解您的领域的进一步背景的情况下,我猜您实际上是在处理位于不同 bounded contexts 中的应用程序。 非常简而言之,与应用程序/上下文共享所有内容的兴趣为零,您只需非常有意识地彼此共享您的领域语言的特定部分。

    请注意,支持multiple contexts,在中间使用单个通信集线器,正是 Axon Server 可以为您实现的。我不是在这里说你不能自己配置这个,我过去做过。但是,将这项工作留给其他人或其他人,让您无需配置基础架构,这将节省大量时间。

    希望这可以帮助您将我对此事@thowimmer 的一些想法设置为上下文。

    【讨论】:

    • 嗨@Steven,首先非常感谢您的大力支持!在支持社区、建立伟大的技术和相互尊重方面,Axon 是一个优秀的团队 - 赞!我的问题的更多背景知识:我们在主动/被动设置中使用蓝/绿部署。这意味着在部署期间,由工作节点发起的事件可能由可能损坏的节点处理。两个节点中的每一个都有一个专用的镜像数据库实例,因此我们可以零停机时间修补数据库实例。
    • 对您的建议: 1) 订阅与跟踪:我们希望稍后重播事件以添加/重建查询模型。 (编辑了我的问题) 2)隔离 EventStore:我实际上已经考虑过这一点,但想先评估其他选项 - 尽管如此,这可能是您所概述的方式。 3) 不同的限界上下文:实际上它是同一个应用程序,只是用于操作目的的不同实例。 4) Axon Server:中期我们可能会切换到 Axon Server。我将进一步评估它的选择:-)
    • 首先,感谢您的夸奖!我们努力为任何关于 Axon 产品的问题提供全面而清晰的解释,因此听到这样的体验总是令人愉快的。 :-)
    • 关于你的场景,明白了!如果您谈论的是蓝绿部署策略,那么要求当然会略有不同。您试图通过将事件与蓝色和绿色应用程序隔离来缓解的确切问题是什么?
    • 是的,这是另一个选项 -> 一旦一切准备就绪并处于健康状态,就注册 evenhandlers。这当然是可能的,但在部署期间需要一些额外的步骤,我想避免这些步骤以降低部署复杂性。我将尝试应用程序级别的方法:github.com/thowimmer/axon-eventhandling-same-instance - 它易于理解并解决了我们的问题。不过,我还不确定 PID 方法是否是最好的方法,因为当 PID 发生变化时(例如,在重生/切换实例期间)我们可能会丢失事件......我会让你了解最新的 ;-)
    【解决方案2】:

    总结:

    如果我们想使用 TrackingEventProcessor 的功能,对两个实例使用相同的 EventStore 可能不是一个理想的设置。

    解决方案:

    • 每个应用程序实例的专用(非镜像)数据库实例。
    • 使用multiple contexts 使用 AxonServer。

    如果我们决定使用MessageHandlerInterceptor 解决应用程序级过滤问题,是最简单的解决方案。

    感谢@Steven 交换意见。


    编辑:

    使用CorrelationDataProviderMessageHandlerInterceptor 的应用程序级别解决方案,过滤掉并非源自同一进程的事件。

    AxonConfiguration.kt

    const val METADATA_KEY_PROCESS_ID = "pid"
    const val PROCESSING_GROUP_PREFIX = "processing-group-pid"
    
    @Configuration
    class AxonConfiguration {
    
        @Bean
        fun processIdCorrelationDataProvider() = ProcessIdCorrelationDataProvider()
    
        @Autowired
        fun configureProcessIdEventHandlerInterceptor(eventProcessingConfigurer: EventProcessingConfigurer) {
            val processingGroup = "$PROCESSING_GROUP_PREFIX-${ApplicationPid()}"
            eventProcessingConfigurer.byDefaultAssignTo(processingGroup)
            eventProcessingConfigurer.registerHandlerInterceptor(processingGroup) { ProcessIdEventHandlerInterceptor() }
        }
    }
    
    class ProcessIdCorrelationDataProvider() : CorrelationDataProvider {
        override fun correlationDataFor(message: Message<*>?): MutableMap<String, *> {
            return mutableMapOf(METADATA_KEY_PROCESS_ID to ApplicationPid().toString())
        }
    }
    
    class ProcessIdEventHandlerInterceptor : MessageHandlerInterceptor<EventMessage<*>> {
        override fun handle(unitOfWork: UnitOfWork<out EventMessage<*>>?, interceptorChain: InterceptorChain?) {
            val currentPid = ApplicationPid().toString()
            val originPid = unitOfWork?.message?.metaData?.get(METADATA_KEY_PROCESS_ID)
            if(currentPid == originPid){
                interceptorChain?.proceed()
            }
        }
    }
    

    GitHub上查看完整的演示项目

    【讨论】:

      猜你喜欢
      • 2021-12-21
      • 1970-01-01
      • 2014-11-05
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2021-12-19
      • 1970-01-01
      相关资源
      最近更新 更多