【问题标题】:Partitioning on SubType for akka streams对 akka 流的 SubType 进行分区
【发布时间】:2019-01-18 06:22:58
【问题描述】:

我有一个 akka 流,其中有一个 ADT 形式。

sealed trait Message
sealed trait ThisMessage extends Message
sealed trait ThatMessage extends Message

现在我有一个这个消息处理程序流和一个那个消息处理程序流。我有一个接受消息类型的入口流。

为了创建拆分,我有以下分区程序。我对分区函数有以下定义。

 /**
  * Creates a Partition stage that, given a type A, makes a decision to whether to partition to subtype B or subtype C
  *
  * @tparam A type of input
  * @tparam B type of output on the first outlet.
  * @tparam C type of output on the second outlet.
  *
  * @return A partition stage
  */
  def binaryPartitionByType[A, B <: A, C <: A](): Graph[FanOutShape2[A, B, C], NotUsed] =
GraphDSL.create[FanOutShape2[A, B, C]]() { implicit builder =>
  import GraphDSL.Implicits._

  // This is wrong, but I have no idea how to write this.
  val partitioner: UniformFanOutShape[A, A] = builder.add(Partition[A](2, {
    case _: B => 0
    case _: C => 1
  }))

  new FanOutShape2(partitioner.in, partitioner.out(0).outlet, partitioner.out(1).outlet)
}

我希望使用上述方法,并使用类型参数中的 ADT 来初始化分区器。

编译器会抛出此错误。

Error:(63, 7) type mismatch;
 found   : akka.stream.FanOutShape2[A,A,A]
 required: akka.stream.FanOutShape2[A,B,C]
      new FanOutShape2(partitioner.in, partitioner.out(0).outlet, 
partitioner.out(1).outlet)

据我了解,分区对象只有 Inlet(在本例中为 A,参数化类型。

有人知道我该如何解决这个问题吗?

【问题讨论】:

    标签: scala akka akka-stream


    【解决方案1】:

    这是从UniformFanOutShape[A, A] 生成builder.add(Partition[A]()) 实例化FanOutShape2[A, B&lt;:A, C&lt;:A] 的一种方法:

    import akka.stream.scaladsl._
    import akka.stream.{Graph, FanOutShape2}
    import akka.NotUsed
    import scala.reflect.ClassTag
    
    def binaryPartitionByType[A, B <: A : ClassTag, C <: A : ClassTag](): Graph[FanOutShape2[A, B, C], NotUsed] =
      GraphDSL.create[FanOutShape2[A, B, C]]() { implicit builder =>
        import GraphDSL.Implicits._
    
        val partitioner = builder.add(Partition[A](2, {
          case _: B => 0
          case _: C => 1
        }))
    
        val partitionB = builder.add(Flow[A].collect{ case b: B => b })
        val partitionC = builder.add(Flow[A].collect{ case c: C => c })
    
        partitioner.out(0) ~> partitionB
        partitioner.out(1) ~> partitionC
    
        new FanOutShape2(partitioner.in, partitionB.out, partitionC.out)
    }
    
    // binaryPartitionByType: [A, B <: A, C <: A]()(
    //   implicit evidence$1: scala.reflect.ClassTag[B], implicit evidence$2: scala.reflect.ClassTag[C]
    // ) akka.stream.Graph[akka.stream.FanOutShape2[A,B,C],akka.NotUsed]
    

    请注意,需要ClassTag 以避免类型擦除。

    【讨论】:

    • 很酷,谢谢,这似乎有效。好奇你为什么不把它写成 partitioner.out1 ~> partitionB
    • 我最近才看到这个错误,[java] 线程“main”中的异常 java.lang.IllegalStateException: Illegal GraphDSL usage。入口 [Collect.in] 未以结果形状返回且未连接。出口 [Partition.out1, Partition.out0] 未以结果形状返回且未连接。我认为我们确实需要您放弃的分区连接。
    • @Arunav Sanyal,感谢您报告问题。没错,任何未明确连接的已定义形状都会触发报告的Illegal GraphDSL usage 错误。示例代码已还原。
    【解决方案2】:

    问题是您正试图颠覆类型系统。 UniformFanOutShape 被命名为“uniform”,因为它的所有输出都是相同的类型。如果不是这样,您首先不需要创建额外的FanOutShape2。如果您要破坏类型系统,则应始终如一地执行此操作,以便更改Outlets 的类型。试试这样的:

    new FanOutShape2(partitioner.in, partitioner.out(0).outlet.as[B], partitioner.out(1).outlet.as[C])
    

    【讨论】:

    • 我无意颠覆类型系统。我真的不知道如何编写分区程序,我正在尝试。我是否必须在这里注入新的流,例如 Flow[A, B] 和 Flow[A, C] 之类的?
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2020-12-06
    • 2019-04-30
    • 2017-09-22
    • 2015-04-06
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多