[Question title]: Akka Streams: Calling push(_,_) outside onPull(_,_) is blocking the stream - why?
[Posted]: 2018-03-22 21:56:17
[Question]:

I can't understand the behavior of a small sample custom Akka Streams Source.

The idea behind the example is that the Source asks an actor for the next element. Please see the code below:

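import akka.NotUsed
import akka.actor.{Actor, ActorLogging, ActorRef, ActorRefFactory, Props}
import akka.stream.{Attributes, Graph, Outlet, SourceShape}
import akka.stream.scaladsl.{Source => AkkaSource}
import akka.stream.stage.{GraphStage, GraphStageLogic, OutHandler}
// UnknownMessage (used below) is the asker's own trait and is not shown.

class ActorSource[T](context: ActorRefFactory, actor: ActorRef) extends GraphStage[SourceShape[T]] {
  // Companion object members are not automatically in scope inside the class.
  import ActorSource.Protocol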

  val out: Outlet[T] = Outlet("actor-source")

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = {
    new GraphStageLogic(shape) {
      setHandler(out, new OutHandler {
        val receivingActor = context.actorOf(Props(new ReceivingActor(msg => {
          push(out, msg)
          println("push - Done")
        })))

        override def onPull(): Unit = {
          actor ! Protocol.Pull(receivingActor)
          println("onPull - Done")
        }
      })
    }
  }

  override def shape: SourceShape[T] = SourceShape(out)

  /**
    * A small actor which receives new elements from the actual source actor.
    *
    * @param push The method to push elements into the stream
    */
  class ReceivingActor(push: T => Unit) extends Actor with ActorLogging with UnknownMessage {

    override def receive: Receive = {
      case Protocol.Push(msg) =>
        push(msg.asInstanceOf[T])  // I know this unchecked cast is evil... it's just for testing here
      case msg =>
        unknownMessage(msg)
    }

  }

}

object ActorSource {

  /**
    * Creates an [[ActorSource]]
    *
    * @param actor   The actor which acts as the source
    * @param context The context to create the internal helper actor
    * @return A new akka-streams source
    */
  def Source[T](actor: ActorRef)(implicit context: ActorRefFactory): AkkaSource[T, NotUsed] = {
    val graph: Graph[SourceShape[T], NotUsed] = new ActorSource[T](context, actor)
    AkkaSource.fromGraph(graph)
  }

  /**
    * Defines the messages/ events for the source actor
    */
  object Protocol {

    /**
      * Will be sent if the stream requires new elements.
      *
      * @param actor The actor which should receive the push message
      */
    case class Pull(actor: ActorRef)

    /**
      * Sent by the source actor to submit a new element.
      *
      * @param msg The message to put into the stream.
      */
    case class Push(msg: Any)

  }

}

If you create a stream with this source, as follows:

class SampleActor extends Actor with ActorLogging with UnknownMessage {

  var counter = 0

  override def receive: Receive = {
    case msg @ Protocol.Pull(actor) =>
      actor ! Protocol.Push(counter)
      counter = counter + 1
  }

}

val sourceActor = system.actorOf(Props(new SampleActor()))

val stream = ActorSource
  .Source[Int](sourceActor)(system)
  .take(10)
  .runWith(Sink.foreach(println))

Await.result(stream, 30 seconds)

The only output is:

onPull - Done
push - Done

The first integer never reaches the Sink, and onPull is never called again. Interestingly, if I terminate the program, the first integer is printed by the Sink.

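Is this a feature or a bug? As I understand it, push(_, _) can be called at any time after a pull has signaled that the outlet is open, and when I check isAvailable, it does return true.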

Can anyone explain this behavior?

[Comments]:

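  • Can I ask why you are implementing this? It tries to replicate a "not so good" design idea that exposes all of the Reactive Streams (RS) semantics to end users as raw actors; that was implemented in Akka and is now deprecated. You can use Source.actorRef or a queue source to solve what you are trying to build here.
  • I'm looking for a way to integrate crawler/ingestion logic that we have already implemented in actors into Akka Streams. What I don't like about Source.actorRef is that it doesn't support backpressure; Source.queue might of course be another option. But I still think the question is valid: I assume the same behavior would occur if I called push in the onComplete callback of a Future started in onPull. So the use case may not be the best one, but the behavior is still strange...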
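For the Source.queue option mentioned above: a queue source created with OverflowStrategy.backpressure does propagate backpressure to a producer that waits for each offer to complete before offering the next element. A minimal sketch, assuming Akka 2.5.x (matching the docs linked in the answer); offerFrom is a hypothetical counting producer standing in for the existing crawler actors.

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, OverflowStrategy, QueueOfferResult}
import akka.stream.scaladsl.{Keep, Sink, Source}

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object QueueSourceDemo {
  def run(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("queue-source-demo")
    implicit val mat: ActorMaterializer = ActorMaterializer()
    import system.dispatcher

    try {
      // With OverflowStrategy.backpressure, the Future returned by offer
      // completes only once the element fits into the (4-element) buffer.
      val (queue, done) = Source.queue[Int](4, OverflowStrategy.backpressure)
        .toMat(Sink.seq)(Keep.both)
        .run()

      // Hypothetical producer: offers the next element only after the
      // previous offer was accepted, then completes the queue after 10.
      def offerFrom(i: Int): Future[Unit] =
        if (i >= 10) {
          queue.complete()
          Future.successful(())
        } else {
          queue.offer(i).flatMap {
            case QueueOfferResult.Enqueued => offerFrom(i + 1)
            case other => Future.failed(new IllegalStateException(s"offer failed: $other"))
          }
        }

      offerFrom(0)
      Await.result(done, 10.seconds)
    } finally {
      system.terminate()
    }
  }
}
```

Offering sequentially matters here: the queue only accepts a limited number of pending offers, so the producer should not fire offers without waiting for their results.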

Tags: scala akka akka-stream


[Answer 1]:

After reading more of the documentation (RTFM ;)), I think I found the answer to my question.

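The documentation states in several places that it is not safe to call these API methods (such as push) outside of the stage's own callbacks. For what I was trying to do, Akka Streams provides the getAsyncCallback method.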
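Applied to the question's code, the fix is that ReceivingActor no longer calls push itself; it calls invoke on an AsyncCallback, and the callback's body is then executed by the stage, where push is safe. A minimal sketch, assuming Akka 2.5.x: the protocol is trimmed to Pull/Push, and CountingActor and ActorSourceDemo are hypothetical stand-ins for the question's SampleActor and test code.

```scala
import akka.actor.{Actor, ActorRef, ActorRefFactory, ActorSystem, Props}
import akka.stream.{ActorMaterializer, Attributes, Outlet, SourceShape}
import akka.stream.scaladsl.{Sink, Source}
import akka.stream.stage.{AsyncCallback, GraphStage, GraphStageLogic, OutHandler}

import scala.concurrent.Await
import scala.concurrent.duration._

object Protocol {
  final case class Pull(replyTo: ActorRef)
  final case class Push(msg: Any)
}

// Receives elements from the source actor and hands them to the stage
// through an AsyncCallback instead of calling push on the actor's thread.
class ReceivingActor[T](onElement: AsyncCallback[T]) extends Actor {
  override def receive: Receive = {
    case Protocol.Push(msg) => onElement.invoke(msg.asInstanceOf[T]) // unchecked cast, as in the question
  }
}

class ActorSource[T](factory: ActorRefFactory, actor: ActorRef) extends GraphStage[SourceShape[T]] {
  val out: Outlet[T] = Outlet("actor-source")
  override val shape: SourceShape[T] = SourceShape(out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      private var receivingActor: ActorRef = _

      override def preStart(): Unit = {
        // The handler passed here is run by the stage itself, so push is safe inside it.
        val onElement: AsyncCallback[T] = getAsyncCallback[T](elem => push(out, elem))
        receivingActor = factory.actorOf(Props(new ReceivingActor[T](onElement)))
      }

      // Clean up the helper actor when the stage stops (e.g. after take(10)).
      override def postStop(): Unit =
        if (receivingActor != null) factory.stop(receivingActor)

      setHandler(out, new OutHandler {
        // Only one Pull is outstanding at a time, so the outlet is available
        // whenever the callback eventually pushes.
        override def onPull(): Unit = actor ! Protocol.Pull(receivingActor)
      })
    }
}

// Hypothetical stand-in for the question's SampleActor.
class CountingActor extends Actor {
  private var counter = 0
  override def receive: Receive = {
    case Protocol.Pull(replyTo) =>
      replyTo ! Protocol.Push(counter)
      counter += 1
  }
}

object ActorSourceDemo {
  def firstTen(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("actor-source-demo")
    implicit val mat: ActorMaterializer = ActorMaterializer()
    try {
      val sourceActor = system.actorOf(Props(new CountingActor))
      val done = Source.fromGraph(new ActorSource[Int](system, sourceActor))
        .take(10)
        .runWith(Sink.seq)
      Await.result(done, 10.seconds)
    } finally {
      system.terminate()
    }
  }
}
```

As a design note, Akka also offers GraphStageLogic.getStageActor, which gives the stage its own ActorRef whose messages are handled inside the stage; with it, the separate ReceivingActor could be dropped. That variant is not shown here.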

More details can be found here: https://doc.akka.io/docs/akka/2.5.4/scala/stream/stream-customize.html#using-asynchronous-side-channels
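The comments suspect that calling push from the onComplete of a Future started in onPull would break in the same way. It is the same thread-safety problem, because onComplete runs on a dispatcher thread rather than inside the stage, so the same fix applies. A minimal sketch, assuming Akka 2.5.x: FutureSource and its fetch parameter are hypothetical, and the Future's result is routed through getAsyncCallback (a failed Future fails the stage).

```scala
import java.util.concurrent.atomic.AtomicInteger

import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, Attributes, Outlet, SourceShape}
import akka.stream.scaladsl.{Sink, Source}
import akka.stream.stage.{AsyncCallback, GraphStage, GraphStageLogic, OutHandler}

import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}

// Hypothetical stage: on each pull it calls fetch() and emits the Future's result.
class FutureSource[T](fetch: () => Future[T])(implicit ec: ExecutionContext)
    extends GraphStage[SourceShape[T]] {
  val out: Outlet[T] = Outlet("future-source")
  override val shape: SourceShape[T] = SourceShape(out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      private var onResult: AsyncCallback[Try[T]] = _

      override def preStart(): Unit =
        onResult = getAsyncCallback[Try[T]] {
          case Success(elem) => push(out, elem) // runs inside the stage
          case Failure(ex)   => failStage(ex)
        }

      setHandler(out, new OutHandler {
        // Calling push directly inside onComplete would repeat the question's
        // mistake; invoke hands the result back to the stage instead.
        override def onPull(): Unit = fetch().onComplete(onResult.invoke)
      })
    }
}

object FutureSourceDemo {
  def run(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("future-source-demo")
    implicit val mat: ActorMaterializer = ActorMaterializer()
    import system.dispatcher
    val counter = new AtomicInteger(0)
    try {
      val done = Source.fromGraph(new FutureSource[Int](() => Future(counter.getAndIncrement())))
        .take(10)
        .runWith(Sink.seq)
      Await.result(done, 10.seconds)
    } finally {
      system.terminate()
    }
  }
}
```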
