【问题标题】:Akka Cluster aware routers - share redis instance to all routeeAkka 集群感知路由器 - 将 redis 实例共享给所有路由
【发布时间】:2017-02-23 23:22:18
【问题描述】:

在 Akka 集群应用程序的上下文中,我遇到了关于 Akka 期望的一个属性的问题:每个 (cas) 类和使用的每个消息都必须是可序列化的。我有以下上下文:我想使用来自 redis 集群的数据,为此,我决定采用集群感知路由器池来添加节点以拥有更多工作人员。 Worker 从 redis 读取数据并将一些元数据存储在 mongodb 中。在第一个版本中,我这样做了:

object MasterWorkers {

  def props
  (  awsBucket : String,
     gapMinValueMicroSec : Long,
     persistentCache: RedisCache,
     mongoURI : String,
     mongoDBName : String,
     mongoCollectioName : String
  ) : Props =
    Props(MasterWorkers(awsBucket, gapMinValueMicroSec, persistentCache, mongoURI, mongoDBName, mongoCollectioName))

  case class JobRemove(deviceId: DeviceId, from : Timestamp, to : Timestamp)
}

case class MasterWorkers
(
  awsBucket : String,
  gapMinValueMicroSec : Long,
  persistentCache: RedisCache,
  mongoURI : String,
  mongoDBName : String,
  mongoCollectioName : String
) extends Actor with ActorLogging {

  val workerRouter =
    context.actorOf(FromConfig.props(Props(classOf[Worker],awsBucket,gapMinValueMicroSec, self, persistentCache, mongoURI, mongoDBName, mongoCollectioName)),
    name = "workerRouter")

工人阶级:

object Worker {

  def props
  (
    awsBucket : String,
    gapMinValueMicroSec : Long,
    replyTo : ActorRef,
    persistentCache: RedisCache,
    mongoURI : String,
    mongoDBName : String,
    mongoCollectioName : String
  ) : Props =
    Props(Worker(awsBucket, gapMinValueMicroSec, replyTo, persistentCache, mongoURI, mongoDBName, mongoCollectioName))

  case class JobDumpFailed(deviceId : DeviceId, from: Timestamp, to: Timestamp)
  case class JobDumpSuccess(deviceId : DeviceId, from: Timestamp, to: Timestamp)

  case class JobRemoveFailed(deviceId : DeviceId, from: Timestamp, to: Timestamp)
}

case class Worker
(
  awsBucket : String,
  gapMinValueMicroSec : Long,
  replyTo : ActorRef,
  persistentCache: RedisCache,
  mongoURI : String,
  mongoDBName : String,
  mongoCollectioName : String
) extends Actor with ActorLogging {

但是当我启动两个节点时,这会引发以下异常:

[info] akka.remote.MessageSerializer$SerializationException: Failed to serialize remote message [class akka.remote.DaemonMsgCreate] using serializer [class akka.remote.serialization.DaemonMsgCreateSerializer].
[info] at akka.remote.MessageSerializer$.serialize(MessageSerializer.scala:61)
[info] at akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:895)
[info] at akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:895)
[info] at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
[info] at akka.remote.EndpointWriter.serializeMessage(Endpoint.scala:894)
[info] at akka.remote.EndpointWriter.writeSend(Endpoint.scala:786)
[info] at akka.remote.EndpointWriter$$anonfun$4.applyOrElse(Endpoint.scala:761)
[info] at akka.actor.Actor$class.aroundReceive(Actor.scala:497)
[info] at akka.remote.EndpointActor.aroundReceive(Endpoint.scala:452)
[info] at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
[info] at akka.actor.ActorCell.invoke(ActorCell.scala:495)
[info] at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
[info] at akka.dispatch.Mailbox.run(Mailbox.scala:224)
[info] at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
[info] at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
[info] at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
[info] at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
[info] at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
[info] Caused by: java.io.NotSerializableException: akka.actor.ActorSystemImpl

redis 缓存是一个简单的案例类,带有一个实现如下接口的伴随对象:

object RedisCache { // some static functions }

case class RedisCache
(
  master : RedisServer,
  slaves : Seq[RedisServer]
)(implicit actorSystem : ActorSystem)
  extends PersistentCache[DeviceKey, BCPPackets] with LazyLogging {
// some code here
}

然后为了解决这个问题,我将 redisCache 移到了 worker 中,我没有将它交给主节点:

case class Worker
(
  awsBucket : String,
  gapMinValueMicroSec : Long,
  replyTo : ActorRef,
  mongoURI : String,
  mongoDBName : String,
  mongoCollectioName : String
) extends Actor with ActorLogging {

// redis cache here now 
val redisCache = ...

但是通过这样的设计,每个路由都会创建一个新的 redis 缓存实例,这不是预期的行为。我想要的是拥有我的 redis 缓存的一个实例,然后与我的所有路由共享,但是在集群应用程序的上下文中,这似乎是不可能的,所以我不知道这是设计失败还是缺少一些经验与阿卡。如果有人遇到类似的问题,我很乐意提出建议!

【问题讨论】:

    标签: scala akka akka-cluster


    【解决方案1】:

    问题是你的RedisCache 不是那么简单。它带有一个ActorSystem,它不能被序列化。

    我猜这是因为它包含来自 - 例如的 RedisClient 实例。 - rediscala 库,这些需要 ActorSystem

    您需要从参与者系统中抽象出来,并且只将 Redis 集群的基本细节(即 RedisServer 对象)传递给您的工作人员。

    然后工作人员将自己实例化RedisClient - 使用他们的context.system

    case class Worker
    (
      awsBucket : String,
      gapMinValueMicroSec : Long,
      replyTo : ActorRef,
      redisMaster: RedisServer,
      redisSlaves: Seq[RedisServer],
      mongoURI : String,
      mongoDBName : String,
      mongoCollectioName : String
    ) extends Actor with ActorLogging {
    
      val masterSlaveClient = ??? //create from the RedisServer details
    
    }
    

    这将允许每个工作人员建立自己与 redis 集群的连接。

    或者,如果您只想在您的主服务器中连接一次并将连接共享给您的工作人员,您需要传递嵌入您的连接的RedisClientActor(此处为source)。这是ActorRef,可以远程共享。

    这可以通过调用client.redisConnection获得。

    例如,工人可以围绕它构建一个ActorRequest

    case class Worker
        (
          awsBucket : String,
          gapMinValueMicroSec : Long,
          replyTo : ActorRef,
          redisConnection: ActorRef,
          mongoURI : String,
          mongoDBName : String,
          mongoCollectioName : String
        ) extends Actor with ActorLogging with ActorRequest {
    
          // you will need to implement the execution context that ActorRequest needs as well..
    
          send(redisCommand)
    
        }
    

    【讨论】:

    • 关于这个方案,是说每个worker都会实例化一个redis客户端?
    • 是的。如果您对此感兴趣,我刚刚添加了一种涉及共享一个连接的方法。
    • 是的,我没有想到其他选择。我会试一试,让你知道!
    • 只是,我worker的主要思想是“从redis中拉取数据,写入mongodb和S3”。如果我使用嵌入 redis 客户端的演员,我的印象是它不会像例外那样扩展:redis 演员将收到我的工人的所有获取请求。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2017-03-28
    • 1970-01-01
    • 2017-09-18
    • 2020-05-10
    • 1970-01-01
    相关资源
    最近更新 更多