【问题标题】:akka stream asyncBoundary vs mapAsyncakka 流 asyncBoundary 与 mapAsync
【发布时间】:2018-04-28 07:42:39
【问题描述】:

我试图了解asyncBoundarymapAsync 之间的区别。乍一看,我想它们应该是一样的。但是,当我运行代码时,看起来asyncBoundary 的性能比mapAsync

这里是代码

implicit val system = ActorSystem("sourceDemo")
implicit val materializer = ActorMaterializer()


Source(1 to 100).mapAsync(100)(t => Future {t + 1}).mapAsync(100)(t => Future {t * 2}).map(println).to(Sink.ignore).run()
Source(1 to 100).map(_ + 1).withAttributes(Attributes.asyncBoundary).map(_ * 2).map(t => println("async boundary", t)).to(Sink.ignore).run()

输出: 异步边界总是比 mayAsync 更快地完成。

从关于 asyncBoundary (https://doc.akka.io/docs/akka-stream-and-http-experimental/current/scala/stream-flows-and-basics.html) 的文档中,我可以看到它在不同的 CPU 上运行,但 mapAsync 是使用 Future 实现的多线程。 Future 也是异步的。

我可以要求更多关于这两个 API 的说明吗?

【问题讨论】:

    标签: scala akka akka-stream stream-processing


    【解决方案1】:

    异步

    正如您正确指出的那样,这会强制在两个阶段之间插入异步边界。在你的例子中

    Source(1 to 100).map(_ + 1).withAttributes(Attributes.asyncBoundary).map(_ * 2).map(t => println("async boundary", t)).to(Sink.ignore).run()
    

    这实际上意味着+ 1 操作和* 2 操作将由不同的参与者运行。这启用了流水线,当一个元素移动到* 2 阶段时,同时可以为+ 1 阶段引入另一个元素。如果您在此处强制设置异步边界,则同一参与者将按顺序执行操作并在一个元素上执行操作,然后再从上游请求一个新元素。

    顺便说一句,您的示例可以使用 async 组合器以更短的格式重写:

    Source(1 to 100).map(_ + 1).async.map(_ * 2).map(t => println("async boundary", t)).to(Sink.ignore).run()
    

    ma​​pAsync

    这是一个并行执行异步操作的阶段。并行度因子允许您指定要启动以服务传入元素的并行参与者的最大数量。并行计算的结果由mapAsync 阶段按顺序跟踪和发出。

    在你的例子中

    Source(1 to 100).mapAsync(100)(t => Future {t + 1}).mapAsync(100)(t => Future {t * 2}).map(println).to(Sink.ignore).run()
    

    可能多达 100 个+ 1 操作(即所有操作)可以并行运行,并按顺序收集结果。随后,最多可以并行运行 100 个* 2 操作,并再次按顺序收集结果并发送到下游。

    在您的示例中,您正在运行受 CPU 限制的快速操作,这些操作无法证明使用 mapAsync 是合理的,因为此阶段所需的基础架构很可能比并行运行 100 个这些操作的优势要昂贵得多。 mapAsync 在处理 IO 密集型、缓慢的操作时特别有用,其中并行化非常方便。

    如需全面了解此主题,请查看this blogpost

    【讨论】:

    • 我还要指出,当我们不关心顺序时使用.mapAsyncUnordered 可以提高性能。
    • 嗨 Stefano,感谢您的回复。我真的很喜欢你描述异步边界的方式。 This enables pipelining, as whilst an element moves on to the * 2 stage, at the same time another element can be brought in for the + 1 stage. 但是,当我们谈论 mapAsync 时,我猜每个阶段的所有元素都是并行运行的。但是两个阶段(+1 和 * 2)仍然并行还是顺序运行?换句话说,第二阶段(* 2)是否需要等待第一阶段(+ 1)完成?
    • 另外,如您所知,很难使用代码来显示差异。我不能使用thread.sleep,这可能不准确。你能给我一些关于区别的见解吗?
    • >另请注意,mapAsync 根据定义也引入了异步边界|这不是真的。 MapAsync 不这样做
    • 我很久以前的错误!感谢您指出@maks
    猜你喜欢
    • 1970-01-01
    • 2016-06-21
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2017-01-30
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多