【问题标题】:Terminate Observable by timeout超时终止 Observable
【发布时间】:2015-11-13 00:18:34
【问题描述】:

我正在尝试通过超时限制可观察对象的寿命:

def doLongOperation() = {
   Thread.sleep(duration)
   "OK"
}

def firstStep = Observable.create(
  (observer: Observer[String]) => {
    observer.onNext(doLongOperation())
    observer.onCompleted()
    Subscription()
  }
)

firstStep
  .timeout(1 second)
  .subscribe(
    item => println(item),
    throwable => throw throwable,
    () => println("complete")
  ) 

我想区分以下结果:

  1. Observable 超时结束,未获得结果
  2. 执行期间抛出异常
  3. 执行成功,返回值

我可以处理案例 2 和案例 3,在部分 onNext 和 onError 中没有问题,但是如何检测 observable 是否因超时完成?

还有一件事:尽管我的代码中有对 obeserver.onCompleted() 的调用,但我从来没有遇到过阻塞 onComplete。为什么?

【问题讨论】:

  • Operator timeout 将在 onError 通道上发出 TimeoutException。
  • @akarnokd,那我的代码有什么问题?因为我没有得到任何异常

标签: scala rx-java rx-scala


【解决方案1】:

如果发生超时,TimeoutException 会在计算线程上发出,throw throwable 最终会被忽略,您的主线程不会也看不到它。您可以在超时后添加toBlocking,这样任何异常都会在同一个线程上结束:

firstStep
  .timeout(1 second)
  .toBlocking()
  .subscribe(
    item => println(item),
    throwable => println(throwable),
    () => println("complete")

)

【讨论】:

  • 你是绝对正确的,抛出异常。问题出在另一个地方:我使用了错误的依赖项:“com.netflix.rxjava”而不是“io.reactivex”。
【解决方案2】:

TimeoutException 确实被抛出。该问题是由使用错误的库引起的。我的依赖项中有“com.netflix.rxjava”,而不是“io.reactivex”

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2020-03-22
    • 1970-01-01
    • 1970-01-01
    • 2017-11-14
    • 2011-09-26
    • 2020-09-03
    • 1970-01-01
    相关资源
    最近更新 更多