【问题标题】:Akka Actor - wait for some time to expect a message, otherwise send a message outAkka Actor - 等待一段时间等待消息,否则发送消息
【发布时间】:2012-09-26 06:54:07
【问题描述】:

是否可以让Actor等待X秒来接收任何消息,如果收到消息,照常处理,否则发送消息给其他Actor(预先确定在构造函数)?

【问题讨论】:

    标签: scala akka actor


    【解决方案1】:

    有可能,看看Akka Actor "ask" and "Await" with TimeoutException。但请记住,在 Actor 内部进行阻塞是一个非常糟糕的主意,因为在此期间 Actor 无法处理任何其他消息。此外,它还阻塞了一个 Akka 处理线程。

    更好的方法是发送一条消息(一劳永逸)并使用Akka scheduler 安排一些超时事件。当响应到达时,取消该事件或设置一些标志,以便在响应确实准时到达时不会触发。

    【讨论】:

      【解决方案2】:

      是的,如果您想等待任何消息,您只需设置一个receiveTimeouthttp://doc.akka.io/docs/akka/current/scala/actors.html#receive-timeout

      (这里的文档有点误导,您也可以在每条消息后设置receiveTimeout)

      【讨论】:

        【解决方案3】:

        可能有点矫枉过正,但你可以看看Finite State Machine (FSM) trait。

        import akka._
        import actor._
        import util._
        import duration._
        import Impatient._
        
        object Impatient {
          sealed trait State
          case object WaitingForMessage extends State
          case object MessageReceived extends State
          case object TimeoutExpired extends State
        
          sealed trait Data
          case object Unitialized extends Data
        
          // In
          case object Message
        }
        
        class Impatient(receiver: ActorRef) extends Actor with FSM[State, Data] {
          startWith(WaitingForMessage, Unitialized)
        
          when(WaitingForMessage, stateTimeout = 3 seconds) {
            case Event(StateTimeout, data) => goto(TimeoutExpired) using data // data is usually modified here
            case Event(Message, data) => goto(MessageReceived) using data // data is usually modified here
          }
        
          onTransition {
            case WaitingForMessage -> MessageReceived => stateData match {
              case data => log.info("Received message: " + data)
            }
            case WaitingForMessage -> TimeoutExpired => receiver ! TimeoutExpired
          }
        
          when(MessageReceived) {
            case _ => stay
          }
        
          when(TimeoutExpired) {
            case _ => stay
          }
        
          initialize
        }
        

        它在行动:

        object Main extends App {
          import akka._
          import actor._
          import Impatient._
        
          val system = ActorSystem("System")
        
          val receiver = system.actorOf(Props(new Actor with ActorLogging {
            def receive = {
              case TimeoutExpired => log.warning("Timeout expired")
            }
          }))
        
          val impatient = system.actorOf(Props(new Impatient(receiver)), name = "Impatient")
          impatient ! Message
        
          val impatient2 = system.actorOf(Props(new Impatient(receiver)), name = "Impatient2")
          Thread.sleep(4000)
          impatient2 ! Message
        
          system.shutdown()
        }
        

        【讨论】:

          猜你喜欢
          • 1970-01-01
          • 2013-12-07
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 2021-12-27
          • 1970-01-01
          • 2020-07-02
          • 1970-01-01
          相关资源
          最近更新 更多