【问题标题】:Observable defer in Akka StreamsAkka Streams 中的可观察延迟
【发布时间】:2017-06-11 12:53:30
【问题描述】:

我来自 ReactiveX,我们有运算符 defer,以便创建一个 Observable 并在我们有订阅者后获取发射值。

在 Akka Streams 中,我想知道是否存在类似的东西:

 @Test def defer(): Unit = {
    var range = 0 to 10
    val graphs = Source(range)
      .to(Sink.foreach(println))
    range = 10 to 20
    graphs.run()
    Thread.sleep(2000)
  }

有了这段代码,即使在我们执行 run() 之前,改变范围的值,由于蓝图已经创建,值不会改变,并且发出 0 到 10。

在 Akka Streams 中有类似 Observable.defer 的东西吗?

解决方案

我找到了解决方案,解决方案是使用 lazy 关键字,我们提供了一个函数,一旦我们运行流就会执行。

我会保留这个问题,以防万一有更好的方法或其他人有同样的问题

  @Test def defer(): Unit = {
    var range = 0 to 10
    val graphs = Source.lazily(() => Source(range))
      .to(Sink.foreach(println))
    range = 10 to 20
    graphs.run()
    Thread.sleep(2000)
  }

问候。

【问题讨论】:

    标签: akka rx-java observable akka-stream reactivex


    【解决方案1】:

    最简单的方法可能是Source.fromIterator(() => List(1).iterator) 或类似的东西。在 Akka Streams API 中,我们选择尝试保留最少的运算符集,因此有时您可能会遇到这样的情况,即在单线中可以实现相同的操作,但不会有一个名称与 defer 的情况类似的直接对应物。 .如果您认为这很常见,请在 github.com/akka/akka 上告诉我们,我们可以考虑将其添加为 API。

    请注意,还有 fromFuture 和其他的,虽然没有直接关系,但根据您的实际用例可能有用(尤其是与 Promise 等结合使用时)。

    【讨论】:

    • 感谢您的回答,老实说,我宁愿自己的解决方案也不愿使用迭代器。我也相信 defer 的使用,至少在 ReactiveX 中,在许多用例中是非常普遍和强大的。也许如果我有时间,我会提出合并请求。谢谢康拉德。顺便说一句,哥本哈根的演讲很棒!
    • 如果您没有时间进行拉取请求,只需一张票就可以了,请打开一个 :-) 谢谢,很高兴您喜欢它 :)
    猜你喜欢
    • 2017-01-01
    • 1970-01-01
    • 2017-07-13
    • 1970-01-01
    • 1970-01-01
    • 2021-06-26
    • 2020-07-10
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多