目录

  1. 关于文章
  2. 简短的自我介绍
  3. 对反应性声明的回顾
  4. 实施与解释
  5. 概括

    关于文章

    • 在本文中,在解释了响应式声明和 CQRS 的概要之后,我们将以 Scala/Akka(类型化)中的实现为例进行说明。
    • 目标读者是 Scala 程序员和无法用 Akka 掌握全局的人。 (我们正在尽最大努力让那些没有被覆盖的人可以理解。)
    • 请注意,本文不讨论 DDD。关于 DDD 和 CQRS 模式之间的关系,我很抱歉,但请参阅另一篇文章。
    • 调查内容我们会处理,但如果说明有不足之处,请评论本文,或@AerosmithBz请联系
    • 对反应性声明的回顾是对响应式声明的总结,看过的可以跳过。

    关于 Akka 许可证变更
    Akka 的许可证从版本 2.7.x 更改为 BSL1.1。
    请在将其用于商业目的之前进行自己的研究。

    简短的自我介绍

    很高兴见到你。
    我的名字是 Asada (HN),我是一名在内部和 SES 开发 Web 系统的一年级工程师。
    目前,我在项目现场主要使用 Python 和 PHP,没有机会接触 Scala,所以我每天都写歌来表达我的悲伤。
    我已经使用 Scala 大约一年半了,并且已经开发(学习)Akka 大约一年了。
    关于 GitHub这里.

    对反应性声明的回顾

    在学习CQRS的时候,我觉得还是理解反应式声明的底层概念和思想比较好,所以这次我将反应式声明分解,做一个简单的解释。

    用户对系统的期望

    反应性声明文本包含以下前言:

    就在几年前,巨大的应用程序数十台服务器由组成秒级响应时间和数小时的离线维护数据是几千兆字节曾是。今天的应用范围从移动设备到由数千个多核处理器驱动的基于云的集群。在任何设备上部署在用户是毫秒级响应时间和 100% 正常运行时间预计。数据是以 PB 为单位测量单位昨天的软件架构根本无法满足今天的需求。

    反应式声明https://www.reactivemanifesto.org/ja

    简而言之,系统臃肿,用户更挑剔,数据量以 PB 为单位处理。
    今天的系统跟不上变化,无法按照用户的期望行事。
    现在我们如何解决这个问题?这就是故事。

      服务器数量 预期响应时间 数据量
    很久以前 小的 晚的 几千兆字节
    现在 许多 早期的 以 PB 为单位

    构成反应系统的四个要素

    您想要的是一个响应迅速、容错、弹性和消息驱动的系统。我们称之为反应系统。

    反应式声明https://www.reactivemanifesto.org/ja*

    文中将称为反应式系统的架构声明为解决上述问题的系统,并作为组成元素响应式、容错、弹性、消息驱动已列出。
    分别,

    1. 响应能力
      系统将尽快做出响应。
    2. 容错
      系统在遇到故障时仍然保持响应。
    3. 弹性
      系统在工作负载波动时保持响应。
    4. 消息驱动
      反应式系统依靠异步消息传递来建立组件之间的边界。

      进行了解释,总而言之,通过使用异步消息传递构建健壮、可扩展且响应迅速的系统来解决问题这就是反应系统的概要。

      Akkaを使用したCQRSパターンの実装
      报价来源:https://www.reactivemanifesto.org/ja

      为什么将消息驱动作为一种手段

      文中陈述如下。

      消息驱动:反应式系统依靠异步消息传递来建立组件之间的边界。这允许松散耦合、隔离、位置透明度以及将错误委托为消息的方法。显式消息传递允许负载管理和弹性。它还通过在系统中创建消息队列、监视它们并在必要时应用背压来启用流控制。通过使用位置透明的消息传递作为通信手段,无论通信是跨集群还是在单个主机内,都可以使用相同的配置和语义来管理故障。非阻塞通信允许接收器仅在活动时消耗资源,从而减少系统开销。

      反应式声明https://www.reactivemanifesto.org/ja

      * 位置透明性是一个术语,指的是运行时实例及其引用的分离。

      换句话说,通过使用消息驱动的异步消息传递,我们的目标是满足以下四个要点。

      1. 确保松散耦合。
      2. 显式消息传递使每个组件更易于管理。
      3. 位置透明可实现弹性资源管理。
      4. 使用位置透明的消息传递可以在配置中创建通用项目并降低成本。

        我认为仅仅通过解释很难得到这个区域的图像,包括位置透明度,所以我强烈建议实施它并把握全局。

        实施与解释

        上面的文字量已经变成了我想象的5倍左右,所以着急。

        1. 关于CQRS
        2. 关于阿卡
        3. 示例项目概述
        4. 来源和评论

          关于 CQRS

          CQRS(Command Query Responsibility Segregation)字面意思是
          命令(写)和查询为了分离(读取)和保证可扩展性,数据在写入时以事件的形式存储在DB中事件溯源模式(EventSourcing)并且经常一起实施。

          作为本文的具体流程,

          1. 命令以事件的形式将用户的写入写入NoSQL进行写入(下面是Cassandra)。
          2. 投影反映 RDBMS 中的数据,以根据事件读取(以下 Aurora)。
          3. 读取 API 获取数据。
          4. 来源和评论

            变成。
            Akkaを使用したCQRSパターンの実装

            关于阿卡

            在这里,我将解释示例项目的实现,以及使用 Akka 时应该了解的 ClusterScharding。

            • 集群分片
              ClusterSharding 本身就是集群的一种思路,在 Akka 中是 Akka Cluster 提供的一个功能。每个节点对应每个Actor碎片被创建,Actor 由 Shard 管理,消息通过 Shard 发送和接收。
              由于可以通过指定的策略在每个 Shard 之间调整每个节点的 Actor 数量,因此可以重现响应式声明中定义的位置透明性,并且在组成集群的节点中不会发生错误。演员的再平衡(重新排列)进行。
              此外,本次说明的示例项目不处理复制,但提供了 Akka Persistence 作为节点断开的对策。复制建议实施。
              Akkaを使用したCQRSパターンの実装

            示例项目概述

            为了掌握CQRS的全貌,我们将在应用端实现一个简单的计数器应用。
            也在这里写API,投影解释读取API是否可以向数据库抛出查询没有特别的规范,因此我将省略说明。

            写API图

            Akkaを使用したCQRSパターンの実装

            写API流程图

            Akkaを使用したCQRSパターンの実装

            投影图

            Akkaを使用したCQRSパターンの実装

            来源和评论

            源的全文是GitHub请参阅。

            领域

            case class Counter(number: Int = 0) {
            
              // カウントアップした値を持つカウンターを返却する。
              def countUp(number: Int): Counter =
                this.copy(number = this.number + number)
            }
            
            • 域将返回一个副本,因为我们希望以后能够将状态保存并更新为参与者中的状态。

            聚合协议

            // Serialize用に定義
            trait CborSerializable
            
            object CounterAggregateProtocol {
            
              sealed trait CounterCommand extends CborSerializable
            
              // ActorRef[_]::ask用のリプライ先のActorRefをコマンドに格納する。
              final case class CountUp(id: String, n: Int)(val replyTo: ActorRef[CountUpReply]) extends CounterCommand
            
              sealed trait CountUpReply extends CborSerializable
            
              final case class CountUpSucceededReply(counter: Counter) extends CountUpReply
            
              final case class CountUpFailedReply(e: Exception) extends CountUpReply
            }
            
            • 定义聚合发送和接收的消息。
            • CborSerializable关于官方文件请参阅

            总计的

            object CounterAggregate {
            
              // ①
              private def commandHandler(ctx: ActorContext[CounterAggregateProtocol.CounterCommand]):
              (Counter, CounterAggregateProtocol.CounterCommand) => Effect[CounterEvent, Counter] =
                (_, command) =>
                  command match {
                    case cmd@CounterAggregateProtocol.CountUp(id, n) =>
                      Effect.persist(CounterEvent.CountUpped(id, n)).thenReply(cmd.replyTo) { state =>
                        CounterAggregateProtocol.CountUpSucceededReply(state)
                      }
                  }
            
              // ②
              private def eventHandler: (Counter, CounterEvent) => Counter =
                (state, event) =>
                  event match {
                    case CounterEvent.CountUpped(_, n) => state.countUp(n)
                  }
            
              def apply(persistenceId: PersistenceId): Behavior[CounterAggregateProtocol.CounterCommand] =
                Behaviors.setup { ctx =>
                  EventSourcedBehavior(
                    // ③
                    persistenceId = persistenceId,
                    // ④
                    emptyState = Counter(),
                    // コマンドハンドラとイベントハンドラを定義する。
                    commandHandler = this.commandHandler(ctx),
                    eventHandler = this.eventHandler)
                    // ⑤
                    .withTagger(_ => Set(CounterTags.Single))
                    // ⑥
                    .withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 10, keepNSnapshots = 3))
                    // ⑦
                    .onPersistFailure(SupervisorStrategy.restartWithBackoff(200.millis, 5.seconds, 0.1))
                }
            }
            
            • (1) 定义接收命令时的处理。使用 Effect::persist 持久化事件。
            • ②定义接收事件时的流程。
            • ③ persistenceId... 持久性的主键。引用时,根据这个值获取actor。
            • ④ 在 actor 开始时状态值。定义一个空状态。
            • ⑤标签的定义。不要在生产中仅使用单个标签来实现。关于标签这里。
            • ⑥ 定义创建快照的频率。
            • 主管策略...定义故障行为。

            投影处理器

            * 投影可以在WriteApi中处理,但是这次是在一个单独的项目中剪下来的。

            final class CounterProjectionHandler(system: ActorSystem[_])
              extends Handler[EventEnvelope[CounterEvent]] {
            
              private implicit val ec: ExecutionContext = system.executionContext
            
              override def process(envelope: EventEnvelope[CounterEvent]): Future[Done] = {
                envelope.event match {
                  case CounterEvent.CountUpped(id, number) =>
                    val row = CounterRow(id, number)
                    // 読込用DBにイベントを反映させている。
                    CounterRepository.insert(row).map(_ => Done)
                }
              }
            }
            
            
            • 定义事件接收通知的过程。
            • 主要将数据存储在读取的DB中,并将数据传递给Kafka等。

            投影

            * 投影可以在WriteApi中处理,但这次是在单独的项目中剪掉。

            object CounterProjection extends App {
            
              def apply(): Behavior[String] = Behaviors.setup { context =>
            
                implicit val system: ActorSystem[_] = context.system
                implicit val ec: ExecutionContextExecutor = system.executionContext
            
                val sourceProvider =
                  // ①
                  EventSourcedProvider
                    .eventsByTag[CounterEvent](
                      system,
                      readJournalPluginId = CassandraReadJournal.Identifier,
                      tag = CounterTags.Single)
            
                val projection =
                  // ②
                  CassandraProjection
                    .atLeastOnce(
                      projectionId = ProjectionId("counters", CounterTags.Single),
                      sourceProvider,
                      handler = () => new CounterProjectionHandler(system))
                    .withSaveOffset(afterEnvelopes = 1, afterDuration = 500.millis)
            
                // ③
                ClusterSingleton(system).init(
                  SingletonActor(
                    ProjectionBehavior(projection),
                    projection.projectionId.id)
                    .withStopMessage(ProjectionBehavior.Stop))
            
                Behaviors.empty
              }
            
              ActorSystem(CounterProjection(), "akkaCqrsCounter_projection")
              StdIn.readLine()
            }
            
            • ①根据标签和ReadJounalPlugin获取Source。
            • ② 设置投影。
            • ③ 这次我们采用了ClusterSingleton。

            概括

            最后写的有点仓促,不过我想我会不时更新文章的。 (这花了大约6个小时,所以请原谅我。)
            到目前为止我所学到的印象是创建这篇文章的背景,但正如预期的那样,关于Akka这个领域的文章很少,我作为一个历史不长的工程师很难学习。
            非常感谢所有给我建议的人。
            我希望这篇文章能引导你回到原来的自己。
            非常感谢。

            Github 推特


原创声明:本文系作者授权爱码网发表,未经许可,不得转载;

原文地址:https://www.likecs.com/show-308626753.html

相关文章:

  • 2022-12-23
  • 2022-12-23
  • 2022-12-23
  • 2022-12-23
  • 2021-07-11
  • 2021-11-11
  • 2022-12-23
  • 2021-04-15
猜你喜欢
  • 2022-12-23
  • 2022-12-23
  • 2021-09-27
  • 2021-11-23
  • 2021-05-28
  • 2022-12-23
  • 2022-01-28
相关资源
相似解决方案