【问题标题】:Akka streams and transaction boundariesAkka 流和事务边界
【发布时间】:2017-03-26 14:24:43
【问题描述】:

我仍在掌握 Akka 流的概念,并试图了解当我们有一组需要以原子方式处理的项目时如何将它们映射到场景。假设我们有一个包含多个项目的采购订单,我们需要对每个项目应用一些处理,然后将它们合并为一个值。这样的工作流是否应该成为它自己的单独流(或子流),一旦采购订单被完全处理,它就关闭? IE。每个采购订单都会开始一个新的流程?或者我有一个永无止境的采购订单流?但如果是这样,我不会有混合来自不同订单的采购订单的问题吗?

换句话说,我想要实现的是处理不同工作流的隔离,并想知道 Akka 流是否可以很好地匹配它。

【问题讨论】:

    标签: transactions akka akka-stream reactive-streams


    【解决方案1】:

    直接回答您的问题:可以创建一个“对每个项目应用一些处理,然后将它们合并回一个值”的流。

    使用一些示例代码开发您的示例:

    case class Item(itemId : String)
    
    case class PurchaseOrder(orderId : String, items : Seq[Item])
    
    val purchaseOrder : PurschaseOrder = ???
    

    如果我们想用流处理项目,我们可以,尽管问题中减少的确切性质是模棱两可的,所以我不会定义折叠是如何实现的:

    type ProcessOutput = ???
    
    def processItem(item : Item) : ProcessOutput = ???
    
    val combinedResult : Future[CombinedResult] = 
      Source.fromIterator( purchaseOrder.items.toIterator )
            .via(Flow[Item] map processItem)
            .to(Sink.fold[ProcessOutput](???)(???) )
            .run()
    

    间接回答你的问题,

    首先考虑期货

    Akka 流在需要背压时非常有用。当您连接到外部数据源时,背压很常见,因为 bp 允许您的应用程序确定数据流向您的速度,因为您负责持续发出对更多数据的需求。

    在您在问题中提出的情况下,不需要广播需求,and incur the inherent overhead 这种通信需要。您已经拥有了一系列物品,因此没有人可以发送需求...

    相反,我认为 Futures 是解决您描述的案例的最佳方式:

    def futProcess(item : Item)(implicit ec : ExecutionContext) = 
      Future { processItem(item) } 
    
    // same output type as the stream run 
    val combinedResults : Future[CombinedResult] = 
      Future.sequence{ purchaseOrder.items map futProcess }
            .map{ _ fold[ProcessOutput](???)(???) }
    

    拥有一个完整的 ActorSystem 将获得更好的性能,更少的复杂性,以及与流完全相同的结果......

    【讨论】:

    • 感谢您的详细解答。我也明白,正如您所指出的,在某些情况下,流可能不是最理想的。但总的来说,我看到了流中的巨大潜力,并想一探究竟。
    猜你喜欢
    • 2017-05-22
    • 2018-05-23
    • 1970-01-01
    • 1970-01-01
    • 2015-08-18
    • 1970-01-01
    • 2019-04-18
    • 2016-06-05
    • 2017-10-16
    相关资源
    最近更新 更多