【问题标题】:Creating an Akka stream for parallel processing of collection elements创建用于并行处理集合元素的 Akka 流
【发布时间】:2017-02-28 06:33:22
【问题描述】:

我正在尝试为包含并行处理流的 Akka 流定义一个图(我正在使用 Akka.NET,但这应该没关系)。想象一个订单的数据源,每个订单由一个订单 ID 和一个产品列表(订单项)组成。工作流程如下:

  1. 接收和订购
  2. 将订单广播到两个流,流 A 处理订单商品,通道 B 处理订单 ID(一些记账工作)
  3. 流程 A:将订单项目集合拆分为单独的元素,每个元素单独处理
  4. 流程 A:对于上一步中拆分产生的每个订单商品,调用一些外部服务来查找额外信息(价格、可用性等)
  5. 流程 B:为给定的订单 ID 做一些额外的记账
  6. 合并流 A 和 B
  7. 将上一步中的合并数据发送到接收器,从而丰富订单信息

步骤 1(Source.From)、2(广播)、4-5(地图)、6(合并)、7(接收器)看起来没问题。但是如何在 Akka 或响应式流术语中实现集合拆分?这不是广播或扁平化,需要将 N 个元素的集合拆分为 N 个独立的子流,这些子流稍后将被合并回来。这是如何实现的?

【问题讨论】:

    标签: akka akka-stream


    【解决方案1】:

    我建议一次性完成。我知道两个流程看起来更酷,但相信我,就设计的简单性而言,这是不值得的(我试过了)。你可以写这样的东西

    import akka.stream.scaladsl.{Flow, Sink, Source, SubFlow}
    
    import scala.collection.immutable
    import scala.concurrent.Future
    
    case class Item()
    
    case class Order(items: List[Item])
    
    val flow = Flow[Order]
      .mapAsync(4) { order =>
        Future {
          // Enrich your order here
          order
        }
      }
      .mapConcat { order =>
        order.items.map(order -> _)
      }
      .mapAsync(4) { case (order, item) =>
        Future {
          // Enrich your item here
          order -> item
        }
      }
      .groupBy(2, tuple => tuple._1)
      .fold[Map[Order, List[Item]]](immutable.Map.empty) { case (map, (order, item)) => map.updated(order, map.getOrElse(order, Nil) :+ item) }
      .mapConcat { _.map { case (order, newItems) => order.copy(items = newItems)} }
    

    但即使是这种方法也很糟糕。上面的代码或您的设计可能会出现很多问题。如果某一订单商品的丰富化失败,您会怎么做?如果订单对象的丰富失败怎么办?您的信息流应该如何处理?

    如果我是你,我会拥有 Flow[Order] 并在 mapAsync 中处理它的孩子,所以至少它可以保证我没有部分处理的订单。

    【讨论】:

    • 非常感谢您的回答。是的,mapAsync 将提供高效且可预测的工作流程。但我不确定我是否理解分叉的危险。是的,可能会出现问题,但如果其中一个并发分支发生错误,我不能以类似的方式处理错误吗?
    • @VagifAbilov 您将能够处理它,但我认为它需要更多代码来处理边缘情况和错误。更多代码 -> 更难维护。在使用具有奇特拓扑的流构建系统后,我意识到这通常不值得(以我的拙见)。
    • 我在另一个关于流的讨论中收到了类似的解释。有道理,谢谢你的澄清。
    猜你喜欢
    • 1970-01-01
    • 2016-07-06
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-11-19
    • 2014-02-18
    • 1970-01-01
    相关资源
    最近更新 更多