【发布时间】:2016-08-11 05:01:07
【问题描述】:
我正在尝试使用 Play Framework (Scala) 作为 Akka 集群客户端,将消息发送到另一个运行我的应用服务的 Akka 集群。
这是我所做的:
-
我使用 Protocol Buffer 在不同的模块中定义了消息,并在运行服务的项目和 Play 应用程序之间共享(使用 git 子模块)
syntax = "proto2"; option java_package = "com.myproject.api.common.messages"; option java_outer_classname = "IsValidClientMessage"; message IsValidClient { required int32 clientId = 1; required string clientSecret = 2; } -
在 2560 端口启动服务
akka { remote.netty.tcp.port=${?AKKA_REMOTE_PORT} remote.netty.tcp.hostname=127.0.0.1 cluster { seed-nodes = [ "akka.tcp://ApiServiceActorSystem@127.0.0.1:2560" ] auto-down-unreachable-after = 10s } extensions = ["akka.cluster.client.ClusterClientReceptionist"] loglevel = DEBUG actor { serializer { proto = "akka.remote.serialization.ProtobufSerializer" } serialization-bindings { "com.myproject.api.common.messages.IsValidClientMessage$IsValidClient" = proto } serialize-messages = on provider = "akka.cluster.ClusterActorRefProvider" debug { receive = on } } } -
并使用以下 Akka 配置运行 Play App:
akka { remote.netty.tcp.port=2552 remote.netty.tcp.hostname=127.0.0.1 remote.enabled-transports = ["akka.remote.netty.tcp"] cluster { seed-nodes = [ "akka.tcp://Api@127.0.0.1:2552" ] auto-down-unreachable-after = 10s } extensions = ["akka.cluster.client.ClusterClientReceptionist"] loglevel = DEBUG actor { serializer { proto = "akka.remote.serialization.ProtobufSerializer" } serialization-bindings { "com.myproject.api.common.messages.IsValidClientMessage$IsValidClient" = proto } serialize-messages = on provider = "akka.cluster.ClusterActorRefProvider" debug { receive = on } } }
这是我一直试图向ApiServiceSystem发送消息的代码:
package com.myproject.api.akka.actors.socket
import ...
class ClientActor extends Actor with ActorLogging {
ClusterClientReceptionist(context.system).registerService(self)
val outActors: ArrayBuffer[ActorRef] = ArrayBuffer.empty
val apiServiceClient = context.system.actorOf(ClusterClient.props(
ClusterClientSettings(context.system).withInitialContacts(Set(ActorPath.fromString("akka.tcp://ApiServiceActorSystem@127.0.0.1:2560/system/receptionist")))
))
override def receive = {
case WatchOutActor(a) =>
context.watch(a)
outActors += a
case Terminated(a) =>
context.unwatch(a)
outActors.remove(outActors.indexOf(a))
case other =>
implicit val to: Timeout = 2 seconds
val isValidClient = IsValidClient.newBuilder() // Protocol Buffer Message
isValidClient.setClientId(1000)
isValidClient.setClientSecret("notsosecret")
(apiServiceClient ? ClusterClient.Send("/user/clientActor", isValidClient.build(), false)).mapTo[Future[Either[ServiceError, Boolean]]] map { f =>
f map {
case Left(e) =>
outActors foreach { a => a ! e.msg }
case Right(bool) =>
outActors foreach { a => a ! bool.toString }
}
} recover {
case e: Exception => println(s"-=> Exception ${e.getMessage}")
}
}
}
object ClientActor {
case class WatchOutActor(actorRef: ActorRef)
}
我从下面的日志中看到,我的 api 已连接到集群运行服务:
[DEBUG] [08/11/2016 10:11:05.936] [ApiServiceActorSystem-akka.remote.default-remote-dispatcher-24] [akka.remote.Remoting] Associated [akka.tcp://ApiServiceActorSystem@127.0.0.1:2560] <- [akka.tcp://Api@127.0.0.1:2552]
[DEBUG] [08/11/2016 10:11:05.998] [ApiServiceActorSystem-akka.remote.default-remote-dispatcher-24] [akka.serialization.Serialization(akka://ApiServiceActorSystem)] Using serializer[akka.cluster.client.protobuf.ClusterClientMessageSerializer] for message [akka.cluster.client.ClusterReceptionist$Internal$GetContacts$]
[DEBUG] [08/11/2016 10:11:06.000] [ApiServiceActorSystem-akka.actor.default-dispatcher-4] [akka.tcp://ApiServiceActorSystem@127.0.0.1:2560/system/receptionist] Client [akka.tcp://Api@127.0.0.1:2552/user/$a] gets contactPoints [akka.tcp://ApiServiceActorSystem@127.0.0.1:2560/system/receptionist] (all nodes)
[DEBUG] [08/11/2016 10:11:06.002] [ApiServiceActorSystem-akka.actor.default-dispatcher-4] [akka.tcp://ApiServiceActorSystem@127.0.0.1:2560/system/receptionist] Client [akka.tcp://Api@127.0.0.1:2552/user/$a] gets contactPoints [akka.tcp://ApiServiceActorSystem@127.0.0.1:2560/system/receptionist] (all nodes)
[DEBUG] [08/11/2016 10:11:06.002] [ApiServiceActorSystem-akka.actor.default-dispatcher-4] [akka.tcp://ApiServiceActorSystem@127.0.0.1:2560/system/receptionist] Client [akka.tcp://Api@127.0.0.1:2552/user/$a] gets contactPoints [akka.tcp://ApiServiceActorSystem@127.0.0.1:2560/system/receptionist] (all nodes)
[DEBUG] [08/11/2016 10:11:06.002] [ApiServiceActorSystem-akka.actor.default-dispatcher-4] [akka.tcp://ApiServiceActorSystem@127.0.0.1:2560/system/receptionist] Client [akka.tcp://Api@127.0.0.1:2552/user/$a] gets contactPoints [akka.tcp://ApiServiceActorSystem@127.0.0.1:2560/system/receptionist] (all nodes)
[DEBUG] [08/11/2016 10:11:06.004] [ApiServiceActorSystem-akka.remote.default-remote-dispatcher-76] [akka.serialization.Serialization(akka://ApiServiceActorSystem)] Using serializer[akka.cluster.client.protobuf.ClusterClientMessageSerializer] for message [akka.cluster.client.ClusterReceptionist$Internal$Contacts]
[DEBUG] [08/11/2016 10:11:06.033] [ApiServiceActorSystem-akka.remote.default-remote-dispatcher-76] [akka.serialization.Serialization(akka://ApiServiceActorSystem)] Using serializer[akka.serialization.JavaSerializer] for message [akka.actor.Identify]
[DEBUG] [08/11/2016 10:11:06.037] [ApiServiceActorSystem-akka.remote.default-remote-dispatcher-24] [akka.serialization.Serialization(akka://ApiServiceActorSystem)] Using serializer[akka.serialization.JavaSerializer] for message [akka.actor.ActorIdentity]
[DEBUG] [08/11/2016 10:11:06.126] [ApiServiceActorSystem-akka.remote.default-remote-dispatcher-76] [akka.serialization.Serialization(akka://ApiServiceActorSystem)] Using serializer[akka.serialization.JavaSerializer] for message [akka.remote.EndpointWriter$AckIdleCheckTimer$]
[DEBUG] [08/11/2016 10:11:07.749] [ApiServiceActorSystem-akka.remote.default-remote-dispatcher-76] [akka.serialization.Serialization(akka://ApiServiceActorSystem)] Using serializer[akka.cluster.client.protobuf.ClusterClientMessageSerializer] for message [akka.cluster.client.ClusterReceptionist$Internal$Heartbeat$]
但是每当我尝试发送消息时,我都会收到此错误:
java.lang.RuntimeException: Unable to find proto buffer class: com.myproject.api.common.messages.IsValidClientMessage$IsValidClient
at com.google.protobuf.GeneratedMessageLite$SerializedForm.readResolve(GeneratedMessageLite.java:1192)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1104)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1810)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
at akka.serialization.JavaSerializer$$anonfun$1.apply(Serializer.scala:241)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at akka.serialization.JavaSerializer.fromBinary(Serializer.scala:241)
at akka.serialization.Serialization$$anonfun$deserialize$3.apply(Serialization.scala:142)
at scala.util.Try$.apply(Try.scala:192)
at akka.serialization.Serialization.deserialize(Serialization.scala:142)
at akka.actor.dungeon.Dispatch$class.sendMessage(Dispatch.scala:128)
at akka.actor.ActorCell.sendMessage(ActorCell.scala:374)
at akka.actor.Cell$class.sendMessage(ActorCell.scala:295)
at akka.actor.ActorCell.sendMessage(ActorCell.scala:374)
at akka.actor.RepointableActorRef.$bang(RepointableActorRef.scala:169)
at akka.actor.ActorRef.tell(ActorRef.scala:128)
at akka.pattern.AskableActorRef$.internalAsk$extension(AskSupport.scala:295)
at akka.pattern.AskableActorRef$.$qmark$extension1(AskSupport.scala:281)
at com.myproject.api.akka.actors.socket.ClientActor$$anonfun$receive$1.applyOrElse(ClientActor.scala:43)
at akka.actor.Actor$class.aroundReceive(Actor.scala:480)
at com.myproject.api.akka.actors.socket.ClientActor.aroundReceive(ClientActor.scala:16)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
at akka.actor.ActorCell.invoke(ActorCell.scala:495)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
at akka.dispatch.Mailbox.run(Mailbox.scala:224)
at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.ClassNotFoundException: com.myproject.api.common.messages.IsValidClientMessage$IsValidClient
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:264)
at com.google.protobuf.GeneratedMessageLite$SerializedForm.readResolve(GeneratedMessageLite.java:1183)
... 38 common frames omitted
如何序列化我的消息?为什么ClassNotFoundException 会在运行时出现?任何帮助将不胜感激
【问题讨论】:
-
您的代码中可能存在 2 个问题:1. 客户端和服务器中均不存在已编译的 protobuf,或者 2. 由于 java-scala 互操作性很棘手,您可以在其中包含案例类protobuf 未正确解析定义消息协议的对象。
-
你使用哪个 protobuf-java 库版本?
-
@frank_neff 它的 2.6.1
标签: scala akka playframework-2.5 akka-cluster