【问题标题】:monix: Task.executeWithFork prevents execution?monix:Task.executeWithFork 阻止执行?
【发布时间】:2019-01-18 08:43:57
【问题描述】:

我不明白为什么添加 executeWithFork 会阻止任务在以下示例中运行:

import java.util.concurrent.TimeUnit

import monix.execution.schedulers.SchedulerService
import monix.reactive.subjects.ConcurrentSubject

object Sandbox {

  def main(args: Array[String]): Unit = {
    implicit val scheduler: SchedulerService =
      monix.execution.Scheduler(java.util.concurrent.Executors.newCachedThreadPool())

    val input = ConcurrentSubject.publish[String]

    // prints nothing
    input.foreachL(println).executeWithFork.runAsync
    // this works:
    // input.foreachL(println).runAsync

    input.onNext("one")
    input.onNext("two")

    scheduler.shutdown()
    scheduler.awaitTermination(1, TimeUnit.MINUTES, monix.execution.Scheduler.Implicits.global)
  }
}

【问题讨论】:

    标签: scala monix


    【解决方案1】:

    您看到的行为是两个事实的结果:

    1. 使用executeWithFork 会为线程切换带来一点额外延迟

    2. 你使用ConcurrentSubject.publish(而不是replay,例如)。如果您打开 PublishSubject 的文档,您可能会看到

    PublishSubject 仅向订阅者发出订阅时间之后由源发出的那些项目。

    换句话说,发布"one""two" 的主线程与必须订阅input 以获取数据的分叉线程之间存在竞争条件。结果取决于哪个线程赢得比赛:订阅前发布的所有数据都会丢失。我的一个硬件我几乎总是看到"two",偶尔甚至看到"one",你的结果可能会有所不同。

    最简单的测试方法是在第一个input.onNext 之前添加Thread.sleep(100),您应该会看到每次都打印这两个事件。你也可以尝试推送更多的事件,而不是仅仅推送 2 个事件,以确保不会丢失所有内容。

    【讨论】:

      猜你喜欢
      • 2016-06-14
      • 1970-01-01
      • 1970-01-01
      • 2017-10-15
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多