【问题标题】:Akka Stream: What is the difference between Unzip and Broadcast?Akka Stream:解压缩和广播有什么区别?
【发布时间】:2019-08-12 04:22:33
【问题描述】:

我正在尝试实现这样的目标:

我正在尝试使用 Flow.fromGraph 创建此流程

  • 我可以使用Zip[B, C] 来处理join,它接收2 个流
  • 我可以通过两种方式做到split
    • 使用Broadcast[A](2)
    • 使用UnZip[(A,A)],前面是.map(a -> (a, a))

map(f1)map(f2) 都是我从包含的库中获取的自定义流程,所以我无法真正修改它们,所以请不要说 .map(a => (f1(a), f2(a)))

这两种情况有什么区别,还是等价的?我发现唯一不同的是 Broadcast 仅在 所有 下游取消 (eagerCancel = false) 时取消的能力是它的默认行为,与 UnZip 不同(它的作用与广播对 eagerCancel = true 的作用相同)

另外,如果两条路径中的任何一条出现故障,会发生什么情况?即,对于特定元素,如果 f1 抛出错误会有什么影响?

另外,假设我们没有f2 函数(所以没有映射操作)并且我们想在最后发出(b,a),是否应该将 f2 替换为标识流,或者是否可以全部跳过一起? (如果是后者,你会曾经使用身份流吗?)

val split = builder.add(BroadCast[A](2))
val join = builder.add(Zip[B, A])
val F1: Flow[A, B, NotUsed] = Flow[A].map(f1)
val I = Flow[A].map(identity)

split ~> F1 ~> join.in1
split ~> /* I ~> */ join.in0 // do i need the commented part?

这可能有助于内部缓冲区/背压?

【问题讨论】:

标签: scala stream akka akka-stream


【解决方案1】:

它们都是 Fanout 运算符;然而

Unzip 来自文档:

获取两个元素元组的流并将两个元素解压缩到两个不同的下游。

Broadcast

在 n 个输出中的每个输出中发出每个传入元素。

因此我们可以得出结论,Unzip 只是一个带有n = 2 的广播;但是重要的是如果元素是一个元组,Broadcast 只会创建相同元组的n 输出。解压缩将为元素 AB

创建 2 个输出每个

【讨论】:

    猜你喜欢
    • 2012-05-07
    • 1970-01-01
    • 1970-01-01
    • 2012-10-06
    • 1970-01-01
    • 1970-01-01
    • 2014-07-18
    • 2012-05-22
    • 1970-01-01
    相关资源
    最近更新 更多