【发布时间】:2019-08-12 04:22:33
【问题描述】:
我正在尝试实现这样的目标:
我正在尝试使用 Flow.fromGraph 创建此流程
- 我可以使用
Zip[B, C]来处理join,它接收2 个流 - 我可以通过两种方式做到
split:- 使用
Broadcast[A](2) - 使用
UnZip[(A,A)],前面是.map(a -> (a, a))
- 使用
map(f1) 和 map(f2) 都是我从包含的库中获取的自定义流程,所以我无法真正修改它们,所以请不要说 .map(a => (f1(a), f2(a)))
这两种情况有什么区别,还是等价的?我发现唯一不同的是 Broadcast 仅在 所有 下游取消 (eagerCancel = false) 时取消的能力是它的默认行为,与 UnZip 不同(它的作用与广播对 eagerCancel = true 的作用相同)
另外,如果两条路径中的任何一条出现故障,会发生什么情况?即,对于特定元素,如果 f1 抛出错误会有什么影响?
另外,假设我们没有f2 函数(所以没有映射操作)并且我们想在最后发出(b,a),是否应该将 f2 替换为标识流,或者是否可以全部跳过一起? (如果是后者,你会曾经使用身份流吗?)
val split = builder.add(BroadCast[A](2))
val join = builder.add(Zip[B, A])
val F1: Flow[A, B, NotUsed] = Flow[A].map(f1)
val I = Flow[A].map(identity)
split ~> F1 ~> join.in1
split ~> /* I ~> */ join.in0 // do i need the commented part?
这可能有助于内部缓冲区/背压?
【问题讨论】:
-
也许 FlowWithContext 在这里有所帮助,但此时它仍处于开发阶段,还没有准备好处理这个问题......而且 FlowWithContext 的文档/示例并不多
标签: scala stream akka akka-stream