【问题标题】:Problems with Akka Streams' GraphStageAkka Streams 的 GraphStage 的问题
【发布时间】:2017-03-26 06:58:11
【问题描述】:

我需要编写一个 GraphStage,但遇到了一些问题。 我已经将代码压缩到下面,希望你们能对我有所了解。

下面的示例代码不是我的真实用例,它只是为了证明我的观点。希望这是我对 akka 流的理解,而不是它的局限性。

示例代码使用 WrapFlowShape 构建一个 Graph,并且基本上将图形的“输入”重定向到附加流的输入,并将图形的“输出”重定向到流的输出。

import akka.actor.ActorSystem
import akka.stream._
import akka.stream.javadsl.RunnableGraph
import akka.stream.scaladsl.{Flow, GraphDSL, Sink, Source}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}

import scala.collection.immutable
import scala.io.StdIn

object WrapFlowSandbox extends App {
  case class WrapFlowShape[I, O](
      in: Inlet[I],
      out: Outlet[O],
      flowIn: Inlet[O],
      flowOut: Outlet[I]) extends Shape {
    val inlets: immutable.Seq[Inlet[_]] = in :: flowIn :: Nil
    val outlets: immutable.Seq[Outlet[_]] = out :: flowOut :: Nil
    def deepCopy = WrapFlowShape(in.carbonCopy, out.carbonCopy, flowIn.carbonCopy, flowOut.carbonCopy)
  }
  class WrapFlow[I, O] extends GraphStage[WrapFlowShape[I, O]] {
    val in: Inlet[I] = Inlet[I]("WrapFlow.in")
    val out: Outlet[O] = Outlet[O]("WrapFlow.out")
    val flowIn: Inlet[O] = Inlet[O](s"Select.flowIn")
    val flowOut: Outlet[I] = Outlet[I](s"Select.flowOut")
    val shape: WrapFlowShape[I, O] = WrapFlowShape(in, out, flowIn, flowOut)
    def createLogic(initialAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
      var inElem: I = _
      setHandler(in, new InHandler {
        def onPush = {
          println("2 in.onPush")
          inElem = grab(in)
          pull(flowIn)
        }
      })
      setHandler(out, new OutHandler {
        def onPull = {
          println("1 out.onPull")
          pull(in)
        }
      })
      setHandler(flowIn, new InHandler {
        def onPush = {
          println("4 flowIn.onPush")
          val outElem = grab(flowIn)
          push(out, outElem)
        }
      })
      setHandler(flowOut, new OutHandler {
        def onPull = {
          println("3 flowOut.onPull")
          push(flowOut, inElem)
        }
      })
    }
  }
  implicit val system = ActorSystem()
  implicit val materializer = ActorMaterializer()
  val flow = Flow[Int].map(_ + 1)
  RunnableGraph.fromGraph(GraphDSL.create() { implicit b =>
    import GraphDSL.Implicits._
    val select = b.add(new WrapFlow[Int, Int])
    Source.single(1) ~> select.in
    select.out ~> Sink.foreach[Int](r => println(s"result = $r"))
    select.flowOut ~> flow ~> select.flowIn
    ClosedShape
  }).run(materializer)
  StdIn.readLine
  system.terminate
}

我希望看到的输出是:

1 out.onPull
2 in.onPush
3 flowOut.onPull
4 flowIn.onPush
result = 2

但实际输出只是前 3 行:

1 out.onPull
2 in.onPush
3 flowOut.onPull

永远不会调用“flowIn”的 InHandler.onPush()。

我知道以这种方式编写 GraphStage 是非常规的,但我确实需要它。

让我感到困惑的是,我通过在第 2 步中拉动附加流(pull(flowIn))产生了对附加流的需求, 并且附加的流反过来在步骤 3 中产生了对“flowOut”的需求。

但在步骤 3 中通过 flowOut 推送元素后,该元素从未被推送,因此从未执行步骤 4。

这是为什么呢?

如果附加流感知到下游需求并在第 3 步生成上游需求,为什么在第 3 步推送的元素没有通过附加流?

【问题讨论】:

  • 顺便说一句。你的形状已经在akka中定义为BidiShape

标签: scala akka akka-stream


【解决方案1】:

不确定我是否遵循您的处理程序中的逻辑。根据我对您的GraphDSL.create() 内容的理解,我将它们修改为以下内容:

def createLogic(initialAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
  var inElem: I = _
  setHandler(in, new InHandler {
    def onPush = {
      println("in.onPush")
      inElem = grab(in)
      push(flowOut, inElem)
    }
  })
  setHandler(out, new OutHandler {
    def onPull = {
      println("out.onPull")
      pull(flowIn)
    }
  })
  setHandler(flowIn, new InHandler {
    def onPush = {
      println("flowIn.onPush")
      val outElem = grab(flowIn)
      push(out, outElem)
    }
  })
  setHandler(flowOut, new OutHandler {
    def onPull = {
      println("flowOut.onPull")
      pull(in)
    }
  })
}

执行它应该产生以下输出:

out.onPull
flowOut.onPull
in.onPush
flowIn.onPush
result = 2

注意到 copyFromPorts() 方法没有在您的 WrapFlowShape 案例类(它不是抽象类)中被覆盖。我相信您需要使用以下内容覆盖它:

override def copyFromPorts(
    inlets: immutable.Seq[Inlet[_]],
    outlets: immutable.Seq[Outlet[_]]) = {
  WrapFlowShape[I, O](
    inlets(0).as[I],
    outlets(0).as[O],
    inlets(1).as[O],
    outlets(1).as[I])
}

【讨论】:

  • 感谢您的回答。它确实有效,但我的用例比这要复杂一些。我简化了它,但实际上,有多个 flowIns 和 flowOuts。在我知道要拉哪些 flowIns 之前,我需要得到 inElem。所以我不能把它们都拉出来。onPull。
  • 顺便说一句,由于 Shape 是一个抽象类,而 deepCopy 也是抽象类,我不需要覆盖。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2017-12-14
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2019-04-13
  • 1970-01-01
相关资源
最近更新 更多