【发布时间】:2020-05-28 07:21:28
【问题描述】:
我有一个持久的actor,它可以接收一种类型的命令Persist(event),其中事件类型是trait Event(它有很多实现)。如果成功,这会以Persisted(event) 回复发件人。
事件本身是可序列化的,因为这是我们存储在持久性存储中的数据,并且序列化是通过自定义序列化程序实现的,该序列化程序在内部使用从 google protobuf .proto 文件生成的类。这个自定义序列化器在application.conf 中配置并绑定到基本特征Event。这已经很好了。
注意:Event 的实现不是 protobuf 生成的类。它们是普通的 scala 类,它们也有一个等价的 protobuf,但它是通过绑定到基本 Event 类型的自定义序列化程序映射的。这是我的前辈为版本控制完成的(这可能不是必需的,因为这也可以使用普通的 protobuf 类 + 自定义序列化组合来处理,但这是另一回事),我不想更改那个 atm。
我们现在正在尝试为这个参与者实现集群分片,这也意味着我的命令(即Persist 和Persisted)也需要可序列化,因为它们可能会被转发到其他节点。
这是领域模型:
sealed trait PersistenceCommand {
def event: Event
}
final case class Persisted(event: Event) extends PersistenceCommand
final case class Persist(event: Event) extends PersistenceCommand
问题是,我没有看到使它可序列化的优雅方法。以下是我考虑过的选项
方法 1. 为 Persist 和 Persisted 定义一个新的 proto 文件,但我应该使用什么作为 event 的数据类型?我没有找到一种方法来定义这样的东西:
message Persist {
"com.example.Event" event = 1 // this doesn't work
}
这样我就可以使用现有的 Scala trait Event 作为数据类型。如果这可行,我想(虽然它很牵强)我可以将生成的代码(在编译这个 proto 文件之后)绑定到 akka 的谷歌 protobuf 的内置序列化程序,它可能会工作。上面的注释解释了为什么我不能在我的 proto 文件中使用 oneof 构造。
方法 2。这是我已经实现的方法并且它有效(但我不喜欢它)
基本上,我为命令编写了一个新的序列化程序,并将event 部分命令的序列化和反序列化委托给现有的序列化程序。
class PersistenceCommandSerializer extends SerializerWithStringManifest {
val eventSerializer: ManifestAwareEventSerializer = new ManifestAwareEventSerializer()
val PersistManifest = Persist.getClass.getName
val PersistedManifest = Persisted.getClass.getName
val Separator = "~"
override def identifier: Int = 808653986
override def manifest(o: AnyRef): String = o match {
case Persist(event) => s"$PersistManifest$Separator${eventSerializer.manifest(event)}"
case Persisted(event) => s"$PersistedManifest$Separator${eventSerializer.manifest(event)}"
}
override def toBinary(o: AnyRef): Array[Byte] = o match {
case command: PersistenceCommand => eventSerializer.toBinary(command.event)
}
override def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = {
val (commandManifest, dataManifest) = splitIntoCommandAndDataManifests(manifest)
val event = eventSerializer.fromBinary(bytes, dataManifest).asInstanceOf[Event]
commandManifest match {
case PersistManifest =>
Persist(event)
case PersistedManifest =>
Persisted(event)
}
}
private def splitIntoCommandAndDataManifests(manifest: String) = {
val commandAndDataManifests = manifest.split(Separator)
(commandAndDataManifests(0), commandAndDataManifests(1))
}
}
这种方法的问题是我在def manifest 和def fromBinary 中所做的事情。在序列化和反序列化时,我必须确保我拥有命令的清单以及事件的清单。因此,我不得不使用~ 作为分隔符——类似于我的清单信息的自定义序列化技术。
是否有更好的或正确的方法来实现这一点?
对于上下文:我正在使用 ScalaPB 从 .proto 文件和经典的 akka 演员生成 scala 类。
非常感谢任何形式的指导!
【问题讨论】:
-
关于
Approach 1,你试过protobuf的oneof,例如protobuf message Persists { oneof event { EventTypeX = 1; EventTypeY = 2; ... } } -
嗨@SavaVranešević 我不能这样做,因为事件类型的实现不是protobuf生成的类。它们是普通的 scala 类,它们也有一个等价的 protobuf,但它是通过绑定到基本
Event类型的自定义序列化程序映射的。这是我的前辈为版本控制完成的(这可能不是必需的,因为这也可以使用普通的 protobuf 类 + 自定义序列化组合来处理,但那是另一回事),我不想改变那个 atm。
标签: scala akka akka-persistence protocol-buffers scalapb