【问题标题】:Parallelism in Akka streams in comparison with Akka与 Akka 相比,Akka 流中的并行性
【发布时间】:2018-11-16 06:36:47
【问题描述】:

我一直在尝试探索更多关于 akka 流的信息,但我无法理解我们如何以使用 Akka 实现的方式实现类似的并行性。假设 Actor A 使用来自 kafka 的数据并将其写入 s3 和另一个Actor B 从 kafka 消费并将其写入 postgres,另一个 Actor C 从 DB 读取并生成另一个 kafka 主题。所有 3 个参与者都可以在不同的参与者系统中,并且不需要依赖于其他参与者。但是我如何使用 Akka 流来实现类似的事情。我相信 akka 流具有 A 执行某些操作并将其通过管道传输到 B 的阶段,依此类推,直到我们到达水槽。我确实意识到有一个 mapAsync 可以用来并行化事物,但我不确定它在这种情况下会如何发挥作用,以及在订购担保方面。

【问题讨论】:

    标签: akka akka-stream


    【解决方案1】:

    单一来源

    对于您列出的特定用例,您可以使用BroadcastHub 将每个数据项从kafka“扇出”到您列出的每个Sink 值:

    type Data = ???
    
    val kafkaSource : Source[Data, _] = ???
    
    val runnableGraph: RunnableGraph[Source[Data, NotUsed]] =
      kafkaSource.toMat(BroadcastHub.sink(bufferSize = 256))(Keep.right)
    
    val kafkaHub : Source[Data, NotUsed] = runnableGraph.run()
    
    val s3Sink : Sink[Data, _] = ???
    
    val postgresSink : Sink[Data, _] = ???
    
    kafkaHub.to(s3Sink).run()
    kafkaHub.to(postgresSink).run()
    

    多个来源

    上述实现的一个重要缺点是“生产者的速率将自动适应最慢的消费者”。

    因此,如果您能够与最终源建立多个连接,那么通过最大化并发性可能会提高性能:

    val kafkaSource : () => Source[Data,_] = ???
    
    //stream 1
    kafkaSource().to(s3Sink).run()
    
    //stream 2
    kafkaSource().to(postgresSink).run()
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2018-10-25
      • 2017-01-30
      • 2011-05-28
      • 2018-04-28
      相关资源
      最近更新 更多