【Question title】: Could not send Protocol Buffer message from Akka Cluster Client
【Posted】: 2016-08-11 05:01:07
【Question description】:

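I am trying to use Play Framework (Scala) as an Akka cluster client to send messages to another Akka cluster that runs my application services.

Here is what I did:

  1. I defined the messages with Protocol Buffers in a separate module that is shared (as a git submodule) between the project running the service and the Play application: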

    syntax = "proto2";
    option java_package = "com.myproject.api.common.messages";
    option java_outer_classname = "IsValidClientMessage";
    
    message IsValidClient {
     required int32 clientId = 1;
     required string clientSecret = 2;
    }
    
  2. I started the service on port 2560 with this configuration:

    akka {
    
      remote.netty.tcp.port=${?AKKA_REMOTE_PORT}
      remote.netty.tcp.hostname=127.0.0.1
    
      cluster {
    
        seed-nodes = [
          "akka.tcp://ApiServiceActorSystem@127.0.0.1:2560"
        ]
    
        auto-down-unreachable-after = 10s
      }
    
      extensions = ["akka.cluster.client.ClusterClientReceptionist"]
    
      loglevel = DEBUG
    
      actor {
    
        serializer {
          proto = "akka.remote.serialization.ProtobufSerializer"
        }
    
       serialization-bindings {
          "com.myproject.api.common.messages.IsValidClientMessage$IsValidClient" = proto
        }
    
        serialize-messages = on
    
        provider = "akka.cluster.ClusterActorRefProvider"
    
        debug {
          receive = on
        }
      }
    }
    
  3. I ran the Play app with the following Akka configuration:

    akka {
    
      remote.netty.tcp.port=2552
      remote.netty.tcp.hostname=127.0.0.1
      remote.enabled-transports = ["akka.remote.netty.tcp"]
    
      cluster {
    
        seed-nodes = [
          "akka.tcp://Api@127.0.0.1:2552"
        ]
    
       auto-down-unreachable-after = 10s
      }
    
      extensions = ["akka.cluster.client.ClusterClientReceptionist"]
    
      loglevel = DEBUG
    
       actor {
    
         serializer {
           proto = "akka.remote.serialization.ProtobufSerializer"
         }
    
        serialization-bindings {
        "com.myproject.api.common.messages.IsValidClientMessage$IsValidClient" = proto
        }
    
        serialize-messages = on
    
        provider = "akka.cluster.ClusterActorRefProvider"
    
        debug {
          receive = on
        }
      }
    }
    

This is the code I have been using to try to send a message to ApiServiceActorSystem:

package com.myproject.api.akka.actors.socket

import ...

class ClientActor extends Actor with ActorLogging {

  ClusterClientReceptionist(context.system).registerService(self)

  val outActors: ArrayBuffer[ActorRef] = ArrayBuffer.empty
  val apiServiceClient = context.system.actorOf(ClusterClient.props(
    ClusterClientSettings(context.system).withInitialContacts(Set(ActorPath.fromString("akka.tcp://ApiServiceActorSystem@127.0.0.1:2560/system/receptionist")))
  ))

  override def receive = {
    case WatchOutActor(a) =>
      context.watch(a)
      outActors += a
    case Terminated(a) =>
      context.unwatch(a)
      outActors.remove(outActors.indexOf(a))
    case other =>

      implicit val to: Timeout = 2 seconds

      val isValidClient = IsValidClient.newBuilder() // Protocol Buffer Message

      isValidClient.setClientId(1000)
      isValidClient.setClientSecret("notsosecret")

      (apiServiceClient ? ClusterClient.Send("/user/clientActor", isValidClient.build(), false)).mapTo[Future[Either[ServiceError, Boolean]]] map { f =>
        f map {
          case Left(e) =>
            outActors foreach { a => a ! e.msg }
          case Right(bool) =>
            outActors foreach { a => a ! bool.toString }
        }
      } recover {
        case e: Exception => println(s"-=> Exception ${e.getMessage}")
      }
  }
}

object ClientActor {

  case class WatchOutActor(actorRef: ActorRef)
}

From the log below I can see that my API has connected to the cluster running the service:

[DEBUG] [08/11/2016 10:11:05.936] [ApiServiceActorSystem-akka.remote.default-remote-dispatcher-24] [akka.remote.Remoting] Associated [akka.tcp://ApiServiceActorSystem@127.0.0.1:2560] <- [akka.tcp://Api@127.0.0.1:2552]
[DEBUG] [08/11/2016 10:11:05.998] [ApiServiceActorSystem-akka.remote.default-remote-dispatcher-24] [akka.serialization.Serialization(akka://ApiServiceActorSystem)] Using serializer[akka.cluster.client.protobuf.ClusterClientMessageSerializer] for message [akka.cluster.client.ClusterReceptionist$Internal$GetContacts$]
[DEBUG] [08/11/2016 10:11:06.000] [ApiServiceActorSystem-akka.actor.default-dispatcher-4] [akka.tcp://ApiServiceActorSystem@127.0.0.1:2560/system/receptionist] Client [akka.tcp://Api@127.0.0.1:2552/user/$a] gets contactPoints [akka.tcp://ApiServiceActorSystem@127.0.0.1:2560/system/receptionist] (all nodes)
[DEBUG] [08/11/2016 10:11:06.002] [ApiServiceActorSystem-akka.actor.default-dispatcher-4] [akka.tcp://ApiServiceActorSystem@127.0.0.1:2560/system/receptionist] Client [akka.tcp://Api@127.0.0.1:2552/user/$a] gets contactPoints [akka.tcp://ApiServiceActorSystem@127.0.0.1:2560/system/receptionist] (all nodes)
[DEBUG] [08/11/2016 10:11:06.002] [ApiServiceActorSystem-akka.actor.default-dispatcher-4] [akka.tcp://ApiServiceActorSystem@127.0.0.1:2560/system/receptionist] Client [akka.tcp://Api@127.0.0.1:2552/user/$a] gets contactPoints [akka.tcp://ApiServiceActorSystem@127.0.0.1:2560/system/receptionist] (all nodes)
[DEBUG] [08/11/2016 10:11:06.002] [ApiServiceActorSystem-akka.actor.default-dispatcher-4] [akka.tcp://ApiServiceActorSystem@127.0.0.1:2560/system/receptionist] Client [akka.tcp://Api@127.0.0.1:2552/user/$a] gets contactPoints [akka.tcp://ApiServiceActorSystem@127.0.0.1:2560/system/receptionist] (all nodes)
[DEBUG] [08/11/2016 10:11:06.004] [ApiServiceActorSystem-akka.remote.default-remote-dispatcher-76] [akka.serialization.Serialization(akka://ApiServiceActorSystem)] Using serializer[akka.cluster.client.protobuf.ClusterClientMessageSerializer] for message [akka.cluster.client.ClusterReceptionist$Internal$Contacts]
[DEBUG] [08/11/2016 10:11:06.033] [ApiServiceActorSystem-akka.remote.default-remote-dispatcher-76] [akka.serialization.Serialization(akka://ApiServiceActorSystem)] Using serializer[akka.serialization.JavaSerializer] for message [akka.actor.Identify]
[DEBUG] [08/11/2016 10:11:06.037] [ApiServiceActorSystem-akka.remote.default-remote-dispatcher-24] [akka.serialization.Serialization(akka://ApiServiceActorSystem)] Using serializer[akka.serialization.JavaSerializer] for message [akka.actor.ActorIdentity]
[DEBUG] [08/11/2016 10:11:06.126] [ApiServiceActorSystem-akka.remote.default-remote-dispatcher-76] [akka.serialization.Serialization(akka://ApiServiceActorSystem)] Using serializer[akka.serialization.JavaSerializer] for message [akka.remote.EndpointWriter$AckIdleCheckTimer$]
[DEBUG] [08/11/2016 10:11:07.749] [ApiServiceActorSystem-akka.remote.default-remote-dispatcher-76] [akka.serialization.Serialization(akka://ApiServiceActorSystem)] Using serializer[akka.cluster.client.protobuf.ClusterClientMessageSerializer] for message [akka.cluster.client.ClusterReceptionist$Internal$Heartbeat$]

But whenever I try to send a message, I get this error:

java.lang.RuntimeException: Unable to find proto buffer class: com.myproject.api.common.messages.IsValidClientMessage$IsValidClient
  at com.google.protobuf.GeneratedMessageLite$SerializedForm.readResolve(GeneratedMessageLite.java:1192)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:497)
  at java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1104)
  at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1810)
  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
  at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
  at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
  at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
  at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
  at akka.serialization.JavaSerializer$$anonfun$1.apply(Serializer.scala:241)
  at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
  at akka.serialization.JavaSerializer.fromBinary(Serializer.scala:241)
  at akka.serialization.Serialization$$anonfun$deserialize$3.apply(Serialization.scala:142)
  at scala.util.Try$.apply(Try.scala:192)
  at akka.serialization.Serialization.deserialize(Serialization.scala:142)
  at akka.actor.dungeon.Dispatch$class.sendMessage(Dispatch.scala:128)
  at akka.actor.ActorCell.sendMessage(ActorCell.scala:374)
  at akka.actor.Cell$class.sendMessage(ActorCell.scala:295)
  at akka.actor.ActorCell.sendMessage(ActorCell.scala:374)
  at akka.actor.RepointableActorRef.$bang(RepointableActorRef.scala:169)
  at akka.actor.ActorRef.tell(ActorRef.scala:128)
  at akka.pattern.AskableActorRef$.internalAsk$extension(AskSupport.scala:295)
  at akka.pattern.AskableActorRef$.$qmark$extension1(AskSupport.scala:281)
  at com.myproject.api.akka.actors.socket.ClientActor$$anonfun$receive$1.applyOrElse(ClientActor.scala:43)
  at akka.actor.Actor$class.aroundReceive(Actor.scala:480)
  at com.myproject.api.akka.actors.socket.ClientActor.aroundReceive(ClientActor.scala:16)
  at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
  at akka.actor.ActorCell.invoke(ActorCell.scala:495)
  at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
  at akka.dispatch.Mailbox.run(Mailbox.scala:224)
  at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
  at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
  at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
  at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
  at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.ClassNotFoundException: com.myproject.api.common.messages.IsValidClientMessage$IsValidClient
  at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
  at java.lang.Class.forName0(Native Method)
  at java.lang.Class.forName(Class.java:264)
  at com.google.protobuf.GeneratedMessageLite$SerializedForm.readResolve(GeneratedMessageLite.java:1183)
  ... 38 common frames omitted

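How should I serialize my message? Why does the ClassNotFoundException occur at runtime? Any help would be appreciated.

【Comments】:

  • Your code may have one of two problems: 1. the compiled protobuf classes are not present on both the client and the server, or 2. because Java/Scala interoperability is tricky, the protobuf object that defines the message protocol is not resolved correctly where you wrap it in a case class.
  • Which protobuf-java library version are you using?
  • @frank_neff It's 2.6.1.

Tags: scala akka playframework-2.5 akka-cluster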


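【Answer 1】:

There appears to be a class-loading problem between akka-remote and classes generated by protobuf 3.x (even if you use a proto2 definition). You can work around it by using the protobuf 2.5 compiler instead of 3.x: https://github.com/google/protobuf/releases/tag/v2.5.0

You have to compile it yourself because there are no precompiled binaries. Go to the downloaded directory and run the following commands: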

./configure
make
make install

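After building the v2.5 protoc binary, use it to regenerate your Java classes and use those instead. I will open an issue to get proto 3 support in Akka.

Clarification: the problem is not the protobuf-java library version but the classes generated by the protoc 3.0 binary.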
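
To see which protoc version produced the classes on your classpath, you can print the superclass chain of the generated message class. This is only a minimal sketch: it assumes the generated `IsValidClient` class from the question is on the classpath, and the object name `GeneratedClassCheck` is made up for illustration. As far as I know, classes generated by protoc 3.0.0 extend `com.google.protobuf.GeneratedMessageV3`, while protoc 2.x output extends `com.google.protobuf.GeneratedMessage`.

```scala
// Generated class from the question's .proto file (assumed to be on the classpath).
import com.myproject.api.common.messages.IsValidClientMessage.IsValidClient

// Hypothetical helper object, not part of the original project.
object GeneratedClassCheck extends App {
  // Walk the superclass chain of the generated message class, collecting each class name.
  val lineage: List[String] = Iterator
    .iterate[Class[_]](classOf[IsValidClient])(_.getSuperclass)
    .takeWhile(_ != null)
    .map(_.getName)
    .toList

  // A GeneratedMessageV3 entry suggests protoc 3.x output;
  // a plain GeneratedMessage entry suggests protoc 2.x output.
  lineage.foreach(println)
}
```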

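【Comments】:

  • Akka 2.4.x internally uses a shaded version of Protobuf 2.5, which should not interfere with your own Protobuf 3.x dependency. Did you add a 3.x runtime dependency to your project?
  • I have a runtime dependency on 3.0.0. The problem is not the Java library but the classes generated by the protoc compiler: v2 works, v3 throws ClassNotFoundException.
  • I have the same problem.
【Answer 2】:

You need to explicitly bind the Lite and V3 classes to your proto serializer: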

akka {
  loglevel = "INFO"

  actor {
    provider = "akka.remote.RemoteActorRefProvider"

    serializers {
      proto = "akka.remote.serialization.ProtobufSerializer" # or use your own..
    }

    serialization-bindings {
      "com.google.protobuf.Message" = proto
      "com.google.protobuf.GeneratedMessageLite" = proto
      "com.google.protobuf.GeneratedMessageV3" = proto
    }
  }
}
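
To check that the bindings take effect, you can ask Akka which serializer it would pick for the message, much like the "Using serializer[...]" lines in the question's debug log. The following is a minimal sketch, not a definitive test: it assumes akka-actor, akka-remote and the generated `IsValidClient` class are on the classpath, and the object and system name `SerializerCheck` is made up for illustration.

```scala
import akka.actor.ActorSystem
import akka.serialization.SerializationExtension
import com.typesafe.config.ConfigFactory
// Generated class from the question's .proto file (assumed to be on the classpath).
import com.myproject.api.common.messages.IsValidClientMessage.IsValidClient

// Hypothetical helper object, not part of the original project.
object SerializerCheck extends App {
  // Serializer and bindings copied from the configuration above, inlined for a quick check.
  val config = ConfigFactory.parseString(
    """
      |akka.actor {
      |  serializers {
      |    proto = "akka.remote.serialization.ProtobufSerializer"
      |  }
      |  serialization-bindings {
      |    "com.google.protobuf.Message" = proto
      |    "com.google.protobuf.GeneratedMessageLite" = proto
      |    "com.google.protobuf.GeneratedMessageV3" = proto
      |  }
      |}
    """.stripMargin).withFallback(ConfigFactory.load())

  val system = ActorSystem("SerializerCheck", config)
  try {
    // Same message as in the question's ClientActor.
    val msg = IsValidClient.newBuilder()
      .setClientId(1000)
      .setClientSecret("notsosecret")
      .build()

    // Ask the serialization extension which serializer is bound to this message's class.
    val serializer = SerializationExtension(system).findSerializerFor(msg)
    println(s"${msg.getClass.getName} -> ${serializer.getClass.getName}")
  } finally {
    system.terminate()
  }
}
```

If the printed serializer is `akka.serialization.JavaSerializer` rather than `akka.remote.serialization.ProtobufSerializer`, the binding is probably not being picked up for that class.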
