【问题标题】:Serializing message with protobuf for akka actor which contains serializable data使用包含可序列化数据的 akka actor 的 protobuf 序列化消息
【发布时间】:2020-05-28 07:21:28
【问题描述】:

我有一个持久的actor,它可以接收一种类型的命令Persist(event),其中事件类型是trait Event(它有很多实现)。如果成功,这会以Persisted(event) 回复发件人。

事件本身是可序列化的,因为这是我们存储在持久性存储中的数据,并且序列化是通过自定义序列化程序实现的,该序列化程序在内部使用从 google protobuf .proto 文件生成的类。这个自定义序列化器在application.conf 中配置并绑定到基本特征Event。这已经很好了。

注意:Event 的实现不是 protobuf 生成的类。它们是普通的 scala 类,它们也有一个等价的 protobuf,但它是通过绑定到基本 Event 类型的自定义序列化程序映射的。这是我的前辈为版本控制完成的(这可能不是必需的,因为这也可以使用普通的 protobuf 类 + 自定义序列化组合来处理,但这是另一回事),我不想更改那个 atm。

我们现在正在尝试为这个参与者实现集群分片,这也意味着我的命令(即PersistPersisted)也需要可序列化,因为它们可能会被转发到其他节点。

这是领域模型:

sealed trait PersistenceCommand {
  def event: Event
}

final case class Persisted(event: Event) extends PersistenceCommand
final case class Persist(event: Event) extends PersistenceCommand

问题是,我没有看到使它可序列化的优雅方法。以下是我考虑过的选项

方法 1.PersistPersisted 定义一个新的 proto 文件,但我应该使用什么作为 event 的数据类型?我没有找到一种方法来定义这样的东西:

  message Persist {
   "com.example.Event" event = 1 // this doesn't work
   }

这样我就可以使用现有的 Scala trait Event 作为数据类型。如果这可行,我想(虽然它很牵强)我可以将生成的代码(在编译这个 proto 文件之后)绑定到 akka 的谷歌 protobuf 的内置序列化程序,它可能会工作。上面的注释解释了为什么我不能在我的 proto 文件中使用 oneof 构造。

方法 2。这是我已经实现的方法并且它有效(但我不喜欢它)

基本上,我为命令编写了一个新的序列化程序,并将event 部分命令的序列化和反序列化委托给现有的序列化程序。

class PersistenceCommandSerializer extends SerializerWithStringManifest {
  val eventSerializer: ManifestAwareEventSerializer = new ManifestAwareEventSerializer()

  val PersistManifest   = Persist.getClass.getName
  val PersistedManifest = Persisted.getClass.getName
  val Separator         = "~"

  override def identifier: Int = 808653986

  override def manifest(o: AnyRef): String = o match {
    case Persist(event)   => s"$PersistManifest$Separator${eventSerializer.manifest(event)}"
    case Persisted(event) => s"$PersistedManifest$Separator${eventSerializer.manifest(event)}"
  }

  override def toBinary(o: AnyRef): Array[Byte] = o match {
    case command: PersistenceCommand => eventSerializer.toBinary(command.event)
  }

  override def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = {
    val (commandManifest, dataManifest) = splitIntoCommandAndDataManifests(manifest)
    val event                           = eventSerializer.fromBinary(bytes, dataManifest).asInstanceOf[Event]
    commandManifest match {
      case PersistManifest =>
        Persist(event)
      case PersistedManifest =>
        Persisted(event)
    }
  }

  private def splitIntoCommandAndDataManifests(manifest: String) = {
    val commandAndDataManifests = manifest.split(Separator)
    (commandAndDataManifests(0), commandAndDataManifests(1))
  }
}

这种方法的问题是我在def manifestdef fromBinary 中所做的事情。在序列化和反序列化时,我必须确保我拥有命令的清单以及事件的清单。因此,我不得不使用~ 作为分隔符——类似于我的清单信息的自定义序列化技术。

是否有更好的或正确的方法来实现这一点?

对于上下文:我正在使用 ScalaPB 从 .proto 文件和经典的 akka 演员生成 scala 类。

非常感谢任何形式的指导!

【问题讨论】:

  • 关于Approach 1,你试过protobuf的oneof,例如protobuf message Persists { oneof event { EventTypeX = 1; EventTypeY = 2; ... } }
  • 嗨@SavaVranešević 我不能这样做,因为事件类型的实现不是protobuf生成的类。它们是普通的 scala 类,它们也有一个等价的 protobuf,但它是通过绑定到基本 Event 类型的自定义序列化程序映射的。这是我的前辈为版本控制完成的(这可能不是必需的,因为这也可以使用普通的 protobuf 类 + 自定义序列化组合来处理,但那是另一回事),我不想改变那个 atm。

标签: scala akka akka-persistence protocol-buffers scalapb


【解决方案1】:

如果您将嵌套对象的序列化委托给您已配置的任何序列化程序,则嵌套字段应该有bytes 用于序列化数据,还应有一个int32 带有使用的序列化程序的ID 和bytes 用于消息清单.这可确保您能够版本/替换嵌套的序列化程序,这对于将要存储更长时间的数据很重要。

您可以在此处查看 Akka 内部如何为我们自己的有线格式完成此操作:https://github.com/akka/akka/blob/6bf20f4117a8c27f8bd412228424caafe76a89eb/akka-remote/src/main/protobuf/WireFormats.proto#L48 和此处https://github.com/akka/akka/blob/6bf20f4117a8c27f8bd412228424caafe76a89eb/akka-remote/src/main/scala/akka/remote/MessageSerializer.scala#L45

【讨论】:

    猜你喜欢
    • 2019-08-15
    • 2015-02-13
    • 1970-01-01
    • 2015-04-16
    • 2016-07-19
    • 2011-09-06
    • 2017-02-25
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多