4、Akka Actor 

 

4.1、Akka 概述 

Akka 基于 Actor 模型,提供了一个用于构建可扩展的(Scalable)、弹性的(Resilient)、快 速响应的(Responsive)应用程序的平台。 
 
Actor 模型:在计算机科学领域,Actor 模型是一个并行计算(Concurrent Computation)模型, 它把 actor 作为并行计算的基本元素来对待:为响应一个接收到的消息,一个 actor 能够自 己做出一些决策,如创建更多的 actor,或发送更多的消息,或者确定如何去响应接收到的 下一个消息。 

【图文详细 】Scala——Akka Actor

Actor 是 Akka 中最核心的概念,它是一个封装了状态和行为的对象,Actor 之间可以通过交 换消息的方式进行通信,每个 Actor 都有自己的收件箱(Mailbox)。通过 Actor 能够简化锁 及线程管理,可以非常容易地开发出正确地并发程序和并行系统。 
 
Actor 具有如下特性:

1、提供了一种高级抽象,能够简化在并发(Concurrency)/并行(Parallelism)应用场景下 的编程开发

2、提供了异步非阻塞的、高性能的事件驱动编程模型

3、超级轻量级事件处理(每 GB 堆内存几百万 Actor) 

 

4.2、重要 API 介绍 


4.2.1、ActorSystem 
在 Akka 中,ActorSystem 是一个重量级的结构,他需要分配多个线程,所以在实际应用中, ActorSystem 通常是一个单例对象,我们可以使用这个 ActorSystem 的 actorOf 方法创建很多 Actor。 

4.2.2、Actor 
在 Akka 中,Actor 负责通信,在 Actor 中有一些重要的生命周期方法。 
1、preStart()方法:该方法在 Actor 对象构造方法执行后执行,整个 Actor 生命周期中仅执行 一次。

2、receive()方法:该方法在 Actor 的 preStart 方法执行完成后执行,用于接收消息,会被反 复执行。 

4.2.3、ActorSystem 和 Actor 对比 
Actor: 就是用来做消息传递的 用来接收和发送消息的,一个 Actor 就相当于是一个老师或者是学生。 如果我们想要多个老师,或者学生,就需要创建多个 Actor 实例。 
ActorSystem: 用来创建和管理 Actor,并且还需要监控 Actor。ActorSystem 是单例的(object) 在同一个进程里面,只需要一个 ActorSystem 就可以了 

4.3、利用 Akka 构建 RPC 应用案例 

4.3.1、需求 
目前大多数的分布式架构底层通信都是通过 RPC 实现的,RPC 框架非常多,比如前我们学过 的 Hadoop 项目的 RPC 通信框架,但是 Hadoop 在设计之初就是为了运行长达数小时的批量 而设计的,在某些极端的情况下,任务提交的延迟很高,所有 Hadoop 的 RPC 显得有些笨重。 
Spark 的 RPC 是通过 Akka 类库实现的,Akka 用 Scala 语言开发,基于 Actor 并发模型实现, Akka 具有高可靠、高性能、可扩展等特点,使用 Akka 可以轻松实现分布式 RPC 功能。 

4.3.2、应用架构 

【图文详细 】Scala——Akka Actor

4.2.3、具体实现 
Master 代码实现: 

class Master extends  Actor{ 
  def doHello(): Unit ={     println("我是 Master, 我接收到了 Worker 的 hello 的消息"); 
  } 
  /** 
    * 其实就是一个死循环 : 接收消息
    * while(true) 
    */ 
  override def receive: Receive = { 
case "hello" =>{ 
      doHello() 
      //sender()   谁发送过来消息这个就是谁
       //sender()   ! "hi"  给 sender() 发送一个 hi的消息 
      sender() ! "hi" 
    } 
  } 
} 
 
object Master { 
 
  def main(args: Array[String]): Unit = { 
    val str= 
      """ 
        |akka.actor.provider = "akka.remote.RemoteActorRefProvider" 
        |akka.remote.netty.tcp.hostname = localhost 
        |akka.remote.netty.tcp.port = 6790 
      """.stripMargin 
 
    val conf: Config = ConfigFactory.parseString(str) 
    // def apply(name: String, config: Config) 
    val actorSystem = ActorSystem("MasterActorSystem", conf) 
    // 创建并启动 actor   def actorOf(props: Props, name: String): ActorRef 
    //new Master() 会导致主构造函数会运行!!
     actorSystem.actorOf(Props(new Master()), "MasterActor") 
  } 
}

Worker 代码实现: 

class Worker extends Actor{// 生命周期
   def doHi(): Unit ={
     println("我是 Worker,我接收到了 Master 的 hi 的消息"); 
  } 
   // 如果 actor一执行首先运行的是这个方法,只运行一次。 
  override def preStart(): Unit = { 
 // 实现的是给 Master 发送消息  地址
     val workerActor = 
context.actorSelection("akka.tcp://[email protected]:6790/user/MasterActo
r") 
    workerActor ! "hello" 
  } 
 
  override def receive: Receive = { 
    case "hi"  => { 
      doHi() 
    } 
  } 
} 
object Worker { 
  def main(args: Array[String]): Unit = { 
 
    val str= 
      """ 
        |akka.actor.provider = "akka.remote.RemoteActorRefProvider" 
        |akka.remote.netty.tcp.hostname = localhost 
      """.stripMargin 
 
    val conf = ConfigFactory.parseString(str) 
    val actorSystem = ActorSystem("WorkerActorSystem", conf) 
    actorSystem.actorOf(Props(new Worker()), "WorkerActor") 
  } 
}

 

4.2.4、执行测试 
先启动 Master,再启动 Worker 

【图文详细 】Scala——Akka Actor

分类:

技术点:

相关文章: