【问题标题】:Create Observable that emits asynchronously创建异步发射的 Observable
【发布时间】:2017-10-22 20:42:29
【问题描述】:

我想创建一个检测事物Observable。这些是及时发现的,并且在订阅者订阅时不可用。我该如何处理?

我的尝试是这样创建一个 Observable:

Observable.create(new AsyncOnSubscribe<Object, Thing>() {
    @Override
    protected Object generateState() { return null; /* state not presented for the sake of simplicity */ }

    @Override
    protected Object next(Object state, long requested, Observer<Observable<? extends Thing>> observer) {
        if(haveMoreThings()) {
            while (!nextThingAvailable()) {
                SystemClock.sleep(100);
            }

            observer.onNext(Observable.just(getNextThing());
        } else {
            observer.onCompleted();
        }
        return null;
    }
});

这是可行的,但由于有些问题我无法弄清楚:

第一个问题 如何检查观察者是否未取消订阅?使用已弃用的Observable.create() 可以这样做:

Observable.create(subscriber -> {
            if(subscriber.isUnsubscribed()){
                // stop detecting things
            }
        });

第二个问题:什么是等待某些东西可用(在可观察对象内)的正确方法? SystemClock.sleep(x); 是要走的路吗?

设计问题:为此使用Subject 可能更容易,但我没有足够的 rxJava exp 来证明这一观点的合理性。万一这是要走的路,如果提供一个例子,那就太棒了。

【问题讨论】:

  • 首先,Observable 反过来工作。它打电话给你,而不是反过来。当您订阅 Observable 时,它​​将开始发出值。每次订阅它时,都会创建一个新的管道。您能否提供有关“事物”api的更多信息。它有异步 api 还是你必须轮询信息?
  • @HansWurst 很抱歉没有澄清这一点。这些事情是从手动处理的异步源轮询的。创建的 Observable 将返回给请求几件事情的调用者。提供的代码将是提供事物的机制,因此它将返回一个 Observable。
  • 遵循您的设计理念。 Subject订阅thing 检测并发布那些 thingSubjects 还强制执行正确的退订行为。

标签: android rx-java observable rx-android


【解决方案1】:

目前还不清楚您要做什么。您可能不需要创建 observable,而是使用 Subject

PublishSubject<Thing> thingDetector = PublishSubject.create();

// detector loop
while ( true ) {
  Thread.sleep( 100 );
  if ( haveAnItem ) {
    thingDetector.onNext( next() );
  }
  if ( allItemsDone ) {
    thingDetector.onComplete();
  }
}

由于您没有具体说明您与事物的交互方式,因此我无法更具体。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2017-11-20
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2016-08-14
    • 1970-01-01
    • 2019-12-20
    相关资源
    最近更新 更多